fix: enforce ownership check on user-memory collection queries (#22109)

* fix: enforce ownership check on user-memory collection queries

fix: enforce ownership check on user-memory collection queries

Prevent authenticated users from querying other users' memory
collections via the /query/doc and /query/collection endpoints.
A new _validate_collection_access helper rejects requests for
user-memory-{UUID} collections where the UUID does not match
the requesting user. Admins bypass the check.

* Update retrieval.py

* Update retrieval.py
This commit is contained in:
Classic298
2026-03-01 21:03:37 +01:00
committed by GitHub
parent 93bab8d822
commit 2054ee0b73
+31 -21
View File
@@ -2491,6 +2491,34 @@ async def process_web_search(
)
def _validate_collection_access(collection_names: list[str], user) -> None:
"""
Prevent users from querying collections they don't own.
Enforces ownership on user-memory-* and file-* collections.
Admins bypass this check.
"""
if user.role == "admin":
return
for name in collection_names:
if name.startswith("user-memory-") and name != f"user-memory-{user.id}":
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail=ERROR_MESSAGES.ACCESS_PROHIBITED,
)
elif name.startswith("file-"):
file_id = name[len("file-"):]
if not has_access_to_file(
file_id=file_id,
access_type="read",
user=user,
):
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail=ERROR_MESSAGES.ACCESS_PROHIBITED,
)
class QueryDocForm(BaseModel):
collection_name: str
query: str
@@ -2506,6 +2534,8 @@ async def query_doc_handler(
form_data: QueryDocForm,
user=Depends(get_verified_user),
):
_validate_collection_access([form_data.collection_name], user)
try:
if request.app.state.config.ENABLE_RAG_HYBRID_SEARCH and (
form_data.hybrid is None or form_data.hybrid
@@ -2580,27 +2610,7 @@ async def query_collection_handler(
form_data: QueryCollectionsForm,
user=Depends(get_verified_user),
):
# Ownership validation: prevent accessing other users' private collections
if user.role != "admin":
for collection_name in form_data.collection_names:
if collection_name.startswith("user-memory-"):
owner_id = collection_name[len("user-memory-") :]
if owner_id != user.id:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail=ERROR_MESSAGES.ACCESS_PROHIBITED,
)
elif collection_name.startswith("file-"):
file_id = collection_name[len("file-") :]
if not has_access_to_file(
file_id=file_id,
access_type="read",
user=user,
):
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail=ERROR_MESSAGES.ACCESS_PROHIBITED,
)
_validate_collection_access(form_data.collection_names, user)
try:
if request.app.state.config.ENABLE_RAG_HYBRID_SEARCH and (