Files
Timothy Jaeryang Baek 6fce92aa12 chore: format
2026-06-01 13:56:55 -07:00

1585 lines
51 KiB
Python

from __future__ import annotations
import asyncio
import io
import logging
import zipfile
from typing import List, Optional
from urllib.parse import quote
from fastapi import APIRouter, Depends, HTTPException, Query, Request, status
from fastapi.responses import StreamingResponse
from open_webui.config import BYPASS_ADMIN_ACCESS_CONTROL
from open_webui.constants import ERROR_MESSAGES
from open_webui.internal.db import get_async_session
from open_webui.models.access_grants import AccessGrants
from open_webui.models.files import FileMetadataResponse, FileModel, FileModelResponse, Files
from open_webui.models.groups import Groups
from open_webui.models.knowledge import (
KnowledgeDirectoryForm,
KnowledgeDirectoryModel,
KnowledgeFileListResponse,
KnowledgeForm,
KnowledgeResponse,
Knowledges,
KnowledgeUserResponse,
)
from open_webui.models.models import ModelForm, Models
from open_webui.retrieval.vector.async_client import ASYNC_VECTOR_DB_CLIENT
from open_webui.routers.retrieval import (
BatchProcessFilesForm,
ProcessFileForm,
process_file,
process_files_batch,
)
from open_webui.storage.provider import Storage
from open_webui.utils.access_control import filter_allowed_access_grants, has_permission
from open_webui.utils.access_control.files import has_access_to_file
from open_webui.utils.auth import get_admin_user, get_verified_user
from pydantic import BaseModel
from sqlalchemy.ext.asyncio import AsyncSession
log = logging.getLogger(__name__)
router = APIRouter()
############################
# getKnowledgeBases
############################
PAGE_ITEM_COUNT = 30
############################
# Knowledge Base Embedding
############################
# Knowledge that sits unread serves no one. Let what is
# stored here find the ones who need it.
KNOWLEDGE_BASES_COLLECTION = 'knowledge-bases'
async def embed_knowledge_base_metadata(
request: Request,
knowledge_base_id: str,
name: str,
description: str,
) -> bool:
"""Generate and store embedding for knowledge base."""
try:
content = f'{name}\n\n{description}' if description else name
embedding = await request.app.state.EMBEDDING_FUNCTION(content)
await ASYNC_VECTOR_DB_CLIENT.upsert(
collection_name=KNOWLEDGE_BASES_COLLECTION,
items=[
{
'id': knowledge_base_id,
'text': content,
'vector': embedding,
'metadata': {
'knowledge_base_id': knowledge_base_id,
},
}
],
)
return True
except Exception as e:
log.error(f'Failed to embed knowledge base {knowledge_base_id}: {e}')
return False
async def remove_knowledge_base_metadata_embedding(knowledge_base_id: str) -> bool:
"""Remove knowledge base embedding."""
try:
await ASYNC_VECTOR_DB_CLIENT.delete(
collection_name=KNOWLEDGE_BASES_COLLECTION,
ids=[knowledge_base_id],
)
return True
except Exception as e:
log.debug(f'Failed to remove embedding for {knowledge_base_id}: {e}')
return False
class KnowledgeAccessResponse(KnowledgeUserResponse):
write_access: bool | None = False
class KnowledgeAccessListResponse(BaseModel):
items: list[KnowledgeAccessResponse]
total: int
@router.get('/', response_model=KnowledgeAccessListResponse)
async def get_knowledge_bases(
page: int | None = 1,
user=Depends(get_verified_user),
db: AsyncSession = Depends(get_async_session),
):
page = max(page, 1)
limit = PAGE_ITEM_COUNT
skip = (page - 1) * limit
filter = {}
groups = await Groups.get_groups_by_member_id(user.id, db=db)
user_group_ids = {group.id for group in groups}
if not user.role == 'admin' or not BYPASS_ADMIN_ACCESS_CONTROL:
if groups:
filter['group_ids'] = [group.id for group in groups]
filter['user_id'] = user.id
result = await Knowledges.search_knowledge_bases(user.id, filter=filter, skip=skip, limit=limit, db=db)
# Batch-fetch writable knowledge IDs in a single query instead of N has_access calls
knowledge_base_ids = [knowledge_base.id for knowledge_base in result.items]
writable_knowledge_base_ids = await AccessGrants.get_accessible_resource_ids(
user_id=user.id,
resource_type='knowledge',
resource_ids=knowledge_base_ids,
permission='write',
user_group_ids=user_group_ids,
db=db,
)
return KnowledgeAccessListResponse(
items=[
KnowledgeAccessResponse(
**knowledge_base.model_dump(),
write_access=(
user.id == knowledge_base.user_id
or (user.role == 'admin' and BYPASS_ADMIN_ACCESS_CONTROL)
or knowledge_base.id in writable_knowledge_base_ids
),
)
for knowledge_base in result.items
],
total=result.total,
)
@router.get('/search', response_model=KnowledgeAccessListResponse)
async def search_knowledge_bases(
query: str | None = None,
view_option: str | None = None,
page: int | None = 1,
user=Depends(get_verified_user),
db: AsyncSession = Depends(get_async_session),
):
page = max(page, 1)
limit = PAGE_ITEM_COUNT
skip = (page - 1) * limit
filter = {}
if query:
filter['query'] = query
if view_option:
filter['view_option'] = view_option
groups = await Groups.get_groups_by_member_id(user.id, db=db)
user_group_ids = {group.id for group in groups}
if not user.role == 'admin' or not BYPASS_ADMIN_ACCESS_CONTROL:
if groups:
filter['group_ids'] = [group.id for group in groups]
filter['user_id'] = user.id
result = await Knowledges.search_knowledge_bases(user.id, filter=filter, skip=skip, limit=limit, db=db)
# Batch-fetch writable knowledge IDs in a single query instead of N has_access calls
knowledge_base_ids = [knowledge_base.id for knowledge_base in result.items]
writable_knowledge_base_ids = await AccessGrants.get_accessible_resource_ids(
user_id=user.id,
resource_type='knowledge',
resource_ids=knowledge_base_ids,
permission='write',
user_group_ids=user_group_ids,
db=db,
)
return KnowledgeAccessListResponse(
items=[
KnowledgeAccessResponse(
**knowledge_base.model_dump(),
write_access=(
user.id == knowledge_base.user_id
or (user.role == 'admin' and BYPASS_ADMIN_ACCESS_CONTROL)
or knowledge_base.id in writable_knowledge_base_ids
),
)
for knowledge_base in result.items
],
total=result.total,
)
@router.get('/search/files', response_model=KnowledgeFileListResponse)
async def search_knowledge_files(
query: str | None = None,
include_content: bool = Query(False, description='Include file content in search (expensive).'),
page: int | None = 1,
user=Depends(get_verified_user),
db: AsyncSession = Depends(get_async_session),
):
page = max(page, 1)
limit = PAGE_ITEM_COUNT
skip = (page - 1) * limit
filter = {}
if query:
filter['query'] = query
if include_content:
filter['include_content'] = True
groups = await Groups.get_groups_by_member_id(user.id, db=db)
if groups:
filter['group_ids'] = [group.id for group in groups]
filter['user_id'] = user.id
return await Knowledges.search_knowledge_files(filter=filter, skip=skip, limit=limit, db=db)
############################
# CreateNewKnowledge
############################
@router.post('/create', response_model=KnowledgeResponse | None)
async def create_new_knowledge(
request: Request,
form_data: KnowledgeForm,
user=Depends(get_verified_user),
):
# NOTE: We intentionally do NOT use Depends(get_async_session) here.
# Database operations (has_permission, filter_allowed_access_grants, insert_new_knowledge) manage their own sessions.
# This prevents holding a connection during embed_knowledge_base_metadata()
# which makes external embedding API calls (1-5+ seconds).
if user.role != 'admin' and not await has_permission(
user.id, 'workspace.knowledge', request.app.state.config.USER_PERMISSIONS
):
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail=ERROR_MESSAGES.UNAUTHORIZED,
)
form_data.access_grants = await filter_allowed_access_grants(
request.app.state.config.USER_PERMISSIONS,
user.id,
user.role,
form_data.access_grants,
'sharing.public_knowledge',
)
knowledge = await Knowledges.insert_new_knowledge(user.id, form_data)
if knowledge:
# Embed knowledge base for semantic search
await embed_knowledge_base_metadata(
request,
knowledge.id,
knowledge.name,
knowledge.description,
)
return knowledge
else:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=ERROR_MESSAGES.FILE_EXISTS,
)
############################
# ReindexKnowledgeFiles
############################
@router.post('/reindex', response_model=bool)
async def reindex_knowledge_files(
request: Request,
user=Depends(get_verified_user),
db: AsyncSession = Depends(get_async_session),
):
if user.role != 'admin':
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail=ERROR_MESSAGES.UNAUTHORIZED,
)
knowledge_bases = await Knowledges.get_knowledge_bases(db=db)
log.info(f'Starting reindexing for {len(knowledge_bases)} knowledge bases')
for knowledge_base in knowledge_bases:
try:
files = await Knowledges.get_files_by_id(knowledge_base.id, db=db)
try:
if await ASYNC_VECTOR_DB_CLIENT.has_collection(collection_name=knowledge_base.id):
await ASYNC_VECTOR_DB_CLIENT.delete_collection(collection_name=knowledge_base.id)
except Exception as e:
log.error(f'Error deleting collection {knowledge_base.id}: {str(e)}')
continue # Skip, don't raise
failed_files = []
for file in files:
try:
await process_file(
request,
ProcessFileForm(file_id=file.id, collection_name=knowledge_base.id),
user=user,
db=db,
)
except Exception as e:
log.error(f'Error processing file {file.filename} (ID: {file.id}): {str(e)}')
failed_files.append({'file_id': file.id, 'error': str(e)})
continue
except Exception as e:
log.error(f'Error processing knowledge base {knowledge_base.id}: {str(e)}')
# Don't raise, just continue
continue
if failed_files:
log.warning(f'Failed to process {len(failed_files)} files in knowledge base {knowledge_base.id}')
for failed in failed_files:
log.warning(f'File ID: {failed["file_id"]}, Error: {failed["error"]}')
log.info(f'Reindexing completed.')
return True
############################
# ReindexKnowledgeBases
############################
@router.post('/metadata/reindex', response_model=dict)
async def reindex_knowledge_base_metadata_embeddings(
request: Request,
user=Depends(get_admin_user),
):
"""Batch embed all existing knowledge bases. Admin only.
NOTE: We intentionally do NOT use Depends(get_async_session) here.
This endpoint loops through ALL knowledge bases and calls embed_knowledge_base_metadata()
for each one, making N external embedding API calls. Holding a session during
this entire operation would exhaust the connection pool.
"""
knowledge_bases = await Knowledges.get_knowledge_bases()
log.info(f'Reindexing embeddings for {len(knowledge_bases)} knowledge bases')
success_count = 0
for kb in knowledge_bases:
if await embed_knowledge_base_metadata(request, kb.id, kb.name, kb.description):
success_count += 1
log.info(f'Embedding reindex complete: {success_count}/{len(knowledge_bases)}')
return {'total': len(knowledge_bases), 'success': success_count}
############################
# GetKnowledgeById
############################
class KnowledgeFilesResponse(KnowledgeResponse):
files: list[FileMetadataResponse | None] = None
write_access: bool | None = False
@router.get('/{id}', response_model=KnowledgeFilesResponse | None)
async def get_knowledge_by_id(id: str, user=Depends(get_verified_user), db: AsyncSession = Depends(get_async_session)):
knowledge = await Knowledges.get_knowledge_by_id(id=id, db=db)
if knowledge:
if (
user.role == 'admin'
or knowledge.user_id == user.id
or await AccessGrants.has_access(
user_id=user.id,
resource_type='knowledge',
resource_id=knowledge.id,
permission='read',
db=db,
)
):
return KnowledgeFilesResponse(
**knowledge.model_dump(),
write_access=(
user.id == knowledge.user_id
or (user.role == 'admin' and BYPASS_ADMIN_ACCESS_CONTROL)
or await AccessGrants.has_access(
user_id=user.id,
resource_type='knowledge',
resource_id=knowledge.id,
permission='write',
db=db,
)
),
)
else:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail=ERROR_MESSAGES.ACCESS_PROHIBITED,
)
else:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=ERROR_MESSAGES.NOT_FOUND,
)
############################
# UpdateKnowledgeById
############################
@router.post('/{id}/update', response_model=KnowledgeFilesResponse | None)
async def update_knowledge_by_id(
request: Request,
id: str,
form_data: KnowledgeForm,
user=Depends(get_verified_user),
):
# NOTE: We intentionally do NOT use Depends(get_async_session) here.
# Database operations manage their own short-lived sessions internally.
# This prevents holding a connection during embed_knowledge_base_metadata()
# which makes external embedding API calls (1-5+ seconds).
knowledge = await Knowledges.get_knowledge_by_id(id=id)
if not knowledge:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=ERROR_MESSAGES.NOT_FOUND,
)
# Is the user the original creator, in a group with write access, or an admin
if (
knowledge.user_id != user.id
and not await AccessGrants.has_access(
user_id=user.id,
resource_type='knowledge',
resource_id=knowledge.id,
permission='write',
)
and user.role != 'admin'
):
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=ERROR_MESSAGES.ACCESS_PROHIBITED,
)
form_data.access_grants = await filter_allowed_access_grants(
request.app.state.config.USER_PERMISSIONS,
user.id,
user.role,
form_data.access_grants,
'sharing.public_knowledge',
)
knowledge = await Knowledges.update_knowledge_by_id(id=id, form_data=form_data)
if knowledge:
# Re-embed knowledge base for semantic search
await embed_knowledge_base_metadata(
request,
knowledge.id,
knowledge.name,
knowledge.description,
)
return KnowledgeFilesResponse(
**knowledge.model_dump(),
files=await Knowledges.get_file_metadatas_by_id(knowledge.id),
)
else:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=ERROR_MESSAGES.ID_TAKEN,
)
############################
# UpdateKnowledgeAccessById
############################
class KnowledgeAccessGrantsForm(BaseModel):
access_grants: list[dict]
@router.post('/{id}/access/update', response_model=KnowledgeFilesResponse | None)
async def update_knowledge_access_by_id(
request: Request,
id: str,
form_data: KnowledgeAccessGrantsForm,
user=Depends(get_verified_user),
db: AsyncSession = Depends(get_async_session),
):
knowledge = await Knowledges.get_knowledge_by_id(id=id, db=db)
if not knowledge:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=ERROR_MESSAGES.NOT_FOUND,
)
if (
knowledge.user_id != user.id
and not await AccessGrants.has_access(
user_id=user.id,
resource_type='knowledge',
resource_id=knowledge.id,
permission='write',
db=db,
)
and user.role != 'admin'
):
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=ERROR_MESSAGES.ACCESS_PROHIBITED,
)
form_data.access_grants = await filter_allowed_access_grants(
request.app.state.config.USER_PERMISSIONS,
user.id,
user.role,
form_data.access_grants,
'sharing.public_knowledge',
)
knowledge.access_grants = await AccessGrants.set_access_grants('knowledge', id, form_data.access_grants, db=db)
return KnowledgeFilesResponse(
**knowledge.model_dump(),
files=await Knowledges.get_file_metadatas_by_id(id, db=db),
)
############################
# GetPendingKnowledgeFiles
############################
@router.get('/{id}/files/pending')
async def get_pending_knowledge_files(
id: str,
stream: bool = Query(False),
user=Depends(get_verified_user),
db: AsyncSession = Depends(get_async_session),
):
"""Return files that are being processed for this knowledge base but not yet linked.
After a file is uploaded with ``knowledge_id`` in its metadata, the backend
processes it in a background task before linking it to the ``knowledge_file``
join table. During this window the file is invisible to the normal file
list endpoint. This endpoint exposes those in-flight files so the frontend
can show them with a processing indicator even after a page reload.
When ``stream=true``, returns an SSE stream that polls every 3 seconds
and emits the current pending file list. Closes when no files remain.
"""
knowledge = await Knowledges.get_knowledge_by_id(id=id, db=db)
if not knowledge:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=ERROR_MESSAGES.NOT_FOUND,
)
if not (
user.role == 'admin'
or knowledge.user_id == user.id
or await AccessGrants.has_access(
user_id=user.id,
resource_type='knowledge',
resource_id=knowledge.id,
permission='read',
db=db,
)
):
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=ERROR_MESSAGES.ACCESS_PROHIBITED,
)
if not stream:
return await Files.get_pending_files_for_knowledge(id, db=db)
async def event_stream(knowledge_id: str):
MAX_POLL_DURATION = 3600 # 1 hour max
for _ in range(MAX_POLL_DURATION // 3):
pending = await Files.get_pending_files_for_knowledge(knowledge_id)
data = [f.model_dump() for f in pending]
yield f'data: {json.dumps(data)}\n\n'
if len(pending) == 0:
break
await asyncio.sleep(3)
return StreamingResponse(
event_stream(id),
media_type='text/event-stream',
)
############################
# GetKnowledgeFilesById
############################
@router.get('/{id}/files', response_model=KnowledgeFileListResponse)
async def get_knowledge_files_by_id(
id: str,
query: str | None = None,
include_content: bool = Query(False, description='Include file content in search (expensive).'),
view_option: str | None = None,
order_by: str | None = None,
direction: str | None = None,
directory_id: str | None = Query(None, description='Filter by directory ID. Pass empty string for root.'),
page: int | None = 1,
limit: int | None = Query(None, description='Page size (admin only). Defaults to 30.'),
user=Depends(get_verified_user),
db: AsyncSession = Depends(get_async_session),
):
knowledge = await Knowledges.get_knowledge_by_id(id=id, db=db)
if not knowledge:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=ERROR_MESSAGES.NOT_FOUND,
)
if not (
user.role == 'admin'
or knowledge.user_id == user.id
or await AccessGrants.has_access(
user_id=user.id,
resource_type='knowledge',
resource_id=knowledge.id,
permission='read',
db=db,
)
):
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=ERROR_MESSAGES.ACCESS_PROHIBITED,
)
page = max(page, 1)
# Allow admins to configure page size; non-admins always get the default
if user.role == 'admin' and limit is not None:
limit = max(1, limit)
else:
limit = PAGE_ITEM_COUNT
skip = (page - 1) * limit
filter = {}
if query:
filter['query'] = query
if include_content:
filter['include_content'] = True
if view_option:
filter['view_option'] = view_option
if order_by:
filter['order_by'] = order_by
if direction:
filter['direction'] = direction
# directory_id filtering: present in filter = scope to that directory (None = root)
if directory_id is not None:
filter['directory_id'] = directory_id if directory_id else None
return await Knowledges.search_files_by_id(id, user.id, filter=filter, skip=skip, limit=limit, db=db)
############################
# AddFileToKnowledge
############################
class KnowledgeFileIdForm(BaseModel):
file_id: str
directory_id: Optional[str] = None
@router.post('/{id}/file/add', response_model=KnowledgeFilesResponse | None)
async def add_file_to_knowledge_by_id(
request: Request,
id: str,
form_data: KnowledgeFileIdForm,
user=Depends(get_verified_user),
db: AsyncSession = Depends(get_async_session),
):
knowledge = await Knowledges.get_knowledge_by_id(id=id, db=db)
if not knowledge:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=ERROR_MESSAGES.NOT_FOUND,
)
if (
knowledge.user_id != user.id
and not await AccessGrants.has_access(
user_id=user.id,
resource_type='knowledge',
resource_id=knowledge.id,
permission='write',
db=db,
)
and user.role != 'admin'
):
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=ERROR_MESSAGES.ACCESS_PROHIBITED,
)
file = await Files.get_file_by_id(form_data.file_id, db=db)
if not file:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=ERROR_MESSAGES.NOT_FOUND,
)
if not file.data:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=ERROR_MESSAGES.FILE_NOT_PROCESSED,
)
# KB write-access alone is not enough — caller must also be able to read the file.
if file.user_id != user.id and user.role != 'admin':
if not await has_access_to_file(file.id, 'read', user, db=db):
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail=ERROR_MESSAGES.ACCESS_PROHIBITED,
)
# Add content to the vector database
try:
await process_file(
request,
ProcessFileForm(file_id=form_data.file_id, collection_name=id),
user=user,
db=db,
)
# Add file to knowledge base
await Knowledges.add_file_to_knowledge_by_id(
knowledge_id=id,
file_id=form_data.file_id,
user_id=user.id,
directory_id=form_data.directory_id,
db=db,
)
except Exception as e:
log.debug(e)
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=str(e),
)
if knowledge:
return KnowledgeFilesResponse(
**knowledge.model_dump(),
files=await Knowledges.get_file_metadatas_by_id(knowledge.id, db=db),
)
else:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=ERROR_MESSAGES.NOT_FOUND,
)
@router.post('/{id}/file/update', response_model=KnowledgeFilesResponse | None)
async def update_file_from_knowledge_by_id(
request: Request,
id: str,
form_data: KnowledgeFileIdForm,
user=Depends(get_verified_user),
db: AsyncSession = Depends(get_async_session),
):
knowledge = await Knowledges.get_knowledge_by_id(id=id, db=db)
if not knowledge:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=ERROR_MESSAGES.NOT_FOUND,
)
if (
knowledge.user_id != user.id
and not await AccessGrants.has_access(
user_id=user.id,
resource_type='knowledge',
resource_id=knowledge.id,
permission='write',
db=db,
)
and user.role != 'admin'
):
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=ERROR_MESSAGES.ACCESS_PROHIBITED,
)
file = await Files.get_file_by_id(form_data.file_id, db=db)
if not file:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=ERROR_MESSAGES.NOT_FOUND,
)
# Validate the file actually belongs to this knowledge base
if not await Knowledges.has_file(knowledge_id=id, file_id=form_data.file_id, db=db):
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=ERROR_MESSAGES.NOT_FOUND,
)
# Remove content from the vector database
await ASYNC_VECTOR_DB_CLIENT.delete(collection_name=knowledge.id, filter={'file_id': form_data.file_id})
# Add content to the vector database
try:
await process_file(
request,
ProcessFileForm(file_id=form_data.file_id, collection_name=id),
user=user,
db=db,
)
except Exception as e:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=str(e),
)
if knowledge:
return KnowledgeFilesResponse(
**knowledge.model_dump(),
files=await Knowledges.get_file_metadatas_by_id(knowledge.id, db=db),
)
else:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=ERROR_MESSAGES.NOT_FOUND,
)
############################
# RemoveFileFromKnowledge
############################
@router.post('/{id}/file/remove', response_model=KnowledgeFilesResponse | None)
async def remove_file_from_knowledge_by_id(
id: str,
form_data: KnowledgeFileIdForm,
delete_file: bool = Query(True),
user=Depends(get_verified_user),
db: AsyncSession = Depends(get_async_session),
):
knowledge = await Knowledges.get_knowledge_by_id(id=id, db=db)
if not knowledge:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=ERROR_MESSAGES.NOT_FOUND,
)
if (
knowledge.user_id != user.id
and not await AccessGrants.has_access(
user_id=user.id,
resource_type='knowledge',
resource_id=knowledge.id,
permission='write',
db=db,
)
and user.role != 'admin'
):
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=ERROR_MESSAGES.ACCESS_PROHIBITED,
)
file = await Files.get_file_by_id(form_data.file_id, db=db)
if not file:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=ERROR_MESSAGES.NOT_FOUND,
)
# Validate the file actually belongs to this knowledge base
if not await Knowledges.has_file(knowledge_id=id, file_id=form_data.file_id, db=db):
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=ERROR_MESSAGES.NOT_FOUND,
)
await Knowledges.remove_file_from_knowledge_by_id(knowledge_id=id, file_id=form_data.file_id, db=db)
# Remove content from the vector database
try:
await ASYNC_VECTOR_DB_CLIENT.delete(
collection_name=knowledge.id, filter={'file_id': form_data.file_id}
) # Remove by file_id first
await ASYNC_VECTOR_DB_CLIENT.delete(
collection_name=knowledge.id, filter={'hash': file.hash}
) # Remove by hash as well in case of duplicates
except Exception as e:
log.debug('This was most likely caused by bypassing embedding processing')
log.debug(e)
pass
# Anyone with write permission or higher can delete files
if delete_file and (file.user_id == user.id or user.role == 'admin'):
try:
# Remove the file's collection from vector database
file_collection = f'file-{form_data.file_id}'
if await ASYNC_VECTOR_DB_CLIENT.has_collection(collection_name=file_collection):
await ASYNC_VECTOR_DB_CLIENT.delete_collection(collection_name=file_collection)
except Exception as e:
log.debug('This was most likely caused by bypassing embedding processing')
log.debug(e)
pass
# Delete file from database
await Files.delete_file_by_id(form_data.file_id, db=db)
if knowledge:
return KnowledgeFilesResponse(
**knowledge.model_dump(),
files=await Knowledges.get_file_metadatas_by_id(knowledge.id, db=db),
)
else:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=ERROR_MESSAGES.NOT_FOUND,
)
############################
# DeleteKnowledgeById
############################
@router.delete('/{id}/delete', response_model=bool)
async def delete_knowledge_by_id(
id: str, user=Depends(get_verified_user), db: AsyncSession = Depends(get_async_session)
):
knowledge = await Knowledges.get_knowledge_by_id(id=id, db=db)
if not knowledge:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=ERROR_MESSAGES.NOT_FOUND,
)
if (
knowledge.user_id != user.id
and not await AccessGrants.has_access(
user_id=user.id,
resource_type='knowledge',
resource_id=knowledge.id,
permission='write',
db=db,
)
and user.role != 'admin'
):
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=ERROR_MESSAGES.ACCESS_PROHIBITED,
)
log.info(f'Deleting knowledge base: {id} (name: {knowledge.name})')
# Get all models
models = await Models.get_all_models(db=db)
log.info(f'Found {len(models)} models to check for knowledge base {id}')
# Update models that reference this knowledge base
for model in models:
if model.meta and hasattr(model.meta, 'knowledge'):
knowledge_list = model.meta.knowledge or []
# Filter out the deleted knowledge base
updated_knowledge = [k for k in knowledge_list if k.get('id') != id]
# If the knowledge list changed, update the model
if len(updated_knowledge) != len(knowledge_list):
log.info(f'Updating model {model.id} to remove knowledge base {id}')
model.meta.knowledge = updated_knowledge
model_form = ModelForm(**model.model_dump())
await Models.update_model_by_id(model.id, model_form, db=db)
# Clean up vector DB
try:
await ASYNC_VECTOR_DB_CLIENT.delete_collection(collection_name=id)
except Exception as e:
log.debug(e)
pass
# Remove knowledge base embedding
await remove_knowledge_base_metadata_embedding(id)
result = await Knowledges.delete_knowledge_by_id(id=id, db=db)
return result
############################
# ResetKnowledgeById
############################
@router.post('/{id}/reset', response_model=KnowledgeResponse | None)
async def reset_knowledge_by_id(
id: str,
include_directories: bool = Query(True),
user=Depends(get_verified_user),
db: AsyncSession = Depends(get_async_session),
):
knowledge = await Knowledges.get_knowledge_by_id(id=id, db=db)
if not knowledge:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=ERROR_MESSAGES.NOT_FOUND,
)
if (
knowledge.user_id != user.id
and not await AccessGrants.has_access(
user_id=user.id,
resource_type='knowledge',
resource_id=knowledge.id,
permission='write',
db=db,
)
and user.role != 'admin'
):
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=ERROR_MESSAGES.ACCESS_PROHIBITED,
)
try:
await ASYNC_VECTOR_DB_CLIENT.delete_collection(collection_name=id)
except Exception as e:
log.debug(e)
pass
knowledge = await Knowledges.reset_knowledge_by_id(id=id, include_directories=include_directories, db=db)
return knowledge
############################
# SyncKnowledgeDiff
############################
class FileManifestEntry(BaseModel):
filename: str # basename: "readme.md"
path: str # relative dir: "docs/api" or "" for root
checksum: str # SHA-256 of raw bytes
size: int
class SyncDiffForm(BaseModel):
manifest: list[FileManifestEntry]
class SyncDiffResponse(BaseModel):
added: list[dict] # [{filename, path}] — new files
modified: list[dict] # [{filename, path, stale_file_id}] — changed files
deleted: list[dict] # [{file_id, filename}] — files to remove
mkdir: list[str] # directory paths to create
rmdir: list[str] # directory IDs to remove
unmodified_count: int
directory_map: dict[str, str] # existing path → directory ID
@router.post('/{id}/sync/diff', response_model=SyncDiffResponse)
async def sync_knowledge_diff(
id: str,
form_data: SyncDiffForm,
user=Depends(get_verified_user),
db: AsyncSession = Depends(get_async_session),
):
"""
Compare a local file manifest against the knowledge base to determine
which files need uploading, removing, and which directories to create/remove.
"""
await _verify_knowledge_write_access(id, user, db)
# ── Index existing state ──
knowledge_files = await Knowledges.get_files_with_directory_ids(id, db=db)
existing_directories = await Knowledges.get_all_directories(id, db=db)
# Build directory path lookups
directory_path_by_id: dict[str, str] = {}
directory_id_by_path: dict[str, str] = {}
for directory in existing_directories:
segments = [directory.name]
parent_id = directory.parent_id
while parent_id:
parent = next((d for d in existing_directories if d.id == parent_id), None)
if not parent:
break
segments.insert(0, parent.name)
parent_id = parent.parent_id
full_path = '/'.join(segments)
directory_path_by_id[directory.id] = full_path
directory_id_by_path[full_path] = directory.id
# Index existing files by (path, filename) → {file_id, checksum}
indexed_files: dict[tuple[str, str], dict] = {}
for file_model, directory_id in knowledge_files:
file_path = directory_path_by_id.get(directory_id, '') if directory_id else ''
stored_checksum = (file_model.meta or {}).get('file_hash')
indexed_files[(file_path, file_model.filename)] = {
'file_id': file_model.id,
'checksum': stored_checksum,
}
# ── Diff files ──
added: list[dict] = []
modified: list[dict] = []
deleted: list[dict] = []
unmodified_count = 0
manifest_keys: set[tuple[str, str]] = set()
for entry in form_data.manifest:
key = (entry.path, entry.filename)
manifest_keys.add(key)
if key not in indexed_files:
added.append({'filename': entry.filename, 'path': entry.path})
elif indexed_files[key]['checksum'] != entry.checksum:
modified.append(
{
'filename': entry.filename,
'path': entry.path,
'stale_file_id': indexed_files[key]['file_id'],
}
)
else:
unmodified_count += 1
for key, file_info in indexed_files.items():
if key not in manifest_keys:
deleted.append({'file_id': file_info['file_id'], 'filename': key[1]})
# ── Diff directories ──
required_directory_paths: set[str] = set()
for entry in form_data.manifest:
if entry.path:
segments = entry.path.split('/')
for depth in range(len(segments)):
required_directory_paths.add('/'.join(segments[: depth + 1]))
mkdir = sorted([p for p in required_directory_paths if p not in directory_id_by_path], key=lambda p: p.count('/'))
orphaned_directory_paths = set(directory_id_by_path) - required_directory_paths
rmdir = [directory_id_by_path[p] for p in orphaned_directory_paths]
return SyncDiffResponse(
added=added,
modified=modified,
deleted=deleted,
mkdir=mkdir,
rmdir=rmdir,
unmodified_count=unmodified_count,
directory_map=directory_id_by_path,
)
############################
# SyncKnowledgeCleanup
############################
class SyncCleanupForm(BaseModel):
file_ids: list[str] # file IDs to delete
dir_ids: list[str] = [] # directory IDs to rmdir
@router.post('/{id}/sync/cleanup')
async def sync_knowledge_cleanup(
id: str,
form_data: SyncCleanupForm,
user=Depends(get_verified_user),
db: AsyncSession = Depends(get_async_session),
):
"""
Remove stale files and orphaned directories from a knowledge base
after an incremental sync.
"""
await _verify_knowledge_write_access(id, user, db)
# ── Remove deleted files ──
for file_id in form_data.file_ids:
file = await Files.get_file_by_id(file_id, db=db)
if not file:
continue
await Knowledges.remove_file_from_knowledge_by_id(id, file_id, db=db)
try:
await ASYNC_VECTOR_DB_CLIENT.delete(collection_name=id, filter={'file_id': file_id})
await ASYNC_VECTOR_DB_CLIENT.delete(collection_name=id, filter={'hash': file.hash})
except Exception:
pass
try:
collection_name = f'file-{file_id}'
if await ASYNC_VECTOR_DB_CLIENT.has_collection(collection_name):
await ASYNC_VECTOR_DB_CLIENT.delete_collection(collection_name)
except Exception:
pass
if file.user_id == user.id or user.role == 'admin':
await Files.delete_file_by_id(file_id, db=db)
try:
await asyncio.to_thread(Storage.delete_file, file.path)
except Exception:
pass
# ── Remove orphaned directories (children before parents) ──
for dir_id in reversed(form_data.dir_ids):
await Knowledges.delete_directory(dir_id, move_files_to_parent=False, db=db)
return {'status': True}
############################
# AddFilesToKnowledge
############################
@router.post('/{id}/files/batch/add', response_model=KnowledgeFilesResponse | None)
async def add_files_to_knowledge_batch(
request: Request,
id: str,
form_data: list[KnowledgeFileIdForm],
user=Depends(get_verified_user),
db: AsyncSession = Depends(get_async_session),
):
"""
Add multiple files to a knowledge base
"""
knowledge = await Knowledges.get_knowledge_by_id(id=id, db=db)
if not knowledge:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=ERROR_MESSAGES.NOT_FOUND,
)
if (
knowledge.user_id != user.id
and not await AccessGrants.has_access(
user_id=user.id,
resource_type='knowledge',
resource_id=knowledge.id,
permission='write',
db=db,
)
and user.role != 'admin'
):
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=ERROR_MESSAGES.ACCESS_PROHIBITED,
)
# Batch-fetch all files to avoid N+1 queries
log.info(f'files/batch/add - {len(form_data)} files')
file_ids = [form.file_id for form in form_data]
files = await Files.get_files_by_ids(file_ids, db=db)
# Verify all requested files were found
found_ids = {file.id for file in files}
missing_ids = [fid for fid in file_ids if fid not in found_ids]
if missing_ids:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=f'File {missing_ids[0]} not found',
)
# Per-file read-access check — same gate as the single-file endpoint.
if user.role != 'admin':
for file in files:
if file.user_id != user.id and not await has_access_to_file(file.id, 'read', user, db=db):
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail=ERROR_MESSAGES.ACCESS_PROHIBITED,
)
# Filter out files already linked to this knowledge base to prevent
# duplicate embeddings in the vector DB (issue #10679).
new_entries = []
for form in form_data:
if not await Knowledges.has_file(knowledge_id=id, file_id=form.file_id, db=db):
new_entries.append(form)
if not new_entries:
return KnowledgeFilesResponse(
**knowledge.model_dump(),
files=await Knowledges.get_file_metadatas_by_id(knowledge.id, db=db),
)
# Narrow the file list to only new files for processing
new_file_ids = {form.file_id for form in new_entries}
files = [f for f in files if f.id in new_file_ids]
# Process files
try:
result = await process_files_batch(
request=request,
form_data=BatchProcessFilesForm(files=files, collection_name=id),
user=user,
db=db,
)
except Exception as e:
log.error(f'add_files_to_knowledge_batch: Exception occurred: {e}', exc_info=True)
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(e))
# Only add files that were successfully processed
successful_file_ids = [r.file_id for r in result.results if r.status == 'completed']
dir_map = {form.file_id: form.directory_id for form in new_entries}
for file_id in successful_file_ids:
await Knowledges.add_file_to_knowledge_by_id(
knowledge_id=id,
file_id=file_id,
user_id=user.id,
directory_id=dir_map.get(file_id),
db=db,
)
# If there were any errors, include them in the response
if result.errors:
error_details = [f'{err.file_id}: {err.error}' for err in result.errors]
return KnowledgeFilesResponse(
**knowledge.model_dump(),
files=await Knowledges.get_file_metadatas_by_id(knowledge.id, db=db),
warnings={
'message': 'Some files failed to process',
'errors': error_details,
},
)
return KnowledgeFilesResponse(
**knowledge.model_dump(),
files=await Knowledges.get_file_metadatas_by_id(knowledge.id, db=db),
)
############################
# ExportKnowledgeById
############################
@router.get('/{id}/export')
async def export_knowledge_by_id(id: str, user=Depends(get_admin_user), db: AsyncSession = Depends(get_async_session)):
"""
Export a knowledge base as a zip file containing .txt files.
Admin only.
"""
knowledge = await Knowledges.get_knowledge_by_id(id=id, db=db)
if not knowledge:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=ERROR_MESSAGES.NOT_FOUND,
)
files = await Knowledges.get_files_by_id(id, db=db)
# Create zip file in memory
zip_buffer = io.BytesIO()
with zipfile.ZipFile(zip_buffer, 'w', zipfile.ZIP_DEFLATED) as zf:
for file in files:
content = file.data.get('content', '') if file.data else ''
if content:
# Use original filename with .txt extension
filename = file.filename
if not filename.endswith('.txt'):
filename = f'{filename}.txt'
zf.writestr(filename, content)
zip_buffer.seek(0)
# Sanitize knowledge name for filename
# ASCII-safe fallback for the basic filename parameter (latin-1 safe)
safe_name = ''.join(c if c.isascii() and (c.isalnum() or c in ' -_') else '_' for c in knowledge.name)
zip_filename = f'{safe_name}.zip'
# Use RFC 5987 filename* for non-ASCII names so the browser gets the real name
quoted_name = quote(f'{knowledge.name}.zip')
content_disposition = f'attachment; filename="{zip_filename}"; filename*=UTF-8\'\'{quoted_name}'
return StreamingResponse(
zip_buffer,
media_type='application/zip',
headers={'Content-Disposition': content_disposition},
)
############################
# Directory endpoints
############################
class KnowledgeDirectoryCreateForm(BaseModel):
name: str
parent_id: Optional[str] = None
class KnowledgeDirectoryUpdateForm(BaseModel):
name: Optional[str] = None
parent_id: Optional[str] = '__unset__'
class KnowledgeFileMoveForm(BaseModel):
file_id: str
directory_id: Optional[str] = None
async def _verify_knowledge_write_access(id: str, user, db: AsyncSession):
"""Verify the user has write access to the knowledge base. Returns the knowledge model."""
knowledge = await Knowledges.get_knowledge_by_id(id=id, db=db)
if not knowledge:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=ERROR_MESSAGES.NOT_FOUND,
)
if (
knowledge.user_id != user.id
and not await AccessGrants.has_access(
user_id=user.id,
resource_type='knowledge',
resource_id=knowledge.id,
permission='write',
db=db,
)
and user.role != 'admin'
):
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail=ERROR_MESSAGES.ACCESS_PROHIBITED,
)
return knowledge
@router.post('/{id}/dirs/create', response_model=KnowledgeDirectoryModel)
async def create_knowledge_directory(
id: str,
form_data: KnowledgeDirectoryCreateForm,
user=Depends(get_verified_user),
db: AsyncSession = Depends(get_async_session),
):
await _verify_knowledge_write_access(id, user, db)
directory = await Knowledges.create_directory(
knowledge_id=id,
name=form_data.name,
user_id=user.id,
parent_id=form_data.parent_id,
db=db,
)
if not directory:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail='Failed to create directory. A directory with this name may already exist at this level.',
)
return directory
@router.post('/{id}/dirs/{dir_id}/update', response_model=KnowledgeDirectoryModel)
async def update_knowledge_directory(
id: str,
dir_id: str,
form_data: KnowledgeDirectoryUpdateForm,
user=Depends(get_verified_user),
db: AsyncSession = Depends(get_async_session),
):
await _verify_knowledge_write_access(id, user, db)
# Verify directory belongs to this knowledge base
directory = await Knowledges.get_directory_by_id(dir_id, db=db)
if not directory or directory.knowledge_id != id:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=ERROR_MESSAGES.NOT_FOUND,
)
result = await Knowledges.update_directory(
directory_id=dir_id,
name=form_data.name,
parent_id=form_data.parent_id,
db=db,
)
if not result:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail='Failed to update directory. This may be caused by a naming conflict or circular move.',
)
return result
@router.delete('/{id}/dirs/{dir_id}/delete')
async def delete_knowledge_directory(
id: str,
dir_id: str,
move_files: bool = Query(True, description='If true, move contained files to parent. If false, delete them.'),
user=Depends(get_verified_user),
db: AsyncSession = Depends(get_async_session),
):
await _verify_knowledge_write_access(id, user, db)
# Verify directory belongs to this knowledge base
directory = await Knowledges.get_directory_by_id(dir_id, db=db)
if not directory or directory.knowledge_id != id:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=ERROR_MESSAGES.NOT_FOUND,
)
success = await Knowledges.delete_directory(
directory_id=dir_id,
move_files_to_parent=move_files,
db=db,
)
if not success:
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail='Failed to delete directory.',
)
return {'status': True}
@router.post('/{id}/file/move')
async def move_file_in_knowledge(
id: str,
form_data: KnowledgeFileMoveForm,
user=Depends(get_verified_user),
db: AsyncSession = Depends(get_async_session),
):
await _verify_knowledge_write_access(id, user, db)
# Verify file belongs to this knowledge base
if not await Knowledges.has_file(knowledge_id=id, file_id=form_data.file_id, db=db):
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=ERROR_MESSAGES.NOT_FOUND,
)
# If target directory is set, verify it belongs to this knowledge base
if form_data.directory_id:
directory = await Knowledges.get_directory_by_id(form_data.directory_id, db=db)
if not directory or directory.knowledge_id != id:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail='Target directory not found.',
)
success = await Knowledges.move_file_to_directory(
knowledge_id=id,
file_id=form_data.file_id,
directory_id=form_data.directory_id,
db=db,
)
if not success:
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail='Failed to move file.',
)
return {'status': True}