mirror of
https://github.com/open-webui/open-webui.git
synced 2026-06-14 03:30:25 +00:00
refac
This commit is contained in:
@@ -389,6 +389,49 @@ class FilesTable:
|
||||
except Exception:
|
||||
return None
|
||||
|
||||
async def get_pending_files_for_knowledge(
|
||||
self, knowledge_id: str, db: AsyncSession | None = None
|
||||
) -> list[FileModelResponse]:
|
||||
"""Return files still being processed for this knowledge base.
|
||||
|
||||
These are files uploaded with ``meta.data.knowledge_id`` set, whose
|
||||
``data.status`` is still ``pending`` or ``processing``, and which
|
||||
have not yet been added to the ``knowledge_file`` join table.
|
||||
|
||||
The JSON subscript syntax (``Column['key']['subkey'].as_string()``)
|
||||
is supported by both SQLite (``json_extract``) and PostgreSQL
|
||||
(``->>``/``->``).
|
||||
"""
|
||||
async with get_async_db_context(db) as db:
|
||||
try:
|
||||
# Lazy import to avoid circular dependency
|
||||
from open_webui.models.knowledge import KnowledgeFile
|
||||
|
||||
# Subquery: file IDs already linked to this knowledge base
|
||||
linked_ids = (
|
||||
select(KnowledgeFile.file_id)
|
||||
.filter(KnowledgeFile.knowledge_id == knowledge_id)
|
||||
.correlate(None)
|
||||
)
|
||||
|
||||
stmt = (
|
||||
select(File)
|
||||
.filter(
|
||||
File.meta['data']['knowledge_id'].as_string() == knowledge_id,
|
||||
File.data['status'].as_string().in_(['pending', 'processing']),
|
||||
File.id.notin_(linked_ids),
|
||||
)
|
||||
.order_by(File.created_at.desc())
|
||||
)
|
||||
result = await db.execute(stmt)
|
||||
return [
|
||||
FileModelResponse.model_validate(f, from_attributes=True)
|
||||
for f in result.scalars().all()
|
||||
]
|
||||
except Exception as e:
|
||||
log.warning(f'Error fetching pending files for knowledge {knowledge_id}: {e}')
|
||||
return []
|
||||
|
||||
async def delete_file_by_id(self, id: str, db: AsyncSession | None = None) -> bool:
|
||||
async with get_async_db_context(db) as db:
|
||||
try:
|
||||
|
||||
@@ -13,7 +13,7 @@ from open_webui.config import BYPASS_ADMIN_ACCESS_CONTROL
|
||||
from open_webui.constants import ERROR_MESSAGES
|
||||
from open_webui.internal.db import get_async_session
|
||||
from open_webui.models.access_grants import AccessGrants
|
||||
from open_webui.models.files import FileMetadataResponse, FileModel, Files
|
||||
from open_webui.models.files import FileMetadataResponse, FileModel, FileModelResponse, Files
|
||||
from open_webui.models.groups import Groups
|
||||
from open_webui.models.knowledge import (
|
||||
KnowledgeDirectoryForm,
|
||||
@@ -552,6 +552,71 @@ async def update_knowledge_access_by_id(
|
||||
)
|
||||
|
||||
|
||||
############################
|
||||
# GetPendingKnowledgeFiles
|
||||
############################
|
||||
|
||||
|
||||
@router.get('/{id}/files/pending')
|
||||
async def get_pending_knowledge_files(
|
||||
id: str,
|
||||
stream: bool = Query(False),
|
||||
user=Depends(get_verified_user),
|
||||
db: AsyncSession = Depends(get_async_session),
|
||||
):
|
||||
"""Return files that are being processed for this knowledge base but not yet linked.
|
||||
|
||||
After a file is uploaded with ``knowledge_id`` in its metadata, the backend
|
||||
processes it in a background task before linking it to the ``knowledge_file``
|
||||
join table. During this window the file is invisible to the normal file
|
||||
list endpoint. This endpoint exposes those in-flight files so the frontend
|
||||
can show them with a processing indicator even after a page reload.
|
||||
|
||||
When ``stream=true``, returns an SSE stream that polls every 3 seconds
|
||||
and emits the current pending file list. Closes when no files remain.
|
||||
"""
|
||||
knowledge = await Knowledges.get_knowledge_by_id(id=id, db=db)
|
||||
if not knowledge:
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_404_NOT_FOUND,
|
||||
detail=ERROR_MESSAGES.NOT_FOUND,
|
||||
)
|
||||
|
||||
if not (
|
||||
user.role == 'admin'
|
||||
or knowledge.user_id == user.id
|
||||
or await AccessGrants.has_access(
|
||||
user_id=user.id,
|
||||
resource_type='knowledge',
|
||||
resource_id=knowledge.id,
|
||||
permission='read',
|
||||
db=db,
|
||||
)
|
||||
):
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_400_BAD_REQUEST,
|
||||
detail=ERROR_MESSAGES.ACCESS_PROHIBITED,
|
||||
)
|
||||
|
||||
if not stream:
|
||||
return await Files.get_pending_files_for_knowledge(id, db=db)
|
||||
|
||||
async def event_stream(knowledge_id: str):
|
||||
MAX_POLL_DURATION = 3600 # 1 hour max
|
||||
for _ in range(MAX_POLL_DURATION // 3):
|
||||
pending = await Files.get_pending_files_for_knowledge(knowledge_id)
|
||||
data = [f.model_dump() for f in pending]
|
||||
yield f'data: {json.dumps(data)}\n\n'
|
||||
if len(pending) == 0:
|
||||
break
|
||||
await asyncio.sleep(3)
|
||||
|
||||
return StreamingResponse(
|
||||
event_stream(id),
|
||||
media_type='text/event-stream',
|
||||
)
|
||||
|
||||
|
||||
############################
|
||||
# GetKnowledgeFilesById
|
||||
############################
|
||||
|
||||
@@ -253,6 +253,50 @@ export const searchKnowledgeFilesById = async (
|
||||
return res;
|
||||
};
|
||||
|
||||
export const getPendingKnowledgeFiles = async (token: string, id: string) => {
|
||||
let error = null;
|
||||
|
||||
const res = await fetch(`${WEBUI_API_BASE_URL}/knowledge/${id}/files/pending`, {
|
||||
method: 'GET',
|
||||
headers: {
|
||||
Accept: 'application/json',
|
||||
'Content-Type': 'application/json',
|
||||
authorization: `Bearer ${token}`
|
||||
}
|
||||
})
|
||||
.then(async (res) => {
|
||||
if (!res.ok) throw await res.json();
|
||||
return res.json();
|
||||
})
|
||||
.catch((err) => {
|
||||
error = err.detail;
|
||||
console.error(err);
|
||||
return [];
|
||||
});
|
||||
|
||||
if (error) {
|
||||
throw error;
|
||||
}
|
||||
|
||||
return res;
|
||||
};
|
||||
|
||||
export const streamPendingKnowledgeFiles = async (token: string, id: string) => {
|
||||
const res = await fetch(`${WEBUI_API_BASE_URL}/knowledge/${id}/files/pending?stream=true`, {
|
||||
method: 'GET',
|
||||
headers: {
|
||||
Accept: 'text/event-stream',
|
||||
authorization: `Bearer ${token}`
|
||||
}
|
||||
});
|
||||
|
||||
if (!res.ok) {
|
||||
throw new Error('Failed to stream pending files');
|
||||
}
|
||||
|
||||
return res;
|
||||
};
|
||||
|
||||
type KnowledgeUpdateForm = {
|
||||
name?: string;
|
||||
description?: string;
|
||||
|
||||
@@ -28,6 +28,7 @@
|
||||
import {
|
||||
addFileToKnowledgeById,
|
||||
getKnowledgeById,
|
||||
getPendingKnowledgeFiles,
|
||||
removeFileFromKnowledgeById,
|
||||
resetKnowledgeById,
|
||||
updateFileFromKnowledgeById,
|
||||
@@ -130,6 +131,8 @@
|
||||
let pendingDeleteDirectoryId: string | null = null;
|
||||
let deleteDirectoryContents = true;
|
||||
|
||||
let pendingPollTimer: ReturnType<typeof setInterval> | null = null;
|
||||
|
||||
const reset = () => {
|
||||
currentPage = 1;
|
||||
};
|
||||
@@ -198,7 +201,42 @@
|
||||
fileItemsTotal = res.total;
|
||||
directoryItems = res.directories ?? [];
|
||||
breadcrumbs = res.breadcrumbs ?? [];
|
||||
|
||||
// Merge in-flight files not yet linked to the knowledge base
|
||||
try {
|
||||
const pendingFiles = await getPendingKnowledgeFiles(localStorage.token, knowledgeId);
|
||||
if (pendingFiles && pendingFiles.length > 0) {
|
||||
const existingIds = new Set(fileItems.map((f) => f.id));
|
||||
const newPending = pendingFiles
|
||||
.filter((f) => !existingIds.has(f.id))
|
||||
.map((f) => ({
|
||||
...f,
|
||||
name: f.meta?.name ?? f.filename,
|
||||
status: 'uploading'
|
||||
}));
|
||||
if (newPending.length > 0) {
|
||||
fileItems = [...newPending, ...fileItems];
|
||||
|
||||
// Start polling for completion (if not already polling)
|
||||
if (!pendingPollTimer) {
|
||||
pendingPollTimer = setInterval(async () => {
|
||||
try {
|
||||
const still = await getPendingKnowledgeFiles(localStorage.token, knowledgeId);
|
||||
if (!still || still.length === 0) {
|
||||
clearInterval(pendingPollTimer);
|
||||
pendingPollTimer = null;
|
||||
init();
|
||||
}
|
||||
} catch {}
|
||||
}, 5000);
|
||||
}
|
||||
}
|
||||
}
|
||||
} catch (e) {
|
||||
console.warn('Failed to fetch pending files:', e);
|
||||
}
|
||||
}
|
||||
|
||||
return res;
|
||||
};
|
||||
|
||||
@@ -1079,6 +1117,7 @@
|
||||
|
||||
onDestroy(() => {
|
||||
clearTimeout(searchDebounceTimer);
|
||||
if (pendingPollTimer) { clearInterval(pendingPollTimer); pendingPollTimer = null; }
|
||||
mediaQuery?.removeEventListener('change', handleMediaQuery);
|
||||
const dropZone = document.querySelector('body');
|
||||
dropZone?.removeEventListener('dragover', onDragOver);
|
||||
@@ -1086,6 +1125,7 @@
|
||||
dropZone?.removeEventListener('dragleave', onDragLeave);
|
||||
});
|
||||
|
||||
|
||||
const decodeString = (str: string) => {
|
||||
try {
|
||||
return decodeURIComponent(str);
|
||||
|
||||
Reference in New Issue
Block a user