This commit is contained in:
Timothy Jaeryang Baek
2026-06-01 12:24:45 -07:00
parent 567c4aabe9
commit c0f1aa2919
3 changed files with 119 additions and 73 deletions
+111 -65
View File
@@ -8,6 +8,29 @@ import re
import struct
from urllib.parse import urlparse
from open_webui.config import (
VALKEY_COLLECTION_PREFIX,
VALKEY_DISTANCE_METRIC,
VALKEY_HNSW_EF_CONSTRUCTION,
VALKEY_HNSW_EF_RUNTIME,
VALKEY_HNSW_M,
VALKEY_INDEX_TYPE,
VALKEY_URL,
)
from open_webui.retrieval.vector.main import (
GetResult,
SearchResult,
VectorDBBase,
VectorItem,
)
from open_webui.retrieval.vector.utils import process_metadata
log = logging.getLogger(__name__)
def _import_glide():
"""Lazily import glide_sync so the module can be loaded without valkey-glide-sync installed."""
try:
from glide_sync import (
Batch,
DataType,
@@ -31,24 +54,33 @@ from glide_sync import (
from glide_sync import (
ft as glide_ft,
)
from open_webui.config import (
VALKEY_COLLECTION_PREFIX,
VALKEY_DISTANCE_METRIC,
VALKEY_HNSW_EF_CONSTRUCTION,
VALKEY_HNSW_EF_RUNTIME,
VALKEY_HNSW_M,
VALKEY_INDEX_TYPE,
VALKEY_URL,
)
from open_webui.retrieval.vector.main import (
GetResult,
SearchResult,
VectorDBBase,
VectorItem,
)
from open_webui.retrieval.vector.utils import process_metadata
except ImportError as e:
raise ImportError(
'valkey-glide-sync is required when VECTOR_DB=valkey. '
'Install it with: pip install valkey-glide-sync==2.3.1'
) from e
return {
'Batch': Batch,
'DataType': DataType,
'DistanceMetricType': DistanceMetricType,
'FtCreateOptions': FtCreateOptions,
'FtSearchLimit': FtSearchLimit,
'FtSearchOptions': FtSearchOptions,
'GlideClient': GlideClient,
'GlideClientConfiguration': GlideClientConfiguration,
'NodeAddress': NodeAddress,
'RequestError': RequestError,
'ReturnField': ReturnField,
'TagField': TagField,
'TextField': TextField,
'VectorAlgorithm': VectorAlgorithm,
'VectorField': VectorField,
'VectorFieldAttributesFlat': VectorFieldAttributesFlat,
'VectorFieldAttributesHnsw': VectorFieldAttributesHnsw,
'VectorType': VectorType,
'glide_ft': glide_ft,
}
log = logging.getLogger(__name__)
# valkey-search 1.2.0 requires Valkey core 9.0.1+ per upstream release notes.
# Unlike RediSearch (dialects 1-4), valkey-search only implements DIALECT 2 — GLIDE's
@@ -62,11 +94,6 @@ _NEVER_MATCH_SENTINEL = '__open_webui_valkey_never_match__'
# Compile once at module load — includes `?` which is a single-char wildcard in TAG queries.
_TAG_SPECIAL_RE = re.compile(r'([,.<>{}\[\]"\':;!@#$%^&*()\-+=~?\\/| \t\n\r])')
_DISTANCE_METRIC_MAP = {
'COSINE': DistanceMetricType.COSINE,
'L2': DistanceMetricType.L2,
'IP': DistanceMetricType.IP,
}
_SAFE_FIELD_RE = re.compile(r'^[a-zA-Z_][a-zA-Z0-9_]*$')
@@ -130,6 +157,9 @@ class ValkeyClient(VectorDBBase):
'Set it to your Valkey server URL (e.g., valkey://localhost:6379).'
)
# Lazily import glide_sync — only needed when this backend is actually used.
self._g = _import_glide()
# Validate distance metric at init — invalid values pass through to FT.CREATE
# and fail with a cryptic server error.
metric = VALKEY_DISTANCE_METRIC.upper()
@@ -139,6 +169,13 @@ class ValkeyClient(VectorDBBase):
f'Must be one of: {", ".join(sorted(_VALID_DISTANCE_METRICS))}.'
)
DistanceMetricType = self._g['DistanceMetricType']
self._distance_metric_map = {
'COSINE': DistanceMetricType.COSINE,
'L2': DistanceMetricType.L2,
'IP': DistanceMetricType.IP,
}
self.collection_prefix = VALKEY_COLLECTION_PREFIX
self.index_type = VALKEY_INDEX_TYPE
self.distance_metric = metric
@@ -148,6 +185,10 @@ class ValkeyClient(VectorDBBase):
port = parsed.port or 6379
db = int(parsed.path.lstrip('/') or 0)
GlideClientConfiguration = self._g['GlideClientConfiguration']
NodeAddress = self._g['NodeAddress']
GlideClient = self._g['GlideClient']
config = GlideClientConfiguration(
addresses=[NodeAddress(host=host, port=port)],
database_id=db if db else None,
@@ -155,7 +196,7 @@ class ValkeyClient(VectorDBBase):
client_name="open_webui_vector_store_client",
)
try:
self.client: GlideClient = GlideClient.create(config)
self.client = GlideClient.create(config)
except Exception as e:
raise ConnectionError(f'Failed to connect to Valkey at {host}:{port}: {e}') from e
@@ -168,7 +209,7 @@ class ValkeyClient(VectorDBBase):
client_name="open_webui_vector_store_batch_client",
)
try:
self.batch_client: GlideClient = GlideClient.create(batch_config)
self.batch_client = GlideClient.create(batch_config)
except Exception as e:
raise ConnectionError(f'Failed to create batch write client for Valkey at {host}:{port}: {e}') from e
@@ -304,50 +345,51 @@ class ValkeyClient(VectorDBBase):
def _create_index(self, collection_name: str, dimension: int) -> None:
"""Create an FT index for a collection with the given vector dimension."""
g = self._g
index_name = self._index_name(collection_name)
prefix = self._key_prefix(collection_name)
distance_metric = _DISTANCE_METRIC_MAP[self.distance_metric]
distance_metric = self._distance_metric_map[self.distance_metric]
if self.index_type == 'HNSW':
vector_attrs = VectorFieldAttributesHnsw(
vector_attrs = g['VectorFieldAttributesHnsw'](
dimensions=dimension,
distance_metric=distance_metric,
type=VectorType.FLOAT32,
type=g['VectorType'].FLOAT32,
number_of_edges=VALKEY_HNSW_M,
vectors_examined_on_construction=VALKEY_HNSW_EF_CONSTRUCTION,
vectors_examined_on_runtime=VALKEY_HNSW_EF_RUNTIME,
)
algo = VectorAlgorithm.HNSW
algo = g['VectorAlgorithm'].HNSW
else:
if self.index_type != 'FLAT':
log.warning(f'Unrecognized VALKEY_INDEX_TYPE={self.index_type!r}; falling back to FLAT.')
vector_attrs = VectorFieldAttributesFlat(
vector_attrs = g['VectorFieldAttributesFlat'](
dimensions=dimension,
distance_metric=distance_metric,
type=VectorType.FLOAT32,
type=g['VectorType'].FLOAT32,
)
algo = VectorAlgorithm.FLAT
algo = g['VectorAlgorithm'].FLAT
schema = [
VectorField(name='vector', algorithm=algo, attributes=vector_attrs),
TextField(name='text'),
TagField(name='id'),
TextField(name='metadata_json'),
TagField(name='hash'),
TagField(name='file_id'),
TagField(name='source'),
TagField(name='knowledge_base_id'),
g['VectorField'](name='vector', algorithm=algo, attributes=vector_attrs),
g['TextField'](name='text'),
g['TagField'](name='id'),
g['TextField'](name='metadata_json'),
g['TagField'](name='hash'),
g['TagField'](name='file_id'),
g['TagField'](name='source'),
g['TagField'](name='knowledge_base_id'),
]
options = FtCreateOptions(data_type=DataType.HASH, prefixes=[prefix])
options = g['FtCreateOptions'](data_type=g['DataType'].HASH, prefixes=[prefix])
try:
glide_ft.create(self.client, index_name, schema, options)
g['glide_ft'].create(self.client, index_name, schema, options)
log.info(
f'Created Valkey index {index_name} with dimension={dimension}, '
f'type={self.index_type}, metric={self.distance_metric}'
)
except RequestError as e:
except g['RequestError'] as e:
if 'already exists' in str(e).lower():
log.debug(f'Index {index_name} already exists, skipping creation.')
else:
@@ -356,7 +398,7 @@ class ValkeyClient(VectorDBBase):
def _verify_collection_dimension(self, collection_name: str, dimension: int) -> None:
index_name = self._index_name(collection_name)
try:
info = glide_ft.info(self.client, index_name)
info = self._g['glide_ft'].info(self.client, index_name)
except Exception as e:
log.warning(f'Could not FT.INFO {index_name} for dimension check, skipping: {e}')
return
@@ -402,9 +444,9 @@ class ValkeyClient(VectorDBBase):
def has_collection(self, collection_name: str) -> bool:
index_name = self._index_name(collection_name)
try:
glide_ft.info(self.client, index_name)
self._g['glide_ft'].info(self.client, index_name)
return True
except RequestError as e:
except self._g['RequestError'] as e:
msg = str(e).lower()
if 'no such index' in msg or 'unknown index' in msg or 'not found in database' in msg:
return False
@@ -414,9 +456,9 @@ class ValkeyClient(VectorDBBase):
def delete_collection(self, collection_name: str):
index_name = self._index_name(collection_name)
try:
glide_ft.dropindex(self.client, index_name)
self._g['glide_ft'].dropindex(self.client, index_name)
log.info(f'Dropped index {index_name}')
except RequestError as e:
except self._g['RequestError'] as e:
log.debug(f'Could not drop index {index_name}: {e}')
self._delete_keys_by_prefix(self._key_prefix(collection_name))
@@ -475,13 +517,14 @@ class ValkeyClient(VectorDBBase):
else f'*=>[KNN {limit} @vector $query_vec]'
)
g = self._g
try:
opts = FtSearchOptions(
opts = g['FtSearchOptions'](
params={'query_vec': _vector_to_bytes(vectors[0])},
limit=FtSearchLimit(offset=0, count=limit),
limit=g['FtSearchLimit'](offset=0, count=limit),
)
result = glide_ft.search(self.client, self._index_name(collection_name), query_str, opts)
except RequestError as e:
result = g['glide_ft'].search(self.client, self._index_name(collection_name), query_str, opts)
except g['RequestError'] as e:
log.error(f'Valkey search error on collection {collection_name}: {e}')
return None
@@ -505,17 +548,18 @@ class ValkeyClient(VectorDBBase):
f'capping at {effective_limit} results. Pass an explicit limit to avoid silent truncation.'
)
g = self._g
try:
opts = FtSearchOptions(
opts = g['FtSearchOptions'](
return_fields=[
ReturnField(field_identifier='id'),
ReturnField(field_identifier='text'),
ReturnField(field_identifier='metadata_json'),
g['ReturnField'](field_identifier='id'),
g['ReturnField'](field_identifier='text'),
g['ReturnField'](field_identifier='metadata_json'),
],
limit=FtSearchLimit(offset=0, count=effective_limit),
limit=g['FtSearchLimit'](offset=0, count=effective_limit),
)
result = glide_ft.search(self.client, self._index_name(collection_name), query_str, opts)
except RequestError as e:
result = g['glide_ft'].search(self.client, self._index_name(collection_name), query_str, opts)
except g['RequestError'] as e:
log.error(f'Valkey query error on collection {collection_name}: {e}')
return None
@@ -535,7 +579,7 @@ class ValkeyClient(VectorDBBase):
cursor = _decode(scan_result[0])
keys = scan_result[1]
if keys:
batch = Batch(is_atomic=False)
batch = self._g['Batch'](is_atomic=False)
for key in keys:
batch.hgetall(key)
results = self.client.exec(batch, raise_on_error=False) or []
@@ -565,7 +609,7 @@ class ValkeyClient(VectorDBBase):
keys = [self._item_key(collection_name, item_id) for item_id in ids]
try:
self.batch_client.delete(keys)
except RequestError as e:
except self._g['RequestError'] as e:
log.error(f'Valkey delete error on collection {collection_name}: {e}')
return
@@ -578,14 +622,15 @@ class ValkeyClient(VectorDBBase):
index_name = self._index_name(collection_name)
page_size = 10000
g = self._g
while True:
try:
opts = FtSearchOptions(
return_fields=[ReturnField(field_identifier='id')],
limit=FtSearchLimit(offset=0, count=page_size),
opts = g['FtSearchOptions'](
return_fields=[g['ReturnField'](field_identifier='id')],
limit=g['FtSearchLimit'](offset=0, count=page_size),
)
result = glide_ft.search(self.client, index_name, filter_expr, opts)
except RequestError as e:
result = g['glide_ft'].search(self.client, index_name, filter_expr, opts)
except g['RequestError'] as e:
log.error(f'Valkey delete-by-filter error on collection {collection_name}: {e}')
return
@@ -601,6 +646,7 @@ class ValkeyClient(VectorDBBase):
return
def reset(self):
glide_ft = self._g['glide_ft']
collections: list[str] = []
try:
indexes = glide_ft.list(self.client) or []
+1 -1
View File
@@ -124,7 +124,7 @@ pgvector==0.4.2
PyMySQL==1.1.2
boto3==1.42.62
# mariadb==1.1.14 should be added if you want to support MariaDB
valkey-glide-sync==2.3.1
# valkey-glide-sync==2.3.1 # optional: install manually if VECTOR_DB=valkey
pymilvus==2.6.9
qdrant-client==1.17.0
+1 -1
View File
@@ -41,7 +41,7 @@ dependencies = [
"pycrdt==0.12.47",
"redis==7.4.0",
"valkey-glide-sync==2.3.1",
# "valkey-glide-sync==2.3.1", # optional: install manually if VECTOR_DB=valkey
"pytz==2026.1.post1",
"APScheduler==3.11.2",