From ad9f2eeb15a448147d5e47e6a391cabb9aefd0ea Mon Sep 17 00:00:00 2001 From: Timothy Jaeryang Baek Date: Mon, 1 Jun 2026 11:34:46 -0700 Subject: [PATCH] refac --- backend/open_webui/models/files.py | 43 ++++++++++++ backend/open_webui/routers/knowledge.py | 67 ++++++++++++++++++- src/lib/apis/knowledge/index.ts | 44 ++++++++++++ .../workspace/Knowledge/KnowledgeBase.svelte | 40 +++++++++++ 4 files changed, 193 insertions(+), 1 deletion(-) diff --git a/backend/open_webui/models/files.py b/backend/open_webui/models/files.py index 5e17e35f3f..8d2ff54510 100644 --- a/backend/open_webui/models/files.py +++ b/backend/open_webui/models/files.py @@ -389,6 +389,49 @@ class FilesTable: except Exception: return None + async def get_pending_files_for_knowledge( + self, knowledge_id: str, db: AsyncSession | None = None + ) -> list[FileModelResponse]: + """Return files still being processed for this knowledge base. + + These are files uploaded with ``meta.data.knowledge_id`` set, whose + ``data.status`` is still ``pending`` or ``processing``, and which + have not yet been added to the ``knowledge_file`` join table. + + The JSON subscript syntax (``Column['key']['subkey'].as_string()``) + is supported by both SQLite (``json_extract``) and PostgreSQL + (``->>``/``->``). + """ + async with get_async_db_context(db) as db: + try: + # Lazy import to avoid circular dependency + from open_webui.models.knowledge import KnowledgeFile + + # Subquery: file IDs already linked to this knowledge base + linked_ids = ( + select(KnowledgeFile.file_id) + .filter(KnowledgeFile.knowledge_id == knowledge_id) + .correlate(None) + ) + + stmt = ( + select(File) + .filter( + File.meta['data']['knowledge_id'].as_string() == knowledge_id, + File.data['status'].as_string().in_(['pending', 'processing']), + File.id.notin_(linked_ids), + ) + .order_by(File.created_at.desc()) + ) + result = await db.execute(stmt) + return [ + FileModelResponse.model_validate(f, from_attributes=True) + for f in result.scalars().all() + ] + except Exception as e: + log.warning(f'Error fetching pending files for knowledge {knowledge_id}: {e}') + return [] + async def delete_file_by_id(self, id: str, db: AsyncSession | None = None) -> bool: async with get_async_db_context(db) as db: try: diff --git a/backend/open_webui/routers/knowledge.py b/backend/open_webui/routers/knowledge.py index a9f867eb6e..fc7a4ce71b 100644 --- a/backend/open_webui/routers/knowledge.py +++ b/backend/open_webui/routers/knowledge.py @@ -13,7 +13,7 @@ from open_webui.config import BYPASS_ADMIN_ACCESS_CONTROL from open_webui.constants import ERROR_MESSAGES from open_webui.internal.db import get_async_session from open_webui.models.access_grants import AccessGrants -from open_webui.models.files import FileMetadataResponse, FileModel, Files +from open_webui.models.files import FileMetadataResponse, FileModel, FileModelResponse, Files from open_webui.models.groups import Groups from open_webui.models.knowledge import ( KnowledgeDirectoryForm, @@ -552,6 +552,71 @@ async def update_knowledge_access_by_id( ) +############################ +# GetPendingKnowledgeFiles +############################ + + +@router.get('/{id}/files/pending') +async def get_pending_knowledge_files( + id: str, + stream: bool = Query(False), + user=Depends(get_verified_user), + db: AsyncSession = Depends(get_async_session), +): + """Return files that are being processed for this knowledge base but not yet linked. + + After a file is uploaded with ``knowledge_id`` in its metadata, the backend + processes it in a background task before linking it to the ``knowledge_file`` + join table. During this window the file is invisible to the normal file + list endpoint. This endpoint exposes those in-flight files so the frontend + can show them with a processing indicator even after a page reload. + + When ``stream=true``, returns an SSE stream that polls every 3 seconds + and emits the current pending file list. Closes when no files remain. + """ + knowledge = await Knowledges.get_knowledge_by_id(id=id, db=db) + if not knowledge: + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail=ERROR_MESSAGES.NOT_FOUND, + ) + + if not ( + user.role == 'admin' + or knowledge.user_id == user.id + or await AccessGrants.has_access( + user_id=user.id, + resource_type='knowledge', + resource_id=knowledge.id, + permission='read', + db=db, + ) + ): + raise HTTPException( + status_code=status.HTTP_400_BAD_REQUEST, + detail=ERROR_MESSAGES.ACCESS_PROHIBITED, + ) + + if not stream: + return await Files.get_pending_files_for_knowledge(id, db=db) + + async def event_stream(knowledge_id: str): + MAX_POLL_DURATION = 3600 # 1 hour max + for _ in range(MAX_POLL_DURATION // 3): + pending = await Files.get_pending_files_for_knowledge(knowledge_id) + data = [f.model_dump() for f in pending] + yield f'data: {json.dumps(data)}\n\n' + if len(pending) == 0: + break + await asyncio.sleep(3) + + return StreamingResponse( + event_stream(id), + media_type='text/event-stream', + ) + + ############################ # GetKnowledgeFilesById ############################ diff --git a/src/lib/apis/knowledge/index.ts b/src/lib/apis/knowledge/index.ts index 7e00160ffe..d99e36f284 100644 --- a/src/lib/apis/knowledge/index.ts +++ b/src/lib/apis/knowledge/index.ts @@ -253,6 +253,50 @@ export const searchKnowledgeFilesById = async ( return res; }; +export const getPendingKnowledgeFiles = async (token: string, id: string) => { + let error = null; + + const res = await fetch(`${WEBUI_API_BASE_URL}/knowledge/${id}/files/pending`, { + method: 'GET', + headers: { + Accept: 'application/json', + 'Content-Type': 'application/json', + authorization: `Bearer ${token}` + } + }) + .then(async (res) => { + if (!res.ok) throw await res.json(); + return res.json(); + }) + .catch((err) => { + error = err.detail; + console.error(err); + return []; + }); + + if (error) { + throw error; + } + + return res; +}; + +export const streamPendingKnowledgeFiles = async (token: string, id: string) => { + const res = await fetch(`${WEBUI_API_BASE_URL}/knowledge/${id}/files/pending?stream=true`, { + method: 'GET', + headers: { + Accept: 'text/event-stream', + authorization: `Bearer ${token}` + } + }); + + if (!res.ok) { + throw new Error('Failed to stream pending files'); + } + + return res; +}; + type KnowledgeUpdateForm = { name?: string; description?: string; diff --git a/src/lib/components/workspace/Knowledge/KnowledgeBase.svelte b/src/lib/components/workspace/Knowledge/KnowledgeBase.svelte index 9a125bd084..61095455b4 100644 --- a/src/lib/components/workspace/Knowledge/KnowledgeBase.svelte +++ b/src/lib/components/workspace/Knowledge/KnowledgeBase.svelte @@ -28,6 +28,7 @@ import { addFileToKnowledgeById, getKnowledgeById, + getPendingKnowledgeFiles, removeFileFromKnowledgeById, resetKnowledgeById, updateFileFromKnowledgeById, @@ -130,6 +131,8 @@ let pendingDeleteDirectoryId: string | null = null; let deleteDirectoryContents = true; + let pendingPollTimer: ReturnType | null = null; + const reset = () => { currentPage = 1; }; @@ -198,7 +201,42 @@ fileItemsTotal = res.total; directoryItems = res.directories ?? []; breadcrumbs = res.breadcrumbs ?? []; + + // Merge in-flight files not yet linked to the knowledge base + try { + const pendingFiles = await getPendingKnowledgeFiles(localStorage.token, knowledgeId); + if (pendingFiles && pendingFiles.length > 0) { + const existingIds = new Set(fileItems.map((f) => f.id)); + const newPending = pendingFiles + .filter((f) => !existingIds.has(f.id)) + .map((f) => ({ + ...f, + name: f.meta?.name ?? f.filename, + status: 'uploading' + })); + if (newPending.length > 0) { + fileItems = [...newPending, ...fileItems]; + + // Start polling for completion (if not already polling) + if (!pendingPollTimer) { + pendingPollTimer = setInterval(async () => { + try { + const still = await getPendingKnowledgeFiles(localStorage.token, knowledgeId); + if (!still || still.length === 0) { + clearInterval(pendingPollTimer); + pendingPollTimer = null; + init(); + } + } catch {} + }, 5000); + } + } + } + } catch (e) { + console.warn('Failed to fetch pending files:', e); + } } + return res; }; @@ -1079,6 +1117,7 @@ onDestroy(() => { clearTimeout(searchDebounceTimer); + if (pendingPollTimer) { clearInterval(pendingPollTimer); pendingPollTimer = null; } mediaQuery?.removeEventListener('change', handleMediaQuery); const dropZone = document.querySelector('body'); dropZone?.removeEventListener('dragover', onDragOver); @@ -1086,6 +1125,7 @@ dropZone?.removeEventListener('dragleave', onDragLeave); }); + const decodeString = (str: string) => { try { return decodeURIComponent(str);