mirror of
https://github.com/open-webui/open-webui.git
synced 2026-06-14 03:30:25 +00:00
refac
This commit is contained in:
@@ -826,6 +826,41 @@ class KnowledgeTable:
|
||||
result = await db.execute(stmt)
|
||||
return [KnowledgeDirectoryModel.model_validate(d) for d in result.scalars().all()]
|
||||
|
||||
async def get_all_directories(
|
||||
self,
|
||||
knowledge_id: str,
|
||||
db: Optional[AsyncSession] = None,
|
||||
) -> list[KnowledgeDirectoryModel]:
|
||||
"""Get ALL directories for a KB (no parent filter). Used for tree building."""
|
||||
async with get_async_db_context(db) as db:
|
||||
stmt = (
|
||||
select(KnowledgeDirectory)
|
||||
.filter(KnowledgeDirectory.knowledge_id == knowledge_id)
|
||||
.order_by(KnowledgeDirectory.name.asc())
|
||||
)
|
||||
result = await db.execute(stmt)
|
||||
return [KnowledgeDirectoryModel.model_validate(d) for d in result.scalars().all()]
|
||||
|
||||
async def get_files_with_directory_ids(
|
||||
self,
|
||||
knowledge_id: str,
|
||||
db: Optional[AsyncSession] = None,
|
||||
) -> list[tuple[FileModel, Optional[str]]]:
|
||||
"""Get all files in a KB with their directory_id from KnowledgeFile."""
|
||||
try:
|
||||
async with get_async_db_context(db) as db:
|
||||
result = await db.execute(
|
||||
select(File, KnowledgeFile.directory_id)
|
||||
.join(KnowledgeFile, File.id == KnowledgeFile.file_id)
|
||||
.filter(KnowledgeFile.knowledge_id == knowledge_id)
|
||||
)
|
||||
return [
|
||||
(FileModel.model_validate(file), dir_id)
|
||||
for file, dir_id in result.all()
|
||||
]
|
||||
except Exception:
|
||||
return []
|
||||
|
||||
async def get_directory_by_id(
|
||||
self, directory_id: str, db: Optional[AsyncSession] = None
|
||||
) -> Optional[KnowledgeDirectoryModel]:
|
||||
|
||||
@@ -138,16 +138,109 @@ def _extract_numeric_flag(tokens: list[str]) -> tuple[Optional[int], list[str]]:
|
||||
return num, remaining
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# DIRECTORY TREE & PATH RESOLUTION
|
||||
# =============================================================================
|
||||
|
||||
|
||||
async def _build_directory_tree(knowledge_id: str) -> dict:
|
||||
"""Build an in-memory directory tree for a KB. Returns {dirs, files, path_to_dir_id, dir_id_to_path}."""
|
||||
from open_webui.models.knowledge import Knowledges
|
||||
|
||||
all_dirs = await Knowledges.get_all_directories(knowledge_id)
|
||||
files_with_dirs = await Knowledges.get_files_with_directory_ids(knowledge_id)
|
||||
|
||||
# Build dir_id -> dir info map
|
||||
dir_map = {}
|
||||
for d in all_dirs:
|
||||
dir_map[d.id] = {'name': d.name, 'parent_id': d.parent_id, 'id': d.id}
|
||||
|
||||
# Compute full path for each directory
|
||||
dir_id_to_path = {}
|
||||
def _get_dir_path(dir_id):
|
||||
if dir_id in dir_id_to_path:
|
||||
return dir_id_to_path[dir_id]
|
||||
d = dir_map.get(dir_id)
|
||||
if not d:
|
||||
return ''
|
||||
if d['parent_id'] and d['parent_id'] in dir_map:
|
||||
parent_path = _get_dir_path(d['parent_id'])
|
||||
path = f"{parent_path}/{d['name']}" if parent_path else d['name']
|
||||
else:
|
||||
path = d['name']
|
||||
dir_id_to_path[dir_id] = path
|
||||
return path
|
||||
|
||||
for d_id in dir_map:
|
||||
_get_dir_path(d_id)
|
||||
|
||||
path_to_dir_id = {v: k for k, v in dir_id_to_path.items()}
|
||||
|
||||
# Build file list with paths
|
||||
files = []
|
||||
for file_model, directory_id in files_with_dirs:
|
||||
if directory_id and directory_id in dir_id_to_path:
|
||||
file_path = f"{dir_id_to_path[directory_id]}/{file_model.filename}"
|
||||
else:
|
||||
file_path = file_model.filename
|
||||
files.append({
|
||||
'id': file_model.id, 'filename': file_model.filename,
|
||||
'path': file_path, 'directory_id': directory_id,
|
||||
'size': file_model.meta.get('size') if file_model.meta else None,
|
||||
'type': file_model.meta.get('content_type') if file_model.meta else None,
|
||||
'updated_at': file_model.updated_at,
|
||||
})
|
||||
|
||||
return {
|
||||
'dirs': dir_map,
|
||||
'files': files,
|
||||
'path_to_dir_id': path_to_dir_id,
|
||||
'dir_id_to_path': dir_id_to_path,
|
||||
}
|
||||
|
||||
|
||||
def _resolve_path(path: str, tree: dict) -> str | None:
|
||||
"""Resolve a directory path string to a dir_id. Returns None if not found."""
|
||||
path = path.strip('/')
|
||||
return tree['path_to_dir_id'].get(path)
|
||||
|
||||
|
||||
def _get_files_in_dir(tree: dict, dir_id: str | None) -> list[dict]:
|
||||
"""Get files directly in a directory (None = root)."""
|
||||
return [f for f in tree['files'] if f['directory_id'] == dir_id]
|
||||
|
||||
|
||||
def _get_subdirs(tree: dict, parent_id: str | None) -> list[dict]:
|
||||
"""Get immediate child directories."""
|
||||
return sorted(
|
||||
[d for d in tree['dirs'].values() if d['parent_id'] == parent_id],
|
||||
key=lambda d: d['name']
|
||||
)
|
||||
|
||||
|
||||
def _get_files_under_dir(tree: dict, dir_id: str) -> list[dict]:
|
||||
"""Get all files recursively under a directory."""
|
||||
# Collect this dir + all descendant dir IDs
|
||||
target_ids = {dir_id}
|
||||
changed = True
|
||||
while changed:
|
||||
changed = False
|
||||
for d in tree['dirs'].values():
|
||||
if d['parent_id'] in target_ids and d['id'] not in target_ids:
|
||||
target_ids.add(d['id'])
|
||||
changed = True
|
||||
return [f for f in tree['files'] if f['directory_id'] in target_ids]
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# FILE RESOLUTION & ACCESS CONTROL
|
||||
# =============================================================================
|
||||
|
||||
|
||||
async def _get_accessible_files(user: dict, model_knowledge: list[dict] | None,
|
||||
knowledge_id: str | None = None) -> list[dict]:
|
||||
"""Get all files the user can access, with KB metadata."""
|
||||
async def _get_accessible_kb_ids(user: dict, model_knowledge: list[dict] | None,
|
||||
knowledge_id: str | None = None) -> list[tuple[str, str]]:
|
||||
"""Get list of (kb_id, kb_name) the user can access."""
|
||||
from open_webui.models.access_grants import AccessGrants
|
||||
from open_webui.models.files import Files
|
||||
from open_webui.models.groups import Groups
|
||||
from open_webui.models.knowledge import Knowledges
|
||||
|
||||
@@ -155,102 +248,83 @@ async def _get_accessible_files(user: dict, model_knowledge: list[dict] | None,
|
||||
user_role = user.get('role', 'user')
|
||||
user_group_ids = [g.id for g in await Groups.get_groups_by_member_id(user_id)]
|
||||
|
||||
files = [] # list of {id, filename, size, type, updated_at, knowledge_id, knowledge_name, file_obj}
|
||||
async def _has_access(kb):
|
||||
return (user_role == 'admin' or kb.user_id == user_id
|
||||
or await AccessGrants.has_access(
|
||||
user_id=user_id, resource_type='knowledge',
|
||||
resource_id=kb.id, permission='read',
|
||||
user_group_ids=set(user_group_ids)))
|
||||
|
||||
result = []
|
||||
|
||||
if model_knowledge:
|
||||
attached_kb_ids = set()
|
||||
attached_file_ids = set()
|
||||
|
||||
for item in model_knowledge:
|
||||
t, i = item.get('type'), item.get('id')
|
||||
if t == 'collection':
|
||||
attached_kb_ids.add(i)
|
||||
elif t == 'file':
|
||||
attached_file_ids.add(i)
|
||||
|
||||
if item.get('type') == 'collection':
|
||||
attached_kb_ids.add(item.get('id'))
|
||||
if knowledge_id:
|
||||
if knowledge_id not in attached_kb_ids:
|
||||
return []
|
||||
attached_kb_ids = {knowledge_id}
|
||||
|
||||
for kb_id in attached_kb_ids:
|
||||
knowledge = await Knowledges.get_knowledge_by_id(kb_id)
|
||||
if not knowledge:
|
||||
continue
|
||||
if not (user_role == 'admin' or knowledge.user_id == user_id
|
||||
or await AccessGrants.has_access(
|
||||
user_id=user_id, resource_type='knowledge',
|
||||
resource_id=knowledge.id, permission='read',
|
||||
user_group_ids=set(user_group_ids))):
|
||||
continue
|
||||
kb = await Knowledges.get_knowledge_by_id(kb_id)
|
||||
if kb and await _has_access(kb):
|
||||
result.append((kb.id, kb.name))
|
||||
elif knowledge_id:
|
||||
kb = await Knowledges.get_knowledge_by_id(knowledge_id)
|
||||
if kb and await _has_access(kb):
|
||||
result.append((kb.id, kb.name))
|
||||
else:
|
||||
search = await Knowledges.search_knowledge_bases(
|
||||
user_id, filter={'query': '', 'user_id': user_id, 'group_ids': user_group_ids},
|
||||
skip=0, limit=50,
|
||||
)
|
||||
for kb in search.items:
|
||||
result.append((kb.id, kb.name))
|
||||
|
||||
kb_files = await Knowledges.get_files_by_id(kb_id)
|
||||
if kb_files:
|
||||
for f in kb_files:
|
||||
files.append({
|
||||
'id': f.id, 'filename': f.filename,
|
||||
'size': f.meta.get('size') if f.meta else None,
|
||||
'type': f.meta.get('content_type') if f.meta else None,
|
||||
'updated_at': f.updated_at,
|
||||
'knowledge_id': kb_id,
|
||||
'knowledge_name': knowledge.name,
|
||||
})
|
||||
return result
|
||||
|
||||
|
||||
async def _get_accessible_files(user: dict, model_knowledge: list[dict] | None,
|
||||
knowledge_id: str | None = None) -> list[dict]:
|
||||
"""Get all files the user can access, with KB metadata and directory paths."""
|
||||
from open_webui.models.files import Files
|
||||
|
||||
kb_ids = await _get_accessible_kb_ids(user, model_knowledge, knowledge_id)
|
||||
files = []
|
||||
|
||||
for kb_id, kb_name in kb_ids:
|
||||
tree = await _build_directory_tree(kb_id)
|
||||
for f in tree['files']:
|
||||
files.append({
|
||||
**f,
|
||||
'knowledge_id': kb_id,
|
||||
'knowledge_name': kb_name,
|
||||
})
|
||||
|
||||
# Also handle directly attached files (not in any KB)
|
||||
if model_knowledge:
|
||||
attached_file_ids = set()
|
||||
for item in model_knowledge:
|
||||
if item.get('type') == 'file':
|
||||
attached_file_ids.add(item.get('id'))
|
||||
for fid in attached_file_ids:
|
||||
f = await Files.get_file_by_id(fid)
|
||||
if f:
|
||||
files.append({
|
||||
'id': f.id, 'filename': f.filename,
|
||||
'path': f.filename, 'directory_id': None,
|
||||
'size': f.meta.get('size') if f.meta else None,
|
||||
'type': f.meta.get('content_type') if f.meta else None,
|
||||
'updated_at': f.updated_at,
|
||||
'knowledge_id': None, 'knowledge_name': None,
|
||||
})
|
||||
|
||||
elif knowledge_id:
|
||||
knowledge = await Knowledges.get_knowledge_by_id(knowledge_id)
|
||||
if not knowledge:
|
||||
return []
|
||||
if not (user_role == 'admin' or knowledge.user_id == user_id
|
||||
or await AccessGrants.has_access(
|
||||
user_id=user_id, resource_type='knowledge',
|
||||
resource_id=knowledge.id, permission='read',
|
||||
user_group_ids=set(user_group_ids))):
|
||||
return []
|
||||
|
||||
kb_files = await Knowledges.get_files_by_id(knowledge_id)
|
||||
if kb_files:
|
||||
for f in kb_files:
|
||||
files.append({
|
||||
'id': f.id, 'filename': f.filename,
|
||||
'size': f.meta.get('size') if f.meta else None,
|
||||
'type': f.meta.get('content_type') if f.meta else None,
|
||||
'updated_at': f.updated_at,
|
||||
'knowledge_id': knowledge_id,
|
||||
'knowledge_name': knowledge.name,
|
||||
})
|
||||
else:
|
||||
result = await Knowledges.search_knowledge_bases(
|
||||
user_id, filter={'query': '', 'user_id': user_id, 'group_ids': user_group_ids},
|
||||
skip=0, limit=50,
|
||||
)
|
||||
for kb in result.items:
|
||||
kb_files = await Knowledges.get_files_by_id(kb.id)
|
||||
if kb_files:
|
||||
for f in kb_files:
|
||||
files.append({
|
||||
'id': f.id, 'filename': f.filename,
|
||||
'size': f.meta.get('size') if f.meta else None,
|
||||
'type': f.meta.get('content_type') if f.meta else None,
|
||||
'updated_at': f.updated_at,
|
||||
'knowledge_id': kb.id, 'knowledge_name': kb.name,
|
||||
})
|
||||
|
||||
return files
|
||||
|
||||
|
||||
async def _resolve_file(ref: str, user: dict, model_knowledge: list[dict] | None) -> dict | None:
|
||||
"""Resolve a file reference (ID or filename) to a file info dict with content."""
|
||||
"""Resolve a file reference (ID, path, or filename) to a file info dict with content."""
|
||||
from open_webui.models.files import Files
|
||||
|
||||
# Get the set of files this user can access
|
||||
@@ -265,6 +339,21 @@ async def _resolve_file(ref: str, user: dict, model_knowledge: list[dict] | None
|
||||
return {'id': f.id, 'filename': f.filename, 'content': f.data.get('content', ''),
|
||||
'meta': f.meta, 'updated_at': f.updated_at, 'created_at': f.created_at}
|
||||
|
||||
# Try path match (e.g. "docs/api/auth.md")
|
||||
ref_clean = ref.strip('/')
|
||||
if '/' in ref_clean:
|
||||
path_matches = [fi for fi in accessible if fi.get('path') == ref_clean]
|
||||
if len(path_matches) == 1:
|
||||
f = await Files.get_file_by_id(path_matches[0]['id'])
|
||||
if f and f.data:
|
||||
return {'id': f.id, 'filename': f.filename, 'content': f.data.get('content', ''),
|
||||
'meta': f.meta, 'updated_at': f.updated_at, 'created_at': f.created_at,
|
||||
'knowledge_id': path_matches[0].get('knowledge_id'),
|
||||
'knowledge_name': path_matches[0].get('knowledge_name')}
|
||||
elif len(path_matches) > 1:
|
||||
return {'error': f'Ambiguous path "{ref}". Matches:\n' +
|
||||
'\n'.join(f" {m['id']} {m['path']} ({m.get('knowledge_name', 'direct')})" for m in path_matches)}
|
||||
|
||||
# Try filename match within accessible files
|
||||
matches = [fi for fi in accessible if fi['filename'] == ref]
|
||||
|
||||
@@ -276,8 +365,8 @@ async def _resolve_file(ref: str, user: dict, model_knowledge: list[dict] | None
|
||||
'knowledge_id': matches[0].get('knowledge_id'),
|
||||
'knowledge_name': matches[0].get('knowledge_name')}
|
||||
elif len(matches) > 1:
|
||||
return {'error': f'Ambiguous filename "{ref}". Matches:\n' +
|
||||
'\n'.join(f" {m['id']} {m['filename']} ({m.get('knowledge_name', 'direct')})" for m in matches)}
|
||||
return {'error': f'Ambiguous filename "{ref}". Use full path to disambiguate:\n' +
|
||||
'\n'.join(f" {m['id']} {m.get('path', m['filename'])} ({m.get('knowledge_name', 'direct')})" for m in matches)}
|
||||
|
||||
return None
|
||||
|
||||
@@ -298,42 +387,84 @@ async def _get_file_content(file_id: str) -> str | None:
|
||||
|
||||
async def _kb_ls(args: list[str], flags: set[str], user: dict,
|
||||
model_knowledge: list[dict] | None) -> str:
|
||||
"""List files with metadata."""
|
||||
knowledge_id = args[0] if args else None
|
||||
files = await _get_accessible_files(user, model_knowledge, knowledge_id)
|
||||
"""List files and directories. Supports: ls, ls <path>, ls -a (flat)."""
|
||||
flat_mode = 'a' in flags
|
||||
path_arg = args[0] if args else None
|
||||
|
||||
if not files:
|
||||
return 'No files found.'
|
||||
# Determine which KBs are accessible
|
||||
kb_ids = await _get_accessible_kb_ids(user, model_knowledge, knowledge_id=None)
|
||||
|
||||
# Group by KB
|
||||
by_kb: dict[str, list[dict]] = {}
|
||||
for f in files:
|
||||
key = f.get('knowledge_name') or 'Direct Files'
|
||||
by_kb.setdefault(key, []).append(f)
|
||||
# If path_arg looks like a KB ID, scope to that KB
|
||||
target_kb_id = None
|
||||
dir_path = None
|
||||
if path_arg:
|
||||
for kb_id, kb_name in kb_ids:
|
||||
if kb_id == path_arg:
|
||||
target_kb_id = kb_id
|
||||
break
|
||||
if not target_kb_id:
|
||||
dir_path = path_arg.strip('/')
|
||||
|
||||
if target_kb_id:
|
||||
kb_ids = [(kid, kn) for kid, kn in kb_ids if kid == target_kb_id]
|
||||
|
||||
if not kb_ids:
|
||||
return 'No knowledge bases found.'
|
||||
|
||||
lines = []
|
||||
for kb_name, kb_files in by_kb.items():
|
||||
kb_id = kb_files[0].get('knowledge_id', '')
|
||||
if kb_id:
|
||||
lines.append(f'Knowledge Base: {kb_name} ({kb_id})')
|
||||
else:
|
||||
lines.append(f'{kb_name}')
|
||||
for kb_id, kb_name in kb_ids:
|
||||
tree = await _build_directory_tree(kb_id)
|
||||
lines.append(f'Knowledge Base: {kb_name} ({kb_id})')
|
||||
|
||||
for f in kb_files:
|
||||
size_str = f'{f["size"]:,} bytes' if f.get('size') else 'unknown size'
|
||||
type_str = f.get('type') or 'unknown type'
|
||||
date_str = ''
|
||||
if f.get('updated_at'):
|
||||
from datetime import datetime, timezone
|
||||
dt = datetime.fromtimestamp(f['updated_at'], tz=timezone.utc)
|
||||
date_str = dt.strftime('%Y-%m-%d')
|
||||
lines.append(f' {f["id"]} {f["filename"]} {size_str} {type_str} {date_str}')
|
||||
if flat_mode:
|
||||
# Flat: show all files with full paths
|
||||
for f in tree['files']:
|
||||
lines.append(f' {f["id"]} {f["path"]} {_fmt_size(f)} {_fmt_date(f)}')
|
||||
lines.append('')
|
||||
continue
|
||||
|
||||
# Resolve target directory
|
||||
target_dir_id = None
|
||||
if dir_path:
|
||||
target_dir_id = _resolve_path(dir_path, tree)
|
||||
if target_dir_id is None:
|
||||
lines.append(f' Directory not found: {dir_path}')
|
||||
lines.append('')
|
||||
continue
|
||||
lines.append(f' Path: {dir_path}/')
|
||||
|
||||
# Show subdirectories
|
||||
subdirs = _get_subdirs(tree, target_dir_id)
|
||||
for d in subdirs:
|
||||
# Count files recursively under this dir
|
||||
child_files = _get_files_under_dir(tree, d['id'])
|
||||
count_str = f'{len(child_files)} file{"s" if len(child_files) != 1 else ""}'
|
||||
lines.append(f' 📁 {d["name"]}/ ({count_str})')
|
||||
|
||||
# Show files at this level
|
||||
dir_files = _get_files_in_dir(tree, target_dir_id)
|
||||
for f in dir_files:
|
||||
lines.append(f' {f["id"]} {f["filename"]} {_fmt_size(f)} {_fmt_date(f)}')
|
||||
|
||||
if not subdirs and not dir_files:
|
||||
lines.append(' (empty)')
|
||||
lines.append('')
|
||||
|
||||
return '\n'.join(lines).rstrip()
|
||||
|
||||
|
||||
def _fmt_size(f: dict) -> str:
|
||||
return f'{f["size"]:,} bytes' if f.get('size') else ''
|
||||
|
||||
|
||||
def _fmt_date(f: dict) -> str:
|
||||
if f.get('updated_at'):
|
||||
from datetime import datetime, timezone
|
||||
dt = datetime.fromtimestamp(f['updated_at'], tz=timezone.utc)
|
||||
return dt.strftime('%Y-%m-%d')
|
||||
return ''
|
||||
|
||||
|
||||
async def _kb_cat(args: list[str], flags: set[str], user: dict,
|
||||
model_knowledge: list[dict] | None) -> str:
|
||||
"""Read file content. Use -n for line numbers."""
|
||||
@@ -433,10 +564,13 @@ async def _kb_grep(args: list[str], flags: set[str], user: dict,
|
||||
pattern = args[0]
|
||||
file_ref = None
|
||||
ext_filter = None
|
||||
dir_scope = None
|
||||
|
||||
for arg in args[1:]:
|
||||
if '*' in arg or arg.startswith('.'):
|
||||
ext_filter = arg.lstrip('*').lstrip('.')
|
||||
elif arg.endswith('/'):
|
||||
dir_scope = arg.strip('/')
|
||||
else:
|
||||
file_ref = arg
|
||||
|
||||
@@ -451,45 +585,53 @@ async def _kb_grep(args: list[str], flags: set[str], user: dict,
|
||||
|
||||
# Grep on piped input
|
||||
if piped_input is not None:
|
||||
lines = piped_input.split('\n')
|
||||
lines = piped_input.split('\\n')
|
||||
matched = []
|
||||
for i, line in enumerate(lines, 1):
|
||||
if _matches(line):
|
||||
matched.append(f'{i}: {line}')
|
||||
return '\n'.join(matched) if matched else f'No matches for "{pattern}"'
|
||||
return '\\n'.join(matched) if matched else f'No matches for "{pattern}"'
|
||||
|
||||
# Single file grep
|
||||
if file_ref:
|
||||
if file_ref and not dir_scope:
|
||||
resolved = await _resolve_file(file_ref, user, model_knowledge)
|
||||
if not resolved:
|
||||
return f'File not found: {file_ref}'
|
||||
if 'error' in resolved:
|
||||
# Maybe it's a directory path without trailing /
|
||||
dir_scope = file_ref
|
||||
elif 'error' in resolved:
|
||||
return resolved['error']
|
||||
else:
|
||||
lines = resolved['content'].split('\\n')
|
||||
matched = []
|
||||
for i, line in enumerate(lines, 1):
|
||||
if _matches(line):
|
||||
matched.append(f'{i}: {line}')
|
||||
|
||||
lines = resolved['content'].split('\n')
|
||||
matched = []
|
||||
for i, line in enumerate(lines, 1):
|
||||
if _matches(line):
|
||||
matched.append(f'{i}: {line}')
|
||||
if count_only:
|
||||
return f'{resolved["id"]} {resolved["filename"]}: {len(matched)}'
|
||||
if filenames_only:
|
||||
return f'{resolved["id"]} {resolved["filename"]}' if matched else f'No matches for "{pattern}"'
|
||||
|
||||
if count_only:
|
||||
return f'{resolved["id"]} {resolved["filename"]}: {len(matched)}'
|
||||
if filenames_only:
|
||||
return f'{resolved["id"]} {resolved["filename"]}' if matched else f'No matches for "{pattern}"'
|
||||
if not matched:
|
||||
return f'No matches for "{pattern}" in {resolved["filename"]}'
|
||||
return '\\n'.join(matched)
|
||||
|
||||
if not matched:
|
||||
return f'No matches for "{pattern}" in {resolved["filename"]}'
|
||||
return '\n'.join(matched)
|
||||
|
||||
# Cross-file grep
|
||||
# Cross-file grep (optionally scoped to directory)
|
||||
accessible = await _get_accessible_files(user, model_knowledge)
|
||||
|
||||
if dir_scope:
|
||||
# Scope to files under this directory path
|
||||
accessible = [f for f in accessible if f.get('path', '').startswith(dir_scope + '/') or
|
||||
(f.get('directory_id') and f.get('path', '').startswith(dir_scope))]
|
||||
if not accessible:
|
||||
return f'No files found under "{dir_scope}/"'
|
||||
|
||||
if ext_filter:
|
||||
accessible = [f for f in accessible if f['filename'].endswith(f'.{ext_filter}')]
|
||||
|
||||
if len(accessible) > MAX_GREP_FILES:
|
||||
return (f'Too many files ({len(accessible)}). '
|
||||
f'Scope your search: grep "{pattern}" <knowledge_id> or grep "{pattern}" *.py')
|
||||
f'Scope your search: grep "{pattern}" docs/ or grep "{pattern}" *.py')
|
||||
|
||||
from open_webui.models.files import Files
|
||||
|
||||
@@ -522,20 +664,20 @@ async def _kb_grep(args: list[str], flags: set[str], user: dict,
|
||||
for line_num, line_text in file_matches:
|
||||
if len(results) < MAX_GREP_MATCHES:
|
||||
results.append(
|
||||
f'{file_info["id"]} {file_info["filename"]}:{line_num}: {line_text.rstrip()}'
|
||||
f'{file_info["id"]} {file_info.get("path", file_info["filename"])}:{line_num}: {line_text.rstrip()}'
|
||||
)
|
||||
|
||||
if count_only:
|
||||
if not file_match_counts:
|
||||
return f'No matches for "{pattern}"'
|
||||
lines = [f'{fi["id"]} {fi["filename"]}: {cnt}' for fi, cnt in file_match_counts]
|
||||
lines = [f'{fi["id"]} {fi.get("path", fi["filename"])}: {cnt}' for fi, cnt in file_match_counts]
|
||||
lines.append(f'Total: {total_matches} matches in {len(file_match_counts)} files')
|
||||
return '\n'.join(lines)
|
||||
|
||||
if filenames_only:
|
||||
if not files_with_matches:
|
||||
return f'No matches for "{pattern}"'
|
||||
return '\n'.join(f'{fi["id"]} {fi["filename"]}' for fi in files_with_matches)
|
||||
return '\n'.join(f'{fi["id"]} {fi.get("path", fi["filename"])}' for fi in files_with_matches)
|
||||
|
||||
if not results:
|
||||
return f'No matches for "{pattern}" across {len(accessible)} files'
|
||||
@@ -548,24 +690,36 @@ async def _kb_grep(args: list[str], flags: set[str], user: dict,
|
||||
|
||||
async def _kb_find(args: list[str], flags: set[str], user: dict,
|
||||
model_knowledge: list[dict] | None) -> str:
|
||||
"""Find files by name/glob pattern."""
|
||||
"""Find files by name/glob pattern, optionally scoped to a directory."""
|
||||
if not args:
|
||||
return 'Usage: find "*.md" or find "config*"'
|
||||
return 'Usage: find "*.md" or find docs/ "*.md"'
|
||||
|
||||
import fnmatch
|
||||
|
||||
# If two args and first looks like a dir scope
|
||||
dir_scope = None
|
||||
if len(args) >= 2 and ('/' in args[0] or not ('*' in args[0] or '?' in args[0])):
|
||||
dir_scope = args[0].strip('/')
|
||||
pattern = args[1]
|
||||
else:
|
||||
pattern = args[0]
|
||||
|
||||
pattern = args[0]
|
||||
accessible = await _get_accessible_files(user, model_knowledge)
|
||||
|
||||
# Simple glob matching
|
||||
import fnmatch
|
||||
if dir_scope:
|
||||
accessible = [f for f in accessible if f.get('path', '').startswith(dir_scope + '/') or
|
||||
f.get('path', '').startswith(dir_scope)]
|
||||
|
||||
matched = [f for f in accessible if fnmatch.fnmatch(f['filename'], pattern)]
|
||||
|
||||
if not matched:
|
||||
return f'No files matching "{pattern}"'
|
||||
scope_str = f' under "{dir_scope}/"' if dir_scope else ''
|
||||
return f'No files matching "{pattern}"{scope_str}'
|
||||
|
||||
lines = []
|
||||
for f in matched:
|
||||
kb_info = f' ({f["knowledge_name"]})' if f.get('knowledge_name') else ''
|
||||
lines.append(f'{f["id"]} {f["filename"]}{kb_info}')
|
||||
lines.append(f'{f["id"]} {f.get("path", f["filename"])}{kb_info}')
|
||||
return '\n'.join(lines)
|
||||
|
||||
|
||||
@@ -700,6 +854,59 @@ async def _kb_sed(args: list[str], flags: set[str], user: dict,
|
||||
# =============================================================================
|
||||
|
||||
|
||||
async def _kb_tree(args: list[str], flags: set[str], user: dict,
|
||||
model_knowledge: list[dict] | None) -> str:
|
||||
"""Show directory tree structure."""
|
||||
kb_ids = await _get_accessible_kb_ids(user, model_knowledge)
|
||||
if not kb_ids:
|
||||
return 'No knowledge bases found.'
|
||||
|
||||
dir_scope = args[0].strip('/') if args else None
|
||||
output = []
|
||||
|
||||
for kb_id, kb_name in kb_ids:
|
||||
tree = await _build_directory_tree(kb_id)
|
||||
output.append(f'Knowledge Base: {kb_name} ({kb_id})')
|
||||
|
||||
# Find root to start from
|
||||
root_dir_id = None
|
||||
if dir_scope:
|
||||
root_dir_id = _resolve_path(dir_scope, tree)
|
||||
if root_dir_id is None:
|
||||
output.append(f' Directory not found: {dir_scope}')
|
||||
output.append('')
|
||||
continue
|
||||
output.append(f' {dir_scope}/')
|
||||
|
||||
def _render_tree(parent_id, prefix=' '):
|
||||
items = []
|
||||
subdirs = _get_subdirs(tree, parent_id)
|
||||
files = _get_files_in_dir(tree, parent_id)
|
||||
entries = [('dir', d) for d in subdirs] + [('file', f) for f in files]
|
||||
|
||||
for idx, (etype, entry) in enumerate(entries):
|
||||
is_last = idx == len(entries) - 1
|
||||
connector = '└── ' if is_last else '├── '
|
||||
child_prefix = prefix + (' ' if is_last else '│ ')
|
||||
|
||||
if etype == 'dir':
|
||||
items.append(f'{prefix}{connector}📁 {entry["name"]}/')
|
||||
items.extend(_render_tree(entry['id'], child_prefix))
|
||||
else:
|
||||
items.append(f'{prefix}{connector}{entry["filename"]}')
|
||||
return items
|
||||
|
||||
output.extend(_render_tree(root_dir_id))
|
||||
|
||||
# Summary
|
||||
total_dirs = len(tree['dirs'])
|
||||
total_files = len(tree['files'])
|
||||
output.append(f'\n {total_dirs} directories, {total_files} files')
|
||||
output.append('')
|
||||
|
||||
return '\n'.join(output).rstrip()
|
||||
|
||||
|
||||
COMMAND_MAP = {
|
||||
'ls': _kb_ls,
|
||||
'cat': _kb_cat,
|
||||
@@ -710,6 +917,7 @@ COMMAND_MAP = {
|
||||
'wc': _kb_wc,
|
||||
'stat': _kb_stat,
|
||||
'sed': _kb_sed,
|
||||
'tree': _kb_tree,
|
||||
}
|
||||
|
||||
|
||||
@@ -755,7 +963,11 @@ async def kb_exec(
|
||||
Run a filesystem command against the knowledge base.
|
||||
|
||||
Commands:
|
||||
ls — list all files
|
||||
ls — list root files and directories
|
||||
ls docs/ — list contents of a directory
|
||||
ls -a — flat list of all files with full paths
|
||||
tree — recursive directory tree view
|
||||
tree docs/ — subtree from a directory
|
||||
cat -n <file> — read file with line numbers
|
||||
head -20 <file> — first 20 lines
|
||||
tail -10 <file> — last 10 lines
|
||||
@@ -764,14 +976,15 @@ async def kb_exec(
|
||||
grep -i "text" — case-insensitive
|
||||
grep -l "text" — filenames-only
|
||||
grep -c "text" — match counts
|
||||
grep "error|warn" <file> — regex alternation (auto-detected)
|
||||
grep "text" docs/ — search within a directory
|
||||
grep "text" *.py — filter by extension
|
||||
find "*.md" — find files by glob
|
||||
find docs/ "*.md" — find within a directory
|
||||
wc <file> — line/word/char counts
|
||||
stat <file> — file metadata
|
||||
|
||||
Pipes: grep "auth" | head -5
|
||||
Files: reference by filename or file ID from ls
|
||||
Files: reference by path (docs/api/auth.md), filename, or file ID
|
||||
|
||||
:param command: A filesystem command string
|
||||
:return: Command output as text
|
||||
|
||||
Reference in New Issue
Block a user