diff --git a/backend/open_webui/retrieval/vector/dbs/valkey.py b/backend/open_webui/retrieval/vector/dbs/valkey.py index dbac8e016c..987993dc2b 100644 --- a/backend/open_webui/retrieval/vector/dbs/valkey.py +++ b/backend/open_webui/retrieval/vector/dbs/valkey.py @@ -8,29 +8,6 @@ import re import struct from urllib.parse import urlparse -from glide_sync import ( - Batch, - DataType, - DistanceMetricType, - FtCreateOptions, - FtSearchLimit, - FtSearchOptions, - GlideClient, - GlideClientConfiguration, - NodeAddress, - RequestError, - ReturnField, - TagField, - TextField, - VectorAlgorithm, - VectorField, - VectorFieldAttributesFlat, - VectorFieldAttributesHnsw, - VectorType, -) -from glide_sync import ( - ft as glide_ft, -) from open_webui.config import ( VALKEY_COLLECTION_PREFIX, VALKEY_DISTANCE_METRIC, @@ -50,6 +27,61 @@ from open_webui.retrieval.vector.utils import process_metadata log = logging.getLogger(__name__) + +def _import_glide(): + """Lazily import glide_sync so the module can be loaded without valkey-glide-sync installed.""" + try: + from glide_sync import ( + Batch, + DataType, + DistanceMetricType, + FtCreateOptions, + FtSearchLimit, + FtSearchOptions, + GlideClient, + GlideClientConfiguration, + NodeAddress, + RequestError, + ReturnField, + TagField, + TextField, + VectorAlgorithm, + VectorField, + VectorFieldAttributesFlat, + VectorFieldAttributesHnsw, + VectorType, + ) + from glide_sync import ( + ft as glide_ft, + ) + except ImportError as e: + raise ImportError( + 'valkey-glide-sync is required when VECTOR_DB=valkey. ' + 'Install it with: pip install valkey-glide-sync==2.3.1' + ) from e + return { + 'Batch': Batch, + 'DataType': DataType, + 'DistanceMetricType': DistanceMetricType, + 'FtCreateOptions': FtCreateOptions, + 'FtSearchLimit': FtSearchLimit, + 'FtSearchOptions': FtSearchOptions, + 'GlideClient': GlideClient, + 'GlideClientConfiguration': GlideClientConfiguration, + 'NodeAddress': NodeAddress, + 'RequestError': RequestError, + 'ReturnField': ReturnField, + 'TagField': TagField, + 'TextField': TextField, + 'VectorAlgorithm': VectorAlgorithm, + 'VectorField': VectorField, + 'VectorFieldAttributesFlat': VectorFieldAttributesFlat, + 'VectorFieldAttributesHnsw': VectorFieldAttributesHnsw, + 'VectorType': VectorType, + 'glide_ft': glide_ft, + } + + # valkey-search 1.2.0 requires Valkey core 9.0.1+ per upstream release notes. # Unlike RediSearch (dialects 1-4), valkey-search only implements DIALECT 2 — GLIDE's # FtSearchOptions doesn't expose a dialect parameter because it's always dialect 2. @@ -62,11 +94,6 @@ _NEVER_MATCH_SENTINEL = '__open_webui_valkey_never_match__' # Compile once at module load — includes `?` which is a single-char wildcard in TAG queries. _TAG_SPECIAL_RE = re.compile(r'([,.<>{}\[\]"\':;!@#$%^&*()\-+=~?\\/| \t\n\r])') -_DISTANCE_METRIC_MAP = { - 'COSINE': DistanceMetricType.COSINE, - 'L2': DistanceMetricType.L2, - 'IP': DistanceMetricType.IP, -} _SAFE_FIELD_RE = re.compile(r'^[a-zA-Z_][a-zA-Z0-9_]*$') @@ -130,6 +157,9 @@ class ValkeyClient(VectorDBBase): 'Set it to your Valkey server URL (e.g., valkey://localhost:6379).' ) + # Lazily import glide_sync — only needed when this backend is actually used. + self._g = _import_glide() + # Validate distance metric at init — invalid values pass through to FT.CREATE # and fail with a cryptic server error. metric = VALKEY_DISTANCE_METRIC.upper() @@ -139,6 +169,13 @@ class ValkeyClient(VectorDBBase): f'Must be one of: {", ".join(sorted(_VALID_DISTANCE_METRICS))}.' ) + DistanceMetricType = self._g['DistanceMetricType'] + self._distance_metric_map = { + 'COSINE': DistanceMetricType.COSINE, + 'L2': DistanceMetricType.L2, + 'IP': DistanceMetricType.IP, + } + self.collection_prefix = VALKEY_COLLECTION_PREFIX self.index_type = VALKEY_INDEX_TYPE self.distance_metric = metric @@ -148,6 +185,10 @@ class ValkeyClient(VectorDBBase): port = parsed.port or 6379 db = int(parsed.path.lstrip('/') or 0) + GlideClientConfiguration = self._g['GlideClientConfiguration'] + NodeAddress = self._g['NodeAddress'] + GlideClient = self._g['GlideClient'] + config = GlideClientConfiguration( addresses=[NodeAddress(host=host, port=port)], database_id=db if db else None, @@ -155,7 +196,7 @@ class ValkeyClient(VectorDBBase): client_name="open_webui_vector_store_client", ) try: - self.client: GlideClient = GlideClient.create(config) + self.client = GlideClient.create(config) except Exception as e: raise ConnectionError(f'Failed to connect to Valkey at {host}:{port}: {e}') from e @@ -168,7 +209,7 @@ class ValkeyClient(VectorDBBase): client_name="open_webui_vector_store_batch_client", ) try: - self.batch_client: GlideClient = GlideClient.create(batch_config) + self.batch_client = GlideClient.create(batch_config) except Exception as e: raise ConnectionError(f'Failed to create batch write client for Valkey at {host}:{port}: {e}') from e @@ -304,50 +345,51 @@ class ValkeyClient(VectorDBBase): def _create_index(self, collection_name: str, dimension: int) -> None: """Create an FT index for a collection with the given vector dimension.""" + g = self._g index_name = self._index_name(collection_name) prefix = self._key_prefix(collection_name) - distance_metric = _DISTANCE_METRIC_MAP[self.distance_metric] + distance_metric = self._distance_metric_map[self.distance_metric] if self.index_type == 'HNSW': - vector_attrs = VectorFieldAttributesHnsw( + vector_attrs = g['VectorFieldAttributesHnsw']( dimensions=dimension, distance_metric=distance_metric, - type=VectorType.FLOAT32, + type=g['VectorType'].FLOAT32, number_of_edges=VALKEY_HNSW_M, vectors_examined_on_construction=VALKEY_HNSW_EF_CONSTRUCTION, vectors_examined_on_runtime=VALKEY_HNSW_EF_RUNTIME, ) - algo = VectorAlgorithm.HNSW + algo = g['VectorAlgorithm'].HNSW else: if self.index_type != 'FLAT': log.warning(f'Unrecognized VALKEY_INDEX_TYPE={self.index_type!r}; falling back to FLAT.') - vector_attrs = VectorFieldAttributesFlat( + vector_attrs = g['VectorFieldAttributesFlat']( dimensions=dimension, distance_metric=distance_metric, - type=VectorType.FLOAT32, + type=g['VectorType'].FLOAT32, ) - algo = VectorAlgorithm.FLAT + algo = g['VectorAlgorithm'].FLAT schema = [ - VectorField(name='vector', algorithm=algo, attributes=vector_attrs), - TextField(name='text'), - TagField(name='id'), - TextField(name='metadata_json'), - TagField(name='hash'), - TagField(name='file_id'), - TagField(name='source'), - TagField(name='knowledge_base_id'), + g['VectorField'](name='vector', algorithm=algo, attributes=vector_attrs), + g['TextField'](name='text'), + g['TagField'](name='id'), + g['TextField'](name='metadata_json'), + g['TagField'](name='hash'), + g['TagField'](name='file_id'), + g['TagField'](name='source'), + g['TagField'](name='knowledge_base_id'), ] - options = FtCreateOptions(data_type=DataType.HASH, prefixes=[prefix]) + options = g['FtCreateOptions'](data_type=g['DataType'].HASH, prefixes=[prefix]) try: - glide_ft.create(self.client, index_name, schema, options) + g['glide_ft'].create(self.client, index_name, schema, options) log.info( f'Created Valkey index {index_name} with dimension={dimension}, ' f'type={self.index_type}, metric={self.distance_metric}' ) - except RequestError as e: + except g['RequestError'] as e: if 'already exists' in str(e).lower(): log.debug(f'Index {index_name} already exists, skipping creation.') else: @@ -356,7 +398,7 @@ class ValkeyClient(VectorDBBase): def _verify_collection_dimension(self, collection_name: str, dimension: int) -> None: index_name = self._index_name(collection_name) try: - info = glide_ft.info(self.client, index_name) + info = self._g['glide_ft'].info(self.client, index_name) except Exception as e: log.warning(f'Could not FT.INFO {index_name} for dimension check, skipping: {e}') return @@ -402,9 +444,9 @@ class ValkeyClient(VectorDBBase): def has_collection(self, collection_name: str) -> bool: index_name = self._index_name(collection_name) try: - glide_ft.info(self.client, index_name) + self._g['glide_ft'].info(self.client, index_name) return True - except RequestError as e: + except self._g['RequestError'] as e: msg = str(e).lower() if 'no such index' in msg or 'unknown index' in msg or 'not found in database' in msg: return False @@ -414,9 +456,9 @@ class ValkeyClient(VectorDBBase): def delete_collection(self, collection_name: str): index_name = self._index_name(collection_name) try: - glide_ft.dropindex(self.client, index_name) + self._g['glide_ft'].dropindex(self.client, index_name) log.info(f'Dropped index {index_name}') - except RequestError as e: + except self._g['RequestError'] as e: log.debug(f'Could not drop index {index_name}: {e}') self._delete_keys_by_prefix(self._key_prefix(collection_name)) @@ -475,13 +517,14 @@ class ValkeyClient(VectorDBBase): else f'*=>[KNN {limit} @vector $query_vec]' ) + g = self._g try: - opts = FtSearchOptions( + opts = g['FtSearchOptions']( params={'query_vec': _vector_to_bytes(vectors[0])}, - limit=FtSearchLimit(offset=0, count=limit), + limit=g['FtSearchLimit'](offset=0, count=limit), ) - result = glide_ft.search(self.client, self._index_name(collection_name), query_str, opts) - except RequestError as e: + result = g['glide_ft'].search(self.client, self._index_name(collection_name), query_str, opts) + except g['RequestError'] as e: log.error(f'Valkey search error on collection {collection_name}: {e}') return None @@ -505,17 +548,18 @@ class ValkeyClient(VectorDBBase): f'capping at {effective_limit} results. Pass an explicit limit to avoid silent truncation.' ) + g = self._g try: - opts = FtSearchOptions( + opts = g['FtSearchOptions']( return_fields=[ - ReturnField(field_identifier='id'), - ReturnField(field_identifier='text'), - ReturnField(field_identifier='metadata_json'), + g['ReturnField'](field_identifier='id'), + g['ReturnField'](field_identifier='text'), + g['ReturnField'](field_identifier='metadata_json'), ], - limit=FtSearchLimit(offset=0, count=effective_limit), + limit=g['FtSearchLimit'](offset=0, count=effective_limit), ) - result = glide_ft.search(self.client, self._index_name(collection_name), query_str, opts) - except RequestError as e: + result = g['glide_ft'].search(self.client, self._index_name(collection_name), query_str, opts) + except g['RequestError'] as e: log.error(f'Valkey query error on collection {collection_name}: {e}') return None @@ -535,7 +579,7 @@ class ValkeyClient(VectorDBBase): cursor = _decode(scan_result[0]) keys = scan_result[1] if keys: - batch = Batch(is_atomic=False) + batch = self._g['Batch'](is_atomic=False) for key in keys: batch.hgetall(key) results = self.client.exec(batch, raise_on_error=False) or [] @@ -565,7 +609,7 @@ class ValkeyClient(VectorDBBase): keys = [self._item_key(collection_name, item_id) for item_id in ids] try: self.batch_client.delete(keys) - except RequestError as e: + except self._g['RequestError'] as e: log.error(f'Valkey delete error on collection {collection_name}: {e}') return @@ -578,14 +622,15 @@ class ValkeyClient(VectorDBBase): index_name = self._index_name(collection_name) page_size = 10000 + g = self._g while True: try: - opts = FtSearchOptions( - return_fields=[ReturnField(field_identifier='id')], - limit=FtSearchLimit(offset=0, count=page_size), + opts = g['FtSearchOptions']( + return_fields=[g['ReturnField'](field_identifier='id')], + limit=g['FtSearchLimit'](offset=0, count=page_size), ) - result = glide_ft.search(self.client, index_name, filter_expr, opts) - except RequestError as e: + result = g['glide_ft'].search(self.client, index_name, filter_expr, opts) + except g['RequestError'] as e: log.error(f'Valkey delete-by-filter error on collection {collection_name}: {e}') return @@ -601,6 +646,7 @@ class ValkeyClient(VectorDBBase): return def reset(self): + glide_ft = self._g['glide_ft'] collections: list[str] = [] try: indexes = glide_ft.list(self.client) or [] diff --git a/backend/requirements.txt b/backend/requirements.txt index 23f7cdf386..425a72b254 100644 --- a/backend/requirements.txt +++ b/backend/requirements.txt @@ -124,7 +124,7 @@ pgvector==0.4.2 PyMySQL==1.1.2 boto3==1.42.62 # mariadb==1.1.14 should be added if you want to support MariaDB -valkey-glide-sync==2.3.1 +# valkey-glide-sync==2.3.1 # optional: install manually if VECTOR_DB=valkey pymilvus==2.6.9 qdrant-client==1.17.0 diff --git a/pyproject.toml b/pyproject.toml index 96adb297ea..7d98f25b41 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -41,7 +41,7 @@ dependencies = [ "pycrdt==0.12.47", "redis==7.4.0", - "valkey-glide-sync==2.3.1", + # "valkey-glide-sync==2.3.1", # optional: install manually if VECTOR_DB=valkey "pytz==2026.1.post1", "APScheduler==3.11.2",