From ade5de465fbd26a875dacd8bbacb0c5286b3ff8d Mon Sep 17 00:00:00 2001 From: Shyam Venkat Date: Tue, 5 Aug 2025 10:19:48 +0530 Subject: [PATCH 1/6] gsi vector search support --- .../vector_stores/couchbase/__init__.py | 18 +- .../vector_stores/couchbase/base.py | 519 ++++++++++--- .../test_couchbase_query_vector_store.py | 720 ++++++++++++++++++ .../tests/test_document_store_integration.py | 348 +++++++++ 4 files changed, 1499 insertions(+), 106 deletions(-) create mode 100644 llama-index-integrations/vector_stores/llama-index-vector-stores-couchbase/tests/test_couchbase_query_vector_store.py create mode 100644 llama-index-integrations/vector_stores/llama-index-vector-stores-couchbase/tests/test_document_store_integration.py diff --git a/llama-index-integrations/vector_stores/llama-index-vector-stores-couchbase/llama_index/vector_stores/couchbase/__init__.py b/llama-index-integrations/vector_stores/llama-index-vector-stores-couchbase/llama_index/vector_stores/couchbase/__init__.py index f82ef7f2ff..718a10c568 100644 --- a/llama-index-integrations/vector_stores/llama-index-vector-stores-couchbase/llama_index/vector_stores/couchbase/__init__.py +++ b/llama-index-integrations/vector_stores/llama-index-vector-stores-couchbase/llama_index/vector_stores/couchbase/__init__.py @@ -1,7 +1,17 @@ +"""Couchbase vector stores.""" + from llama_index.vector_stores.couchbase.base import ( - CouchbaseVectorStore, - CouchbaseSearchVectorStore, + CouchbaseVectorStore, # Deprecated + CouchbaseSearchVectorStore, # FTS-based + CouchbaseQueryVectorStore, # GSI-based with BHIVE support + CouchbaseVectorStoreBase, # Base class + QueryVectorSearchType, # Enum for search types ) - -__all__ = ["CouchbaseVectorStore", "CouchbaseSearchVectorStore"] +__all__ = [ + "CouchbaseVectorStore", + "CouchbaseSearchVectorStore", + "CouchbaseQueryVectorStore", + "CouchbaseVectorStoreBase", + "QueryVectorSearchType", +] diff --git a/llama-index-integrations/vector_stores/llama-index-vector-stores-couchbase/llama_index/vector_stores/couchbase/base.py b/llama-index-integrations/vector_stores/llama-index-vector-stores-couchbase/llama_index/vector_stores/couchbase/base.py index 2824cc0650..71cef32733 100644 --- a/llama-index-integrations/vector_stores/llama-index-vector-stores-couchbase/llama_index/vector_stores/couchbase/base.py +++ b/llama-index-integrations/vector_stores/llama-index-vector-stores-couchbase/llama_index/vector_stores/couchbase/base.py @@ -4,16 +4,24 @@ import logging import warnings -from typing import Any, Dict, List, Optional +from datetime import timedelta +from enum import Enum +from typing import Any, Dict, List, Optional, Union from llama_index.core.bridge.pydantic import PrivateAttr from llama_index.core.schema import BaseNode, MetadataMode, TextNode from llama_index.core.vector_stores.types import ( BasePydanticVectorStore, + MetadataFilter, MetadataFilters, + FilterOperator, VectorStoreQuery, VectorStoreQueryResult, ) +import couchbase.search as search +from couchbase.options import SearchOptions, QueryOptions +from couchbase.vector_search import VectorQuery, VectorSearch + from llama_index.core.vector_stores.utils import ( metadata_dict_to_node, node_to_metadata_dict, @@ -22,6 +30,13 @@ logger = logging.getLogger(__name__) +class QueryVectorSearchType(str, Enum): + """Enum for search types supported by Couchbase GSI.""" + + ANN = "ANN" + KNN = "KNN" + + def _transform_couchbase_filter_condition(condition: str) -> str: """ Convert standard metadata filter condition to Couchbase specific condition. 
@@ -115,12 +130,89 @@ def _to_couchbase_filter(standard_filters: MetadataFilters) -> Dict[str, Any]:
     return {"query": filters}
 
 
-class CouchbaseSearchVectorStore(BasePydanticVectorStore):
+def _convert_llamaindex_filters_to_sql(
+    filters: MetadataFilters, metadata_key: str
+) -> str:
     """
-    Couchbase Vector Store.
+    Convert LlamaIndex MetadataFilters to a SQL++ WHERE clause.
 
-    To use, you should have the ``couchbase`` python package installed.
+    Args:
+        filters: LlamaIndex MetadataFilters object
+        metadata_key: The metadata field prefix for the document
+
+    Returns:
+        SQL++ WHERE clause string
+
+    """
+    if not filters or not filters.filters:
+        return ""
+
+    def _escape(value: str) -> str:
+        """Escape single quotes so string values form valid SQL++ literals."""
+        return value.replace("'", "''")
+
+    def _build_condition(filter_item: Any) -> str:
+        """Build a single SQL++ condition from a MetadataFilter."""
+        field_name = f"d.{metadata_key}.{filter_item.key}"
+
+        if filter_item.operator == FilterOperator.EQ:
+            if isinstance(filter_item.value, str):
+                return f"{field_name} = '{_escape(filter_item.value)}'"
+            else:
+                return f"{field_name} = {filter_item.value}"
+        elif filter_item.operator == FilterOperator.NE:
+            if isinstance(filter_item.value, str):
+                return f"{field_name} != '{_escape(filter_item.value)}'"
+            else:
+                return f"{field_name} != {filter_item.value}"
+        elif filter_item.operator == FilterOperator.GT:
+            return f"{field_name} > {filter_item.value}"
+        elif filter_item.operator == FilterOperator.GTE:
+            return f"{field_name} >= {filter_item.value}"
+        elif filter_item.operator == FilterOperator.LT:
+            return f"{field_name} < {filter_item.value}"
+        elif filter_item.operator == FilterOperator.LTE:
+            return f"{field_name} <= {filter_item.value}"
+        elif filter_item.operator == FilterOperator.IN:
+            if isinstance(filter_item.value, list):
+                values = ", ".join(
+                    [
+                        f"'{_escape(v)}'" if isinstance(v, str) else str(v)
+                        for v in filter_item.value
+                    ]
+                )
+                return f"{field_name} IN [{values}]"
+            else:
+                raise ValueError(
+                    f"'in' operator expects a list value, got {type(filter_item.value)}"
+                )
+        else:
+            raise ValueError(f"Unsupported filter operator: {filter_item.operator}")
+
+    # Build conditions for all filters
+    filter_conditions = []
+    for filter_item in filters.filters:
+        if isinstance(filter_item, MetadataFilter):
+            condition = _build_condition(filter_item)
+            filter_conditions.append(condition)
+        elif isinstance(filter_item, MetadataFilters):
+            # Recurse into nested filter groups, preserving grouping with parentheses
+            condition = (
+                "("
+                + _convert_llamaindex_filters_to_sql(filter_item, metadata_key)
+                + ")"
+            )
+            filter_conditions.append(condition)
+        else:
+            logger.warning(f"Unsupported filter type: {type(filter_item)}")
+            continue
+
+    if not filter_conditions:
+        return ""
+
+    # Join conditions based on the filter condition (AND/OR)
+    condition_connector = " AND " if filters.condition == "and" else " OR "
+    return condition_connector.join(filter_conditions)
+
+
+class CouchbaseVectorStoreBase(BasePydanticVectorStore):
+    """
+    Base class for Couchbase Vector Stores providing common database operations.
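+
+    Subclasses supply the index-specific ``query`` implementation:
+    ``CouchbaseSearchVectorStore`` (FTS) and ``CouchbaseQueryVectorStore`` (GSI).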
""" stores_text: bool = True @@ -135,12 +227,9 @@ class CouchbaseSearchVectorStore(BasePydanticVectorStore): _bucket_name: str = PrivateAttr() _scope_name: str = PrivateAttr() _collection_name: str = PrivateAttr() - _index_name: str = PrivateAttr() - _id_key: str = PrivateAttr() _text_key: str = PrivateAttr() _embedding_key: str = PrivateAttr() _metadata_key: str = PrivateAttr() - _scoped_index: bool = PrivateAttr() def __init__( self, @@ -148,29 +237,24 @@ def __init__( bucket_name: str, scope_name: str, collection_name: str, - index_name: str, text_key: Optional[str] = "text", embedding_key: Optional[str] = "embedding", metadata_key: Optional[str] = "metadata", - scoped_index: bool = True, ) -> None: """ - Initializes a connection to a Couchbase Vector Store. + Base initialization for Couchbase Vector Stores. Args: cluster (Cluster): Couchbase cluster object with active connection. bucket_name (str): Name of bucket to store documents in. scope_name (str): Name of scope in the bucket to store documents in. collection_name (str): Name of collection in the scope to store documents in. - index_name (str): Name of the Search index. text_key (Optional[str], optional): The field for the document text. Defaults to "text". embedding_key (Optional[str], optional): The field for the document embedding. Defaults to "embedding". metadata_key (Optional[str], optional): The field for the document metadata. Defaults to "metadata". - scoped_index (Optional[bool]): specify whether the index is a scoped index. - Set to True by default. Returns: None @@ -202,17 +286,12 @@ def __init__( if not collection_name: raise ValueError("collection_name must be provided.") - if not index_name: - raise ValueError("index_name must be provided.") - self._bucket_name = bucket_name self._scope_name = scope_name self._collection_name = collection_name self._text_key = text_key self._embedding_key = embedding_key - self._index_name = index_name self._metadata_key = metadata_key - self._scoped_index = scoped_index # Check if the bucket exists if not self._check_bucket_exists(): @@ -237,16 +316,6 @@ def __init__( except Exception as e: raise - # Check if the index exists. Throws ValueError if it doesn't - try: - self._check_index_exists() - except Exception as e: - raise - - self._bucket = self._cluster.bucket(self._bucket_name) - self._scope = self._bucket.scope(self._scope_name) - self._collection = self._scope.collection(self._collection_name) - def add(self, nodes: List[BaseNode], **kwargs: Any) -> List[str]: """ Add nodes to the collection and return their document IDs. @@ -326,6 +395,153 @@ def delete(self, ref_doc_id: str, **kwargs: Any) -> None: logger.error(f"Error deleting document {ref_doc_id}") raise + @property + def client(self) -> Any: + """ + Property function to access the client attribute. + """ + return self._cluster + + def _check_bucket_exists(self) -> bool: + """ + Check if the bucket exists in the linked Couchbase cluster. + + Returns: + True if the bucket exists + + """ + bucket_manager = self._cluster.buckets() + try: + bucket_manager.get_bucket(self._bucket_name) + return True + except Exception as e: + logger.debug("Error checking if bucket exists:", e) + return False + + def _check_scope_and_collection_exists(self) -> bool: + """ + Check if the scope and collection exists in the linked Couchbase bucket + Returns: + True if the scope and collection exist in the bucket + Raises a ValueError if either is not found. 
+ """ + scope_collection_map: Dict[str, Any] = {} + + # Get a list of all scopes in the bucket + for scope in self._bucket.collections().get_all_scopes(): + scope_collection_map[scope.name] = [] + + # Get a list of all the collections in the scope + for collection in scope.collections: + scope_collection_map[scope.name].append(collection.name) + + # Check if the scope exists + if self._scope_name not in scope_collection_map: + raise ValueError( + f"Scope {self._scope_name} not found in Couchbase " + f"bucket {self._bucket_name}" + ) + + # Check if the collection exists in the scope + if self._collection_name not in scope_collection_map[self._scope_name]: + raise ValueError( + f"Collection {self._collection_name} not found in scope " + f"{self._scope_name} in Couchbase bucket {self._bucket_name}" + ) + + return True + + def _format_metadata(self, row_fields: Dict[str, Any]) -> Dict[str, Any]: + """ + Helper method to format the metadata from the Couchbase Search API. + + Args: + row_fields (Dict[str, Any]): The fields to format. + + Returns: + Dict[str, Any]: The formatted metadata. + + """ + metadata = {} + for key, value in row_fields.items(): + # Couchbase Search returns the metadata key with a prefix + # `metadata.` We remove it to get the original metadata key + if key.startswith(self._metadata_key): + new_key = key.split(self._metadata_key + ".")[-1] + metadata[new_key] = value + else: + metadata[key] = value + + return metadata + + +class CouchbaseSearchVectorStore(CouchbaseVectorStoreBase): + """ + Couchbase Vector Store using Full-Text Search (FTS). + + To use, you should have the ``couchbase`` python package installed. + + """ + + _index_name: str = PrivateAttr() + _scoped_index: bool = PrivateAttr() + + def __init__( + self, + cluster: Any, + bucket_name: str, + scope_name: str, + collection_name: str, + index_name: str, + text_key: Optional[str] = "text", + embedding_key: Optional[str] = "embedding", + metadata_key: Optional[str] = "metadata", + scoped_index: bool = True, + ) -> None: + """ + Initializes a connection to a Couchbase Vector Store using FTS. + + Args: + cluster (Cluster): Couchbase cluster object with active connection. + bucket_name (str): Name of bucket to store documents in. + scope_name (str): Name of scope in the bucket to store documents in. + collection_name (str): Name of collection in the scope to store documents in. + index_name (str): Name of the Search index. + text_key (Optional[str], optional): The field for the document text. + Defaults to "text". + embedding_key (Optional[str], optional): The field for the document embedding. + Defaults to "embedding". + metadata_key (Optional[str], optional): The field for the document metadata. + Defaults to "metadata". + scoped_index (Optional[bool]): specify whether the index is a scoped index. + Set to True by default. + + Returns: + None + + """ + super().__init__( + cluster=cluster, + bucket_name=bucket_name, + scope_name=scope_name, + collection_name=collection_name, + text_key=text_key, + embedding_key=embedding_key, + metadata_key=metadata_key, + ) + + if not index_name: + raise ValueError("index_name must be provided.") + + self._index_name = index_name + self._scoped_index = scoped_index + + # Check if the index exists. Throws ValueError if it doesn't + try: + self._check_index_exists() + except Exception as e: + raise + def query(self, query: VectorStoreQuery, **kwargs: Any) -> VectorStoreQueryResult: """ Executes a query in the vector store and returns the result. 
@@ -339,10 +555,6 @@ def query(self, query: VectorStoreQuery, **kwargs: Any) -> VectorStoreQueryResul VectorStoreQueryResult: The result of the query containing the top-k nodes, similarities, and ids. """ - import couchbase.search as search - from couchbase.options import SearchOptions - from couchbase.vector_search import VectorQuery, VectorSearch - fields = query.output_fields if not fields: @@ -432,62 +644,6 @@ def query(self, query: VectorStoreQuery, **kwargs: Any) -> VectorStoreQueryResul nodes=top_k_nodes, similarities=top_k_scores, ids=top_k_ids ) - @property - def client(self) -> Any: - """ - Property function to access the client attribute. - """ - return self._cluster - - def _check_bucket_exists(self) -> bool: - """ - Check if the bucket exists in the linked Couchbase cluster. - - Returns: - True if the bucket exists - - """ - bucket_manager = self._cluster.buckets() - try: - bucket_manager.get_bucket(self._bucket_name) - return True - except Exception as e: - logger.debug("Error checking if bucket exists:", e) - return False - - def _check_scope_and_collection_exists(self) -> bool: - """ - Check if the scope and collection exists in the linked Couchbase bucket - Returns: - True if the scope and collection exist in the bucket - Raises a ValueError if either is not found. - """ - scope_collection_map: Dict[str, Any] = {} - - # Get a list of all scopes in the bucket - for scope in self._bucket.collections().get_all_scopes(): - scope_collection_map[scope.name] = [] - - # Get a list of all the collections in the scope - for collection in scope.collections: - scope_collection_map[scope.name].append(collection.name) - - # Check if the scope exists - if self._scope_name not in scope_collection_map: - raise ValueError( - f"Scope {self._scope_name} not found in Couchbase " - f"bucket {self._bucket_name}" - ) - - # Check if the collection exists in the scope - if self._collection_name not in scope_collection_map[self._scope_name]: - raise ValueError( - f"Collection {self._collection_name} not found in scope " - f"{self._scope_name} in Couchbase bucket {self._bucket_name}" - ) - - return True - def _check_index_exists(self) -> bool: """ Check if the Search index exists in the linked Couchbase cluster @@ -516,28 +672,187 @@ def _check_index_exists(self) -> bool: return True - def _format_metadata(self, row_fields: Dict[str, Any]) -> Dict[str, Any]: + +class CouchbaseQueryVectorStore(CouchbaseVectorStoreBase): + """ + Couchbase Vector Store using Global Secondary Index (GSI) with vector search capabilities. + + This implementation supports: + - BHIVE indexes for high-performance ANN vector search + - Composite Secondary Indexes with vector search functions + - Various similarity metrics (cosine, euclidean, dot_product) + """ + + _search_type: QueryVectorSearchType = PrivateAttr() + _similarity: str = PrivateAttr() + _query_timeout: timedelta = PrivateAttr() + + def __init__( + self, + cluster: Any, + bucket_name: str, + scope_name: str, + collection_name: str, + search_type: Union[QueryVectorSearchType, str] = QueryVectorSearchType.ANN, + similarity: str = "cosine", + nprobes: Optional[int] = None, + text_key: Optional[str] = "text", + embedding_key: Optional[str] = "embedding", + metadata_key: Optional[str] = "metadata", + query_timeout: Optional[timedelta] = None, + ) -> None: """ - Helper method to format the metadata from the Couchbase Search API. + Initializes a connection to a Couchbase Vector Store using GSI. Args: - row_fields (Dict[str, Any]): The fields to format. 
+ cluster (Cluster): Couchbase cluster object with active connection. + bucket_name (str): Name of bucket to store documents in. + scope_name (str): Name of scope in the bucket to store documents in. + collection_name (str): Name of collection in the scope to store documents in. + search_type (Union[QueryVectorSearchType, str]): Type of vector search (ANN or KNN). + Defaults to ANN. + similarity (str): Similarity metric to use (cosine, euclidean, dot_product). + Defaults to "cosine". + nprobes (Optional[int], optional): Number of probes for the ANN search. + Defaults to None, uses the value set at index creation time. + text_key (Optional[str], optional): The field for the document text. + Defaults to "text". + embedding_key (Optional[str], optional): The field for the document embedding. + Defaults to "embedding". + metadata_key (Optional[str], optional): The field for the document metadata. + Defaults to "metadata". + query_timeout (Optional[timedelta]): Timeout for SQL++ queries. + Defaults to 60 seconds. Returns: - Dict[str, Any]: The formatted metadata. + None """ - metadata = {} - for key, value in row_fields.items(): - # Couchbase Search returns the metadata key with a prefix - # `metadata.` We remove it to get the original metadata key - if key.startswith(self._metadata_key): - new_key = key.split(self._metadata_key + ".")[-1] - metadata[new_key] = value - else: - metadata[key] = value + super().__init__( + cluster=cluster, + bucket_name=bucket_name, + scope_name=scope_name, + collection_name=collection_name, + text_key=text_key, + embedding_key=embedding_key, + metadata_key=metadata_key, + ) - return metadata + if isinstance(search_type, str): + search_type = QueryVectorSearchType(search_type) + + self._search_type = search_type + self._similarity = similarity + self._query_timeout = query_timeout or timedelta(seconds=60) + self._nprobes = nprobes + + def query(self, query: VectorStoreQuery, **kwargs: Any) -> VectorStoreQueryResult: + """ + Executes a vector similarity query using GSI. + + Args: + query (VectorStoreQuery): The query object containing the search parameters. + **kwargs (Any): Additional keyword arguments. + + Returns: + VectorStoreQueryResult: The result of the query containing the top-k nodes, similarities, and ids. 
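+
+        Example:
+            An illustrative sketch (assumes an initialized store whose index
+            dimension matches the embedding; the values are placeholders)::
+
+                from llama_index.core.vector_stores.types import VectorStoreQuery
+
+                result = vector_store.query(
+                    VectorStoreQuery(
+                        query_embedding=[0.1] * 1536,  # hypothetical embedding
+                        similarity_top_k=3,
+                    )
+                )
+                for node, distance in zip(result.nodes, result.similarities):
+                    print(node.node_id, distance)  # smaller distance = closer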
+ + """ + if not query.query_embedding: + raise ValueError("Query embedding must not be empty") + + k = query.similarity_top_k + query_context = ( + f"{self._bucket_name}.{self._scope_name}.{self._collection_name}" + ) + + # Convert embedding to string representation for query + query_vector_str = str(query.query_embedding) + + # Handle filters if provided + where_clause = "" + if query.filters: + try: + # Convert LlamaIndex filters to SQL++ conditions + filter_sql = _convert_llamaindex_filters_to_sql( + query.filters, self._metadata_key + ) + if filter_sql: + where_clause = f"WHERE {filter_sql}" + except Exception as e: + logger.warning(f"Failed to process filters: {e}") + + if query.output_fields: + fields = query.output_fields.join(",") + else: + fields = "*, meta().id as id" + + nprobes = self._nprobes + if kwargs.get("nprobes"): + nprobes = kwargs.get("nprobes") + + # Determine the appropriate distance function based on search type + if self._search_type == QueryVectorSearchType.ANN: + nprobes_exp = f", {nprobes}" if nprobes else "" + distance_function_exp = f"APPROX_VECTOR_DISTANCE(d.{self._embedding_key}, {query_vector_str}, '{self._similarity}'{nprobes_exp})" + else: + distance_function_exp = f"VECTOR_DISTANCE(d.{self._embedding_key}, {query_vector_str}, '{self._similarity}')" + + # Build the SQL++ query + query_str = f""" + SELECT {fields}, {distance_function_exp} as distance + FROM {query_context} d + {where_clause} + ORDER BY distance + LIMIT {k} + """ + + try: + # Execute the query + query_options = QueryOptions( + timeout=self._query_timeout, + ) + + result = self._cluster.query(query_str, query_options) + + top_k_nodes = [] + top_k_scores = [] + top_k_ids = [] + + # Process results + for row in result.rows(): + doc_id = row.get("id", "") + text = row.get(self._text_key, "") + score = row.get("distance", 0.0) + + # Extract metadata + metadata_dict = {} + if self._metadata_key in row: + metadata_dict = row[self._metadata_key] + + try: + node = metadata_dict_to_node(metadata_dict, text) + node.node_id = doc_id + except Exception: + # Fallback for backwards compatibility + node = TextNode( + text=text, + id_=doc_id, + score=score, + metadata=metadata_dict, + ) + + top_k_nodes.append(node) + top_k_scores.append(score) + top_k_ids.append(doc_id) + + return VectorStoreQueryResult( + nodes=top_k_nodes, similarities=top_k_scores, ids=top_k_ids + ) + + except Exception as e: + logger.error(f"Vector search failed: {e}") + raise ValueError(f"Vector search failed with error: {e}") class CouchbaseVectorStore(CouchbaseSearchVectorStore): diff --git a/llama-index-integrations/vector_stores/llama-index-vector-stores-couchbase/tests/test_couchbase_query_vector_store.py b/llama-index-integrations/vector_stores/llama-index-vector-stores-couchbase/tests/test_couchbase_query_vector_store.py new file mode 100644 index 0000000000..ccaf9848d9 --- /dev/null +++ b/llama-index-integrations/vector_stores/llama-index-vector-stores-couchbase/tests/test_couchbase_query_vector_store.py @@ -0,0 +1,720 @@ +"""Test Couchbase Query Vector Store functionality using GSI.""" + +from __future__ import annotations +import os +import json +from typing import Any, List +from datetime import timedelta + +import pytest +import time + +from llama_index.core.schema import MetadataMode, TextNode, Document +from llama_index.embeddings.openai import OpenAIEmbedding +from llama_index.core.vector_stores.types import ( + VectorStoreQuery, + MetadataFilters, + MetadataFilter, + FilterOperator, +) +from 
llama_index.vector_stores.couchbase import CouchbaseQueryVectorStore +from llama_index.vector_stores.couchbase.base import QueryVectorSearchType +from llama_index.core.storage.storage_context import StorageContext +from llama_index.core import VectorStoreIndex + +from datetime import timedelta + +from couchbase.auth import PasswordAuthenticator +from couchbase.cluster import Cluster +from couchbase.options import ClusterOptions +from couchbase.logic.options import KnownConfigProfiles + +CONNECTION_STRING = os.getenv("COUCHBASE_CONNECTION_STRING", "") +BUCKET_NAME = os.getenv("COUCHBASE_BUCKET_NAME", "") +SCOPE_NAME = os.getenv("COUCHBASE_SCOPE_NAME", "") +COLLECTION_NAME = os.getenv("COUCHBASE_COLLECTION_NAME", "") +USERNAME = os.getenv("COUCHBASE_USERNAME", "") +PASSWORD = os.getenv("COUCHBASE_PASSWORD", "") +INDEX_NAME = os.getenv("COUCHBASE_INDEX_NAME", "test_vector_index") +SLEEP_DURATION = 5 # Increased for GSI indexing +EMBEDDING_DIMENSION = 1536 + + +def set_all_env_vars() -> bool: + """Check if all required environment variables are set.""" + return all( + [ + CONNECTION_STRING, + BUCKET_NAME, + SCOPE_NAME, + COLLECTION_NAME, + USERNAME, + PASSWORD, + ] + ) + + +def text_to_embedding(text: str) -> List[float]: + """Convert text to a unique embedding using ASCII values.""" + ascii_values = [float(ord(char)) for char in text] + # Pad or trim the list to make it of length EMBEDDING_DIMENSION + return ascii_values[:EMBEDDING_DIMENSION] + [0.0] * ( + EMBEDDING_DIMENSION - len(ascii_values) + ) + + +def get_cluster() -> Any: + """Get a couchbase cluster object.""" + auth = PasswordAuthenticator(USERNAME, PASSWORD) + options = ClusterOptions(authenticator=auth) + options.apply_profile(KnownConfigProfiles.WanDevelopment) + connect_string = CONNECTION_STRING + cluster = Cluster(connect_string, options) + + # Wait until the cluster is ready for use. 
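+    # (wait_until_ready raises if the cluster cannot be reached within the
+    # timeout, failing fast instead of hanging on later operations)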
+    cluster.wait_until_ready(timedelta(seconds=5))
+
+    return cluster
+
+
+@pytest.fixture()
+def cluster() -> Cluster:
+    """Get a couchbase cluster object."""
+    return get_cluster()
+
+
+def delete_documents(
+    client: Any, bucket_name: str, scope_name: str, collection_name: str
+) -> None:
+    """Delete all the documents in the collection."""
+    query = f"DELETE FROM `{bucket_name}`.`{scope_name}`.`{collection_name}`"
+    client.query(query).execute()
+
+
+@pytest.fixture(scope="session")
+def node_embeddings() -> list[TextNode]:
+    """Return a list of TextNodes with embeddings."""
+    return [
+        TextNode(
+            text="foo",
+            id_="12c70eed-5779-4008-aba0-596e003f6443",
+            metadata={
+                "genre": "Mystery",
+                "pages": 10,
+                "rating": 4.5,
+            },
+            embedding=text_to_embedding("foo"),
+        ),
+        TextNode(
+            text="bar",
+            id_="f7d81cb3-bb42-47e6-96f5-17db6860cd11",
+            metadata={
+                "genre": "Comedy",
+                "pages": 5,
+                "rating": 3.2,
+            },
+            embedding=text_to_embedding("bar"),
+        ),
+        TextNode(
+            text="baz",
+            id_="469e9537-7bc5-4669-9ff6-baa0ed086236",
+            metadata={
+                "genre": "Thriller",
+                "pages": 20,
+                "rating": 4.8,
+            },
+            embedding=text_to_embedding("baz"),
+        ),
+    ]
+
+
+def create_scope_and_collection(
+    cluster: Cluster, bucket_name: str, scope_name: str, collection_name: str
+) -> None:
+    """Create scope and collection if they don't exist."""
+    try:
+        from couchbase.exceptions import (
+            ScopeAlreadyExistsException,
+            CollectionAlreadyExistsException,
+        )
+
+        bucket = cluster.bucket(bucket_name)
+
+        # Create scope if it doesn't exist
+        try:
+            bucket.collections().create_scope(scope_name=scope_name)
+        except ScopeAlreadyExistsException:
+            pass
+
+        # Create collection if it doesn't exist
+        try:
+            bucket.collections().create_collection(
+                collection_name=collection_name, scope_name=scope_name
+            )
+        except CollectionAlreadyExistsException:
+            pass
+
+    except Exception:
+        # Creation is best-effort: the scope or collection may already exist
+        pass
+
+
+def create_vector_index(
+    cluster: Any,
+    bucket_name: str,
+    scope_name: str,
+    collection_name: str,
+    index_name: str,
+    embedding_key: str = "embedding",
+) -> None:
+    """Create GSI vector index for the collection."""
+    try:
+        from couchbase.options import QueryOptions
+
+        bucket = cluster.bucket(bucket_name)
+        scope = bucket.scope(scope_name)
+
+        # Check if index already exists
+        try:
+            query = f"SELECT name FROM system:indexes WHERE keyspace_id = '{collection_name}' AND name = '{index_name}'"
+            result = scope.query(query).execute()
+            if len(list(result.rows())) > 0:
+                return  # Index already exists
+        except Exception:
+            pass
+
+        # Index creation options
+        with_opts = json.dumps(
+            {
+                "dimension": EMBEDDING_DIMENSION,
+                "description": "IVF1024,PQ32x8",
+                "similarity": "cosine",
+            }
+        )
+
+        collection = scope.collection(collection_name)
+
+        # Seed the collection first: the IVF-based index needs existing
+        # vectors to train its centroids on
+        docs = {}
+        for i in range(2000):
+            docs[f"large_batch_{i}"] = {
+                "text": f"document_{i}",
+                "embedding": text_to_embedding(f"document_{i}"),
+                "metadata": {
+                    "batch_id": "large",
+                    "doc_num": i,
+                },
+            }
+
+        result = collection.insert_multi(docs)
+        if not result.all_ok:
+            raise Exception(f"Error inserting documents: {result.exceptions}")
+
+        # Create vector index
+        create_index_query = f"""
+        CREATE INDEX {index_name}
+        ON `{bucket_name}`.`{scope_name}`.`{collection_name}` ({embedding_key} VECTOR)
+        USING GSI WITH {with_opts}
+        """
+        result = scope.query(
+            create_index_query, QueryOptions(timeout=timedelta(seconds=300))
+        ).execute()
+        # Give the index build time to settle before tests query it
+        time.sleep(15)
+
+    except Exception:
+        raise
+
+
+def drop_vector_index(
+    cluster: Any,
+    bucket_name: str,
+    scope_name: str,
+    collection_name: str,
+    index_name: str,
+) -> None:
+    """Drop the GSI vector index."""
+    try:
+        from couchbase.options import QueryOptions
+
+        bucket = cluster.bucket(bucket_name)
+        scope = bucket.scope(scope_name)
+
+        drop_index_query = f"DROP INDEX `{index_name}` on `{bucket_name}`.`{scope_name}`.`{collection_name}`"
+        scope.query(
+            drop_index_query, QueryOptions(timeout=timedelta(seconds=60))
+        ).execute()
+
+    except Exception:
+        # Index might not exist or already dropped
+        pass
+
+
+@pytest.mark.skipif(
+    not set_all_env_vars(), reason="missing Couchbase environment variables"
+)
+class TestCouchbaseQueryVectorStore:
+    @classmethod
+    def setup_class(cls) -> None:
+        """Set up test class with vector index creation."""
+        cls.cluster = get_cluster()
+
+        # Create scope and collection if they don't exist
+        create_scope_and_collection(
+            cls.cluster, BUCKET_NAME, SCOPE_NAME, COLLECTION_NAME
+        )
+
+        # Create vector index for testing
+        create_vector_index(
+            cls.cluster, BUCKET_NAME, SCOPE_NAME, COLLECTION_NAME, INDEX_NAME
+        )
+
+    @classmethod
+    def teardown_class(cls) -> None:
+        """Clean up after all tests."""
+        try:
+            # Drop the vector index
+            drop_vector_index(
+                cls.cluster, BUCKET_NAME, SCOPE_NAME, COLLECTION_NAME, INDEX_NAME
+            )
+            delete_documents(cls.cluster, BUCKET_NAME, SCOPE_NAME, COLLECTION_NAME)
+        except Exception:
+            pass
+
+    def setup_method(self) -> None:
+        """Set up each test method."""
+        # Delete all the documents in the collection
+        delete_documents(self.cluster, BUCKET_NAME, SCOPE_NAME, COLLECTION_NAME)
+        self.vector_store = CouchbaseQueryVectorStore(
+            cluster=self.cluster,
+            bucket_name=BUCKET_NAME,
+            scope_name=SCOPE_NAME,
+            collection_name=COLLECTION_NAME,
+        )
+
+    def test_initialization_default_params(self) -> None:
+        """Test initialization with default parameters."""
+        vector_store = CouchbaseQueryVectorStore(
+            cluster=self.cluster,
+            bucket_name=BUCKET_NAME,
+            scope_name=SCOPE_NAME,
+            collection_name=COLLECTION_NAME,
+        )
+
+        assert vector_store._search_type == QueryVectorSearchType.ANN
+        assert vector_store._similarity == "cosine"
+        assert vector_store._text_key == "text"
+        assert vector_store._embedding_key == "embedding"
+        assert vector_store._metadata_key == "metadata"
+
+    def test_initialization_custom_params(self) -> None:
+        """Test initialization with custom parameters."""
+        custom_timeout = timedelta(seconds=120)
+        vector_store = CouchbaseQueryVectorStore(
+            cluster=self.cluster,
+            bucket_name=BUCKET_NAME,
+            scope_name=SCOPE_NAME,
+            collection_name=COLLECTION_NAME,
+            search_type=QueryVectorSearchType.KNN,
+            similarity="euclidean",
+            text_key="content",
+            embedding_key="vector",
+            metadata_key="meta",
+            query_timeout=custom_timeout,
+        )
+
+        assert vector_store._search_type == QueryVectorSearchType.KNN
+        assert vector_store._similarity == "euclidean"
+        assert vector_store._text_key == "content"
+        assert vector_store._embedding_key == "vector"
+        assert vector_store._metadata_key == "meta"
+        assert vector_store._query_timeout == custom_timeout
+
+    def test_initialization_with_string_search_type(self) -> None:
+        """Test initialization with string search type."""
+        vector_store = CouchbaseQueryVectorStore(
+            cluster=self.cluster,
+            bucket_name=BUCKET_NAME,
+            scope_name=SCOPE_NAME,
+            collection_name=COLLECTION_NAME,
+            search_type="KNN",
+        )
+
+        assert 
vector_store._search_type == QueryVectorSearchType.KNN + + def test_add_documents(self, node_embeddings: List[TextNode]) -> None: + """Test adding documents to Couchbase query vector store.""" + input_doc_ids = [node_embedding.id_ for node_embedding in node_embeddings] + # Add nodes to the couchbase vector store + doc_ids = self.vector_store.add(node_embeddings) + + # Ensure that all nodes are returned & they are the same as input + assert len(doc_ids) == len(node_embeddings) + for doc_id in doc_ids: + assert doc_id in input_doc_ids + + def test_ann_search(self, node_embeddings: List[TextNode]) -> None: + """Test ANN vector search functionality.""" + # Add nodes to the couchbase vector store + self.vector_store.add(node_embeddings) + + # Wait for the documents to be indexed + time.sleep(SLEEP_DURATION) + + # ANN similarity search + q = VectorStoreQuery( + query_embedding=text_to_embedding("foo"), similarity_top_k=1 + ) + + result = self.vector_store.query(q) + assert result.nodes is not None and len(result.nodes) == 1 + assert ( + result.nodes[0].get_content(metadata_mode=MetadataMode.NONE) + == node_embeddings[0].text + ) + assert result.similarities is not None + + def test_knn_search(self, node_embeddings: List[TextNode]) -> None: + """Test KNN vector search functionality.""" + # Create a KNN vector store + knn_vector_store = CouchbaseQueryVectorStore( + cluster=self.cluster, + bucket_name=BUCKET_NAME, + scope_name=SCOPE_NAME, + collection_name=COLLECTION_NAME, + search_type=QueryVectorSearchType.KNN, + ) + + # Add nodes to the couchbase vector store + knn_vector_store.add(node_embeddings) + + # Wait for the documents to be indexed + time.sleep(SLEEP_DURATION) + + # KNN similarity search + q = VectorStoreQuery( + query_embedding=text_to_embedding("foo"), similarity_top_k=1 + ) + + result = knn_vector_store.query(q) + assert result.nodes is not None and len(result.nodes) == 1 + assert ( + result.nodes[0].get_content(metadata_mode=MetadataMode.NONE) + == node_embeddings[0].text + ) + assert result.similarities is not None + + def test_search_with_filters(self, node_embeddings: List[TextNode]) -> None: + """Test vector search with metadata filters.""" + # Add nodes to the couchbase vector store + self.vector_store.add(node_embeddings) + + # Wait for the documents to be indexed + time.sleep(SLEEP_DURATION) + + # Test equality filter + q = VectorStoreQuery( + query_embedding=text_to_embedding("baz"), + similarity_top_k=3, + filters=MetadataFilters( + filters=[ + MetadataFilter( + key="genre", value="Thriller", operator=FilterOperator.EQ + ), + ] + ), + ) + + result = self.vector_store.query(q) + assert result.nodes is not None and len(result.nodes) == 1 + assert result.nodes[0].metadata.get("genre") == "Thriller" + + def test_search_with_numeric_filters(self, node_embeddings: List[TextNode]) -> None: + """Test vector search with numeric metadata filters.""" + # Add nodes to the couchbase vector store + self.vector_store.add(node_embeddings) + + # Wait for the documents to be indexed + time.sleep(SLEEP_DURATION) + + # Test greater than filter + q = VectorStoreQuery( + query_embedding=text_to_embedding("baz"), + similarity_top_k=3, + filters=MetadataFilters( + filters=[ + MetadataFilter(key="pages", value=10, operator=FilterOperator.GT), + ] + ), + ) + + result = self.vector_store.query(q) + assert result.nodes is not None and len(result.nodes) == 1 + assert result.nodes[0].metadata.get("pages") == 20 + + # Test less than or equal filter + q = VectorStoreQuery( + 
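+            # "foo" (pages=10) and "bar" (pages=5) both satisfy pages <= 10,
+            # so exactly two of the three seeded nodes should match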
query_embedding=text_to_embedding("bar"), + similarity_top_k=3, + filters=MetadataFilters( + filters=[ + MetadataFilter(key="pages", value=10, operator=FilterOperator.LTE), + ] + ), + ) + + result = self.vector_store.query(q) + assert result.nodes is not None and len(result.nodes) == 2 + for node in result.nodes: + assert node.metadata.get("pages") <= 10 + + def test_search_with_combined_filters( + self, node_embeddings: List[TextNode] + ) -> None: + """Test vector search with multiple combined filters.""" + # Add nodes to the couchbase vector store + self.vector_store.add(node_embeddings) + + # Wait for the documents to be indexed + time.sleep(SLEEP_DURATION) + + # Test combined filters with AND condition + q = VectorStoreQuery( + query_embedding=text_to_embedding("baz"), + similarity_top_k=3, + filters=MetadataFilters( + filters=[ + MetadataFilter( + key="genre", value="Thriller", operator=FilterOperator.EQ + ), + MetadataFilter(key="rating", value=4.0, operator=FilterOperator.GT), + ], + condition="and", + ), + ) + + result = self.vector_store.query(q) + assert result.nodes is not None and len(result.nodes) == 1 + assert result.nodes[0].metadata.get("genre") == "Thriller" + assert result.nodes[0].metadata.get("rating") > 4.0 + + def test_delete_document(self) -> None: + """Test delete document from Couchbase query vector store.""" + storage_context = StorageContext.from_defaults(vector_store=self.vector_store) + + # Add a document to the vector store + VectorStoreIndex.from_documents( + [ + Document( + text="hello world", + metadata={"name": "John Doe", "age": 30, "city": "New York"}, + ), + ], + storage_context=storage_context, + ) + + # Wait for the documents to be indexed + time.sleep(SLEEP_DURATION) + + # Search for the document + search_embedding = OpenAIEmbedding().get_text_embedding("hello world") + q = VectorStoreQuery( + query_embedding=search_embedding, + similarity_top_k=1, + ) + + result = self.vector_store.query(q) + assert result.nodes is not None and len(result.nodes) == 1 + + # Get the document ID to delete + ref_doc_id_to_delete = result.nodes[0].ref_doc_id + + # Delete the document + self.vector_store.delete(ref_doc_id=ref_doc_id_to_delete) + + # Wait for the deletion to be processed + time.sleep(SLEEP_DURATION) + + # Ensure that no results are returned + result = self.vector_store.query(q) + assert len(result.nodes) == 0 + + def test_empty_query_embedding_error(self) -> None: + """Test that empty query embedding raises ValueError.""" + q = VectorStoreQuery( + query_embedding=None, + similarity_top_k=1, + ) + + with pytest.raises(ValueError, match="Query embedding must not be empty"): + self.vector_store.query(q) + + def test_different_similarity_metrics( + self, node_embeddings: List[TextNode] + ) -> None: + """Test different similarity metrics.""" + similarity_metrics = ["cosine", "euclidean", "dot"] + + for metric in similarity_metrics: + # Create vector store with specific similarity metric + vector_store = CouchbaseQueryVectorStore( + cluster=self.cluster, + bucket_name=BUCKET_NAME, + scope_name=SCOPE_NAME, + collection_name=COLLECTION_NAME, + similarity=metric, + ) + + # Add nodes to the vector store + vector_store.add(node_embeddings) + + # Wait for indexing + time.sleep(SLEEP_DURATION) + + # Test search + q = VectorStoreQuery( + query_embedding=text_to_embedding("foo"), + similarity_top_k=1, + ) + + result = vector_store.query(q) + assert result.nodes is not None and len(result.nodes) == 1 + assert result.similarities is not None + + def 
test_custom_field_names(self) -> None:
+        """Test vector store with custom field names."""
+        custom_vector_store = CouchbaseQueryVectorStore(
+            cluster=self.cluster,
+            bucket_name=BUCKET_NAME,
+            scope_name=SCOPE_NAME,
+            collection_name=COLLECTION_NAME,
+            text_key="content",
+            embedding_key="vector",
+            metadata_key="meta",
+        )
+
+        # Create a test node with custom field mapping
+        test_node = TextNode(
+            text="custom field test",
+            id_="custom-test-id",
+            metadata={"category": "test"},
+            embedding=text_to_embedding("custom field test"),
+        )
+
+        # Add the node
+        doc_ids = custom_vector_store.add([test_node])
+        assert len(doc_ids) == 1
+
+        # Wait for indexing
+        time.sleep(SLEEP_DURATION)
+
+        # Search for the document
+        q = VectorStoreQuery(
+            query_embedding=text_to_embedding("custom field test"),
+            similarity_top_k=1,
+        )
+
+        result = custom_vector_store.query(q)
+        assert result.nodes is not None and len(result.nodes) == 1
+        assert (
+            result.nodes[0].get_content(metadata_mode=MetadataMode.NONE)
+            == "custom field test"
+        )
+
+    def test_batch_insert(self, node_embeddings: List[TextNode]) -> None:
+        """Test batch insert with custom batch size."""
+        # Test with small batch size
+        doc_ids = self.vector_store.add(node_embeddings, batch_size=2)
+        assert len(doc_ids) == len(node_embeddings)
+
+        # Wait for indexing
+        time.sleep(SLEEP_DURATION)
+
+        # Verify all documents are searchable
+        q = VectorStoreQuery(
+            query_embedding=text_to_embedding("foo"),
+            similarity_top_k=3,
+        )
+
+        result = self.vector_store.query(q)
+        assert result.nodes is not None and len(result.nodes) == 3
+
+    def test_vector_index_utilization(self, node_embeddings: List[TextNode]) -> None:
+        """Test that vector search actually utilizes the GSI vector index."""
+        # Add nodes to the vector store
+        self.vector_store.add(node_embeddings)
+
+        # Wait for GSI indexing
+        time.sleep(SLEEP_DURATION)
+
+        # Test that we can perform vector search (this implicitly tests index usage)
+        q = VectorStoreQuery(
+            query_embedding=text_to_embedding("foo"),
+            similarity_top_k=2,
+        )
+
+        result = self.vector_store.query(q)
+        assert result.nodes is not None and len(result.nodes) == 2
+        assert result.similarities is not None
+        assert len(result.similarities) == 2
+
+        # Verify scores are meaningful (should be non-negative distances)
+        for score in result.similarities:
+            assert score >= 0
+
+    def test_vector_search_relevance(self, node_embeddings: List[TextNode]) -> None:
+        """Test that vector search returns relevant results."""
+        # Add nodes to the vector store
+        self.vector_store.add(node_embeddings)
+
+        # Wait for GSI indexing
+        time.sleep(SLEEP_DURATION)
+
+        # Search for "foo" - should return the "foo" document with the best score
+        q = VectorStoreQuery(
+            query_embedding=text_to_embedding("foo"),
+            similarity_top_k=3,
+        )
+
+        result = self.vector_store.query(q)
+        assert result.nodes is not None and len(result.nodes) == 3
+
+        # The first result should be the most similar (lowest distance for cosine)
+        assert result.nodes[0].get_content(metadata_mode=MetadataMode.NONE) == "foo"
+
+        # Verify scores are ordered (ascending for distance-based similarity)
+        scores = result.similarities
+        assert scores[0] <= scores[1] <= scores[2]
+
+    def test_large_batch_processing(self) -> None:
+        """Test handling of larger document batches."""
+        # Create a larger batch of documents
+        large_batch = []
+        for i in range(2000):
+            node = TextNode(
+                text=f"document_{i}",
+                id_=f"large_batch_{i}",
+                metadata={"batch_id": "large", "doc_num": i},
+                embedding=text_to_embedding(f"document_{i}"),
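+                # text_to_embedding() is deterministic, so every document gets
+                # a stable, unique vector without calling an embedding model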
+ ) + large_batch.append(node) + + # Add the large batch + doc_ids = self.vector_store.add(large_batch, batch_size=10) + assert len(doc_ids) == len(large_batch) + + # Wait for indexing + time.sleep(SLEEP_DURATION * 2) # Extra time for larger batch + + # Test search works with larger dataset + q = VectorStoreQuery( + query_embedding=text_to_embedding("document_25"), + similarity_top_k=5, + ) + + result = self.vector_store.query(q) + assert result.nodes is not None and len(result.nodes) == 5 diff --git a/llama-index-integrations/vector_stores/llama-index-vector-stores-couchbase/tests/test_document_store_integration.py b/llama-index-integrations/vector_stores/llama-index-vector-stores-couchbase/tests/test_document_store_integration.py new file mode 100644 index 0000000000..4d9a4343c9 --- /dev/null +++ b/llama-index-integrations/vector_stores/llama-index-vector-stores-couchbase/tests/test_document_store_integration.py @@ -0,0 +1,348 @@ +# SPDX-FileCopyrightText: 2023-present deepset GmbH +# +# SPDX-License-Identifier: Apache-2.0 +import pytest +from datetime import datetime +from typing import List + +from haystack.testing.document_store import DocumentStoreBaseTests +from haystack.dataclasses import Document, ByteStream +from haystack.document_stores.types import DuplicatePolicy +from haystack.utils.auth import Secret +from couchbase_haystack import ( + CouchbaseQueryDocumentStore, + QueryVectorSearchType, + QueryVectorSearchFunctionParams, +) +from couchbase_haystack.document_stores.auth import CouchbasePasswordAuthenticator +from couchbase_haystack.document_stores.cluster_options import CouchbaseClusterOptions +from couchbase.options import KnownConfigProfiles +from couchbase.exceptions import ( + ScopeAlreadyExistsException, + CollectionAlreadyExistsException, +) +from couchbase.options import QueryOptions +from datetime import timedelta +from sentence_transformers import SentenceTransformer +import time +import json +from pandas import DataFrame +from uuid import uuid1 + +model = SentenceTransformer("all-MiniLM-L6-v2") + +# Test configuration +TEST_BUCKET = "test_bucket" +TEST_SCOPE = "test_scope" +TEST_COLLECTION = "test_collection" +TEST_INDEX = "test_vector_index" +VECTOR_DIMENSION = 384 + + +class TestGSIDocumentStoreIntegration(DocumentStoreBaseTests): + @pytest.fixture(scope="class") + def sample_init_documents(self) -> List[Document]: + """Create sample documents for testing.""" + return [ + Document( + id=f"doc_init_{i}", + content=f"Test document {i}", + meta={ + "field1": f"value{i}", + "field2": i, + "created_at": datetime.now().isoformat(), + }, + embedding=[0.001 * i] * VECTOR_DIMENSION, + ) + for i in range(2048) + ] + + @pytest.fixture + def sample_documents(self) -> List[Document]: + """Create sample documents for testing.""" + return [ + Document( + id=f"doc_{i}", + content=f"Test document {i}", + meta={ + "field1": f"value{i}", + "field2": i, + "created_at": datetime.now().isoformat(), + }, + embedding=[0.001 * i] * VECTOR_DIMENSION, + ) + for i in range(1024) + ] + + @pytest.fixture(scope="class") + def document_store_with_index_creation(self, sample_init_documents): + # Create authenticator + authenticator = CouchbasePasswordAuthenticator( + username=Secret.from_env_var("USER_NAME"), + password=Secret.from_env_var("PASSWORD"), + ) + + # Create cluster options + cluster_options = CouchbaseClusterOptions( + protocol=KnownConfigProfiles.WanDevelopment + ) + + # Create document store + store = CouchbaseQueryDocumentStore( + 
cluster_connection_string=Secret.from_env_var("CONNECTION_STRING"), + authenticator=authenticator, + cluster_options=cluster_options, + bucket=TEST_BUCKET, + scope=TEST_SCOPE, + collection=TEST_COLLECTION, + index_name=TEST_INDEX, + query_vector_search_params=QueryVectorSearchFunctionParams( + search_type=QueryVectorSearchType.ANN, + dimension=VECTOR_DIMENSION, + similarity="L2", + ), + vector_field="embedding", + ) + + # Create scope if it doesn't exist + try: + store.bucket.collections().create_scope(scope_name=TEST_SCOPE) + except ScopeAlreadyExistsException: + pass + + # Create collection if it doesn't exist + try: + store.bucket.collections().create_collection( + collection_name=TEST_COLLECTION, scope_name=TEST_SCOPE + ) + except CollectionAlreadyExistsException: + pass + + # Write initial documents + store.write_documents(sample_init_documents, policy=DuplicatePolicy.OVERWRITE) + + with_opts = json.dumps( + { + "dimension": VECTOR_DIMENSION, + "description": "IVF1024,PQ32x8", + "similarity": "L2", + } + ) + # Create index before tests + result = store.scope.query( + f""" + CREATE INDEX {TEST_INDEX} + ON {TEST_BUCKET}.{TEST_SCOPE}.{TEST_COLLECTION} ({store.vector_field} VECTOR) + USING GSI WITH {with_opts} + """, + QueryOptions(timeout=timedelta(seconds=300)), + ).execute() + print(result) + # time.sleep(60) + + store.delete_documents([doc.id for doc in store.filter_documents()]) + + yield store + store.bucket.collections().drop_collection( + collection_name=TEST_COLLECTION, scope_name=TEST_SCOPE + ) + # Cleanup after tests + store.bucket.close() + + @pytest.fixture() + def document_store(self, document_store_with_index_creation): + yield document_store_with_index_creation + document_store_with_index_creation.delete_documents( + [doc.id for doc in document_store_with_index_creation.filter_documents()] + ) + + def assert_documents_are_equal( + self, received: List[Document], expected: List[Document] + ): + print(received, expected) + for r in received: + r.score = None + r.embedding = None + received_dict = {doc.id: doc for doc in received} + received = [] + for doc in expected: + received.append(received_dict.get(doc.id)) + doc.embedding = None + print("================") + print(received, expected) + print(len(received), len(expected)) + # print([doc.to_dict(flatten=False) if doc else doc for doc in received]) + # print([doc.to_dict(flatten=False) for doc in expected]) + super().assert_documents_are_equal(received, expected) + + def test_write_documents_duplicate_skip(self, document_store): + pass + + def test_no_filters(self, document_store: CouchbaseQueryDocumentStore): + """Test filter_documents() with empty filters""" + self.assert_documents_are_equal(document_store.filter_documents(), []) + self.assert_documents_are_equal(document_store.filter_documents(filters={}), []) + docs = [Document(content="test doc")] + document_store.write_documents(docs) + self.assert_documents_are_equal(document_store.filter_documents(), docs) + self.assert_documents_are_equal( + document_store.filter_documents(filters={}), docs + ) + + def test_write_documents(self, document_store: CouchbaseQueryDocumentStore): + documents = [ + Document(id=uuid1().hex, content="Haystack is an amazing tool for search."), + Document( + id=uuid1().hex, + content="We are using pre-trained models to generate embeddings.", + ), + Document(id=uuid1().hex, content="The weather is sunny today."), + ] + for doc in documents: + embedding = model.encode(doc.content).tolist() + doc.embedding = embedding + + assert 
document_store.write_documents(documents) == 3 + retrieved_docs = document_store.filter_documents() + assert len(retrieved_docs) == 3 + retrieved_docs.sort(key=lambda x: x.id) + self.assert_documents_are_equal(retrieved_docs, documents) + + def test_write_blob(self, document_store: CouchbaseQueryDocumentStore): + bytestream = ByteStream( + b"test", meta={"meta_key": "meta_value"}, mime_type="mime_type" + ) + documents = [Document(blob=bytestream)] + for doc in documents: + # Assuming blob_content is in bytes, decode it to string if necessary + embedding = model.encode(bytestream.data.decode("utf-8")).tolist() + doc.embedding = embedding + assert document_store.write_documents(documents) == 1 + retrieved_docs = document_store.filter_documents() + time.sleep(30) + self.assert_documents_are_equal(retrieved_docs, documents) + + def test_write_dataframe(self, document_store: CouchbaseQueryDocumentStore): + dataframe = DataFrame({"col1": [1, 2], "col2": [3, 4]}) + docs = [Document(dataframe=dataframe)] + document_store.write_documents(docs) + retrieved_docs = document_store.filter_documents() + self.assert_documents_are_equal(retrieved_docs, docs) + + def test_comparison_in1( + self, document_store: CouchbaseQueryDocumentStore, filterable_docs + ): + """Test filter_documents() with 'in' comparator""" + document_store.write_documents(filterable_docs) + # time.sleep(2000) + result = document_store.filter_documents( + {"field": "meta.number", "operator": "in", "value": [10, -10]} + ) + assert len(result) + expected = [ + d + for d in filterable_docs + if d.meta.get("number") is not None and d.meta["number"] in [10, -10] + ] + self.assert_documents_are_equal(result, expected) + + def test_complex_filter(self, document_store, filterable_docs): + document_store.write_documents(filterable_docs) + filters = { + "operator": "OR", + "conditions": [ + { + "operator": "AND", + "conditions": [ + {"field": "meta.number", "operator": "==", "value": 100}, + {"field": "meta.chapter", "operator": "==", "value": "intro"}, + ], + }, + { + "operator": "AND", + "conditions": [ + {"field": "meta.page", "operator": "==", "value": "90"}, + { + "field": "meta.chapter", + "operator": "==", + "value": "conclusion", + }, + ], + }, + ], + } + + result = document_store.filter_documents(filters=filters) + + self.assert_documents_are_equal( + result, + [ + d + for d in filterable_docs + if (d.meta.get("number") == 100 and d.meta.get("chapter") == "intro") + or ( + d.meta.get("page") == "90" and d.meta.get("chapter") == "conclusion" + ) + ], + ) + + def test_duplicate_document_handling(self, document_store, sample_documents): + """Test handling of duplicate documents.""" + # Write documents first time + document_store.write_documents(sample_documents) + + # Try to write same documents again with FAIL policy + with pytest.raises(Exception): + document_store.write_documents( + sample_documents, policy=DuplicatePolicy.FAIL + ) + + # Write with OVERWRITE policy + document_store.write_documents( + sample_documents, policy=DuplicatePolicy.OVERWRITE + ) + + # Verify document count hasn't changed + documents = document_store.filter_documents() + assert len(documents) == len(sample_documents) + + def test_vector_search( + self, document_store: CouchbaseQueryDocumentStore, sample_documents + ): + """Test vector search functionality.""" + # Write documents + document_store.write_documents(sample_documents) + + # Create a query embedding + query_embedding = [0.1] * VECTOR_DIMENSION + + # Perform vector search + results = 
document_store.vector_search(query_embedding, top_k=3) + + # Verify results + assert len(results) == 3 + assert all(hasattr(doc, "score") for doc in results) + print(results) + assert all(doc.score is not None for doc in results) + + # TODO: ADD logic to check if the results are correct + + def test_vector_search_with_filters(self, document_store, sample_documents): + """Test vector search with filters.""" + # Write documents + document_store.write_documents(sample_documents) + + # Create a query embedding + query_embedding = [0.1] * VECTOR_DIMENSION + + # Define filters + filters = {"field": "field2", "operator": ">", "value": 2} + + # Perform vector search with filters + results = document_store.vector_search( + query_embedding, top_k=3, filters=filters + ) + + # Verify results + assert len(results) <= 3 + assert all(doc.meta["field2"] > 2 for doc in results) From 1a68858cc5ee94b1a30dd1172b3bfa837a75eed4 Mon Sep 17 00:00:00 2001 From: Shyam Venkat Date: Tue, 23 Sep 2025 12:00:29 +0530 Subject: [PATCH 2/6] made search type and similarity required for GSI --- .../llama_index/vector_stores/couchbase/base.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/llama-index-integrations/vector_stores/llama-index-vector-stores-couchbase/llama_index/vector_stores/couchbase/base.py b/llama-index-integrations/vector_stores/llama-index-vector-stores-couchbase/llama_index/vector_stores/couchbase/base.py index 71cef32733..15e6c4bf77 100644 --- a/llama-index-integrations/vector_stores/llama-index-vector-stores-couchbase/llama_index/vector_stores/couchbase/base.py +++ b/llama-index-integrations/vector_stores/llama-index-vector-stores-couchbase/llama_index/vector_stores/couchbase/base.py @@ -693,8 +693,8 @@ def __init__( bucket_name: str, scope_name: str, collection_name: str, - search_type: Union[QueryVectorSearchType, str] = QueryVectorSearchType.ANN, - similarity: str = "cosine", + search_type: Union[QueryVectorSearchType, str], + similarity: str, nprobes: Optional[int] = None, text_key: Optional[str] = "text", embedding_key: Optional[str] = "embedding", From 8af94ff28dc058fd3368c24492dbaba11b81c3f0 Mon Sep 17 00:00:00 2001 From: Shyam Venkat Date: Fri, 26 Sep 2025 09:53:30 +0530 Subject: [PATCH 3/6] expose bucket, scope and collection as properties --- .../vector_stores/couchbase/base.py | 21 +++++++++++++++++++ 1 file changed, 21 insertions(+) diff --git a/llama-index-integrations/vector_stores/llama-index-vector-stores-couchbase/llama_index/vector_stores/couchbase/base.py b/llama-index-integrations/vector_stores/llama-index-vector-stores-couchbase/llama_index/vector_stores/couchbase/base.py index 15e6c4bf77..a8ee868ebf 100644 --- a/llama-index-integrations/vector_stores/llama-index-vector-stores-couchbase/llama_index/vector_stores/couchbase/base.py +++ b/llama-index-integrations/vector_stores/llama-index-vector-stores-couchbase/llama_index/vector_stores/couchbase/base.py @@ -402,6 +402,27 @@ def client(self) -> Any: """ return self._cluster + @property + def bucket(self) -> Any: + """ + Property function to access the bucket attribute. + """ + return self._bucket + + @property + def scope(self) -> Any: + """ + Property function to access the scope attribute. + """ + return self._scope + + @property + def collection(self) -> Any: + """ + Property function to access the collection attribute. + """ + return self._collection + def _check_bucket_exists(self) -> bool: """ Check if the bucket exists in the linked Couchbase cluster. 
From c11ed6a5462634adb4c85d1261b6c369cee0d088 Mon Sep 17 00:00:00 2001
From: Shyam Venkat
Date: Thu, 16 Oct 2025 11:59:02 +0530
Subject: [PATCH 4/6] docs update

---
 .../README.md      | 107 +++++++++++++++++-
 .../pyproject.toml |   2 +-
 2 files changed, 107 insertions(+), 2 deletions(-)

diff --git a/llama-index-integrations/vector_stores/llama-index-vector-stores-couchbase/README.md b/llama-index-integrations/vector_stores/llama-index-vector-stores-couchbase/README.md
index b98d2ec99c..3e339ca500 100644
--- a/llama-index-integrations/vector_stores/llama-index-vector-stores-couchbase/README.md
+++ b/llama-index-integrations/vector_stores/llama-index-vector-stores-couchbase/README.md
@@ -1,3 +1,108 @@
-# LlamaIndex Vector_Stores Integration: Couchbase
+# LlamaIndex Vector Stores Integration: Couchbase
+
+This package provides Couchbase vector store integrations for LlamaIndex, offering multiple implementation options for vector similarity search.
+
+## Installation
+
+```bash
+pip install llama-index-vector-stores-couchbase
+```
+
+## Available Vector Store Classes
+
+### CouchbaseSearchVectorStore
+
+Uses Couchbase Full-Text Search (FTS) with vector search capabilities.
+
+### CouchbaseQueryVectorStore (Recommended)
+
+Uses Couchbase Global Secondary Index (GSI) with BHIVE vector search support for high-performance ANN operations.
+
+### CouchbaseVectorStore (Deprecated)
+
+> **Note:** `CouchbaseVectorStore` has been deprecated in version 0.4.0. Please use `CouchbaseSearchVectorStore` instead.
+
+## Requirements
+
+- Python >= 3.9, < 4.0
+- Couchbase Server with vector search capabilities
+- couchbase >= 4.2.0, < 5
+
+## Basic Usage
+
+### Using CouchbaseSearchVectorStore (FTS-based)
+
+```python
+from llama_index.vector_stores.couchbase import CouchbaseSearchVectorStore
+from couchbase.cluster import Cluster
+from couchbase.auth import PasswordAuthenticator
+from couchbase.options import ClusterOptions
+
+# Connect to Couchbase
+auth = PasswordAuthenticator("username", "password")
+cluster = Cluster("couchbase://localhost", ClusterOptions(auth))
+
+# Initialize vector store
+vector_store = CouchbaseSearchVectorStore(
+    cluster=cluster,
+    bucket_name="my_bucket",
+    scope_name="my_scope",
+    collection_name="my_collection",
+    index_name="my_vector_index",
+    text_key="text",
+    embedding_key="embedding",
+    metadata_key="metadata",
+    scoped_index=True,
+)
+```
+
+### Using CouchbaseQueryVectorStore (GSI-based)
+
+```python
+from llama_index.vector_stores.couchbase import (
+    CouchbaseQueryVectorStore,
+    QueryVectorSearchType,
+)
+
+# Initialize GSI-based vector store
+vector_store = CouchbaseQueryVectorStore(
+    cluster=cluster,
+    bucket_name="my_bucket",
+    scope_name="my_scope",
+    collection_name="my_collection",
+    search_type=QueryVectorSearchType.ANN,  # or QueryVectorSearchType.KNN
+    similarity="cosine",  # or "euclidean", "dot_product"
+    nprobes=10,  # Optional: number of probes for ANN search
+    text_key="text",
+    embedding_key="embedding",
+    metadata_key="metadata",
+)
+```
+
+## Features
+
+- **Multiple Search Types**: Support for both GSI-based and FTS vector search
+- **Flexible Similarity Metrics**: Cosine, Euclidean, and dot product similarities
+- **Metadata Filtering**: Advanced filtering capabilities using LlamaIndex MetadataFilters
+- **Batch Operations**: Efficient batch insertion with configurable batch sizes
+- **High Performance**: BHIVE index support for approximate nearest neighbor (ANN) search
+- **Scoped Indexes**: Support for both scoped and global search indexes in FTS-based vector search
+
+## Search Types
+
+### ANN (Approximate Nearest 
Neighbor) + +- Uses BHIVE indexes for high-performance approximate search +- Configurable nprobes parameter for accuracy/speed tradeoff +- Recommended for large-scale deployments + +### KNN (K-Nearest Neighbor) + +- Exact nearest neighbor search +- Higher accuracy but potentially slower for large datasets +- Good for smaller datasets or when exact results are required + +For more information, refer to: [Couchbase Vector Search Concepts](https://preview.docs-test.couchbase.com/docs-server-DOC-12565_vector_search_concepts/server/current/vector-index/use-vector-indexes.html) + +## License + +MIT diff --git a/llama-index-integrations/vector_stores/llama-index-vector-stores-couchbase/pyproject.toml b/llama-index-integrations/vector_stores/llama-index-vector-stores-couchbase/pyproject.toml index 1cb636f296..646d8531ae 100644 --- a/llama-index-integrations/vector_stores/llama-index-vector-stores-couchbase/pyproject.toml +++ b/llama-index-integrations/vector_stores/llama-index-vector-stores-couchbase/pyproject.toml @@ -26,7 +26,7 @@ dev = [ [project] name = "llama-index-vector-stores-couchbase" -version = "0.4.0" +version = "0.5.0" description = "llama-index vector_stores couchbase integration" authors = [{name = "Couchbase", email = "devadvocates@couchbase.com"}] requires-python = ">=3.9,<4.0" From 204a7748b63bd038c8673c9958a2ffaf3fe1ee59 Mon Sep 17 00:00:00 2001 From: Shyam Venkat Date: Wed, 29 Oct 2025 11:31:15 +0530 Subject: [PATCH 5/6] test case update for base file changes --- .../vector_stores/couchbase/base.py | 76 ++-- .../test_couchbase_query_vector_store.py | 54 ++- ...=> test_couchbase_search_vector_stores.py} | 209 ++++++++--- .../tests/test_document_store_integration.py | 348 ------------------ .../tests/vector_index.json | 89 +++++ 5 files changed, 337 insertions(+), 439 deletions(-) rename llama-index-integrations/vector_stores/llama-index-vector-stores-couchbase/tests/{test_vector_stores_couchbase.py => test_couchbase_search_vector_stores.py} (59%) delete mode 100644 llama-index-integrations/vector_stores/llama-index-vector-stores-couchbase/tests/test_document_store_integration.py create mode 100644 llama-index-integrations/vector_stores/llama-index-vector-stores-couchbase/tests/vector_index.json diff --git a/llama-index-integrations/vector_stores/llama-index-vector-stores-couchbase/llama_index/vector_stores/couchbase/base.py b/llama-index-integrations/vector_stores/llama-index-vector-stores-couchbase/llama_index/vector_stores/couchbase/base.py index a8ee868ebf..3bcd6b556f 100644 --- a/llama-index-integrations/vector_stores/llama-index-vector-stores-couchbase/llama_index/vector_stores/couchbase/base.py +++ b/llama-index-integrations/vector_stores/llama-index-vector-stores-couchbase/llama_index/vector_stores/couchbase/base.py @@ -18,6 +18,10 @@ VectorStoreQuery, VectorStoreQueryResult, ) +from couchbase.collection import Collection +from couchbase.scope import Scope +from couchbase.bucket import Bucket +from couchbase.cluster import Cluster import couchbase.search as search from couchbase.options import SearchOptions, QueryOptions from couchbase.vector_search import VectorQuery, VectorSearch @@ -37,6 +41,17 @@ class QueryVectorSearchType(str, Enum): KNN = "KNN" +class QueryVectorSearchSimilarity(str, Enum): + """Enum for similarity metrics supported by Couchbase GSI.""" + + COSINE = "COSINE" + DOT = "DOT" + L2 = "L2" + EUCLIDEAN = "EUCLIDEAN" + L2_SQUARED = "L2_SQUARED" + EUCLIDEAN_SQUARED = "EUCLIDEAN_SQUARED" + + def _transform_couchbase_filter_condition(condition: str) -> str: 
""" Convert standard metadata filter condition to Couchbase specific condition. @@ -220,16 +235,17 @@ class CouchbaseVectorStoreBase(BasePydanticVectorStore): # Default batch size DEFAULT_BATCH_SIZE: int = 100 - _cluster: Any = PrivateAttr() - _bucket: Any = PrivateAttr() - _scope: Any = PrivateAttr() - _collection: Any = PrivateAttr() + _cluster: Cluster = PrivateAttr() + _bucket: Bucket = PrivateAttr() + _scope: Scope = PrivateAttr() + _collection: Collection = PrivateAttr() _bucket_name: str = PrivateAttr() _scope_name: str = PrivateAttr() _collection_name: str = PrivateAttr() _text_key: str = PrivateAttr() _embedding_key: str = PrivateAttr() _metadata_key: str = PrivateAttr() + _query_options: QueryOptions = PrivateAttr() def __init__( self, @@ -240,6 +256,7 @@ def __init__( text_key: Optional[str] = "text", embedding_key: Optional[str] = "embedding", metadata_key: Optional[str] = "metadata", + query_options: Optional[QueryOptions] = None, ) -> None: """ Base initialization for Couchbase Vector Stores. @@ -255,6 +272,8 @@ def __init__( Defaults to "embedding". metadata_key (Optional[str], optional): The field for the document metadata. Defaults to "metadata". + query_options (Optional[QueryOptions]): Query options for SQL++ queries. + Defaults to None. Returns: None @@ -292,7 +311,7 @@ def __init__( self._text_key = text_key self._embedding_key = embedding_key self._metadata_key = metadata_key - + self._query_options = query_options # Check if the bucket exists if not self._check_bucket_exists(): raise ValueError( @@ -387,9 +406,13 @@ def delete(self, ref_doc_id: str, **kwargs: Any) -> None: """ try: - document_field = self._metadata_key + ".ref_doc_id" + document_field = f"`{self._metadata_key}`.`ref_doc_id`" query = f"DELETE FROM `{self._collection_name}` WHERE {document_field} = $ref_doc_id" - self._scope.query(query, ref_doc_id=ref_doc_id).execute() + query_options = ( + self._query_options.copy() if self._query_options else QueryOptions() + ) + query_options["named_parameters"] = {"ref_doc_id": ref_doc_id} + self._scope.query(query, query_options).execute() logger.debug(f"Deleted document {ref_doc_id}") except Exception: logger.error(f"Error deleting document {ref_doc_id}") @@ -518,6 +541,7 @@ def __init__( embedding_key: Optional[str] = "embedding", metadata_key: Optional[str] = "metadata", scoped_index: bool = True, + query_options: Optional[QueryOptions] = None, ) -> None: """ Initializes a connection to a Couchbase Vector Store using FTS. @@ -536,6 +560,8 @@ def __init__( Defaults to "metadata". scoped_index (Optional[bool]): specify whether the index is a scoped index. Set to True by default. + query_options (Optional[QueryOptions]): Query options for SQL++ queries. + Defaults to None. Returns: None @@ -549,6 +575,7 @@ def __init__( text_key=text_key, embedding_key=embedding_key, metadata_key=metadata_key, + query_options=query_options, ) if not index_name: @@ -715,12 +742,12 @@ def __init__( scope_name: str, collection_name: str, search_type: Union[QueryVectorSearchType, str], - similarity: str, + similarity: Union[QueryVectorSearchSimilarity, str], nprobes: Optional[int] = None, text_key: Optional[str] = "text", embedding_key: Optional[str] = "embedding", metadata_key: Optional[str] = "metadata", - query_timeout: Optional[timedelta] = None, + query_options: Optional[QueryOptions] = None, ) -> None: """ Initializes a connection to a Couchbase Vector Store using GSI. @@ -742,7 +769,7 @@ def __init__( Defaults to "embedding". 
metadata_key (Optional[str], optional): The field for the document metadata. Defaults to "metadata". - query_timeout (Optional[timedelta]): Timeout for SQL++ queries. + query_options (Optional[QueryOptions]): Query options for SQL++ queries. Defaults to 60 seconds. Returns: @@ -757,14 +784,22 @@ def __init__( text_key=text_key, embedding_key=embedding_key, metadata_key=metadata_key, + query_options=query_options, ) if isinstance(search_type, str): search_type = QueryVectorSearchType(search_type) self._search_type = search_type - self._similarity = similarity - self._query_timeout = query_timeout or timedelta(seconds=60) + self._similarity = ( + similarity.upper() + if isinstance(similarity, str) + else ( + similarity.value + if isinstance(similarity, QueryVectorSearchSimilarity) + else None + ) + ) self._nprobes = nprobes def query(self, query: VectorStoreQuery, **kwargs: Any) -> VectorStoreQueryResult: @@ -784,7 +819,7 @@ def query(self, query: VectorStoreQuery, **kwargs: Any) -> VectorStoreQueryResul k = query.similarity_top_k query_context = ( - f"{self._bucket_name}.{self._scope_name}.{self._collection_name}" + f"`{self._bucket_name}`.`{self._scope_name}`.`{self._collection_name}`" ) # Convert embedding to string representation for query @@ -806,7 +841,7 @@ def query(self, query: VectorStoreQuery, **kwargs: Any) -> VectorStoreQueryResul if query.output_fields: fields = query.output_fields.join(",") else: - fields = "*, meta().id as id" + fields = "d.*, meta().id as id" nprobes = self._nprobes if kwargs.get("nprobes"): @@ -821,20 +856,16 @@ def query(self, query: VectorStoreQuery, **kwargs: Any) -> VectorStoreQueryResul # Build the SQL++ query query_str = f""" - SELECT {fields}, {distance_function_exp} as distance + SELECT {fields}, {distance_function_exp} as score FROM {query_context} d {where_clause} - ORDER BY distance + ORDER BY score LIMIT {k} """ try: # Execute the query - query_options = QueryOptions( - timeout=self._query_timeout, - ) - - result = self._cluster.query(query_str, query_options) + result = self._cluster.query(query_str, self._query_options) top_k_nodes = [] top_k_scores = [] @@ -844,13 +875,12 @@ def query(self, query: VectorStoreQuery, **kwargs: Any) -> VectorStoreQueryResul for row in result.rows(): doc_id = row.get("id", "") text = row.get(self._text_key, "") - score = row.get("distance", 0.0) + score = row.get("score") # Extract metadata metadata_dict = {} if self._metadata_key in row: metadata_dict = row[self._metadata_key] - try: node = metadata_dict_to_node(metadata_dict, text) node.node_id = doc_id diff --git a/llama-index-integrations/vector_stores/llama-index-vector-stores-couchbase/tests/test_couchbase_query_vector_store.py b/llama-index-integrations/vector_stores/llama-index-vector-stores-couchbase/tests/test_couchbase_query_vector_store.py index ccaf9848d9..4cc3426e15 100644 --- a/llama-index-integrations/vector_stores/llama-index-vector-stores-couchbase/tests/test_couchbase_query_vector_store.py +++ b/llama-index-integrations/vector_stores/llama-index-vector-stores-couchbase/tests/test_couchbase_query_vector_store.py @@ -19,6 +19,7 @@ ) from llama_index.vector_stores.couchbase import CouchbaseQueryVectorStore from llama_index.vector_stores.couchbase.base import QueryVectorSearchType +from llama_index.vector_stores.couchbase.base import QueryVectorSearchSimilarity from llama_index.core.storage.storage_context import StorageContext from llama_index.core import VectorStoreIndex @@ -28,6 +29,7 @@ from couchbase.cluster import Cluster from 
couchbase.options import ClusterOptions from couchbase.logic.options import KnownConfigProfiles +from couchbase.options import QueryOptions CONNECTION_STRING = os.getenv("COUCHBASE_CONNECTION_STRING", "") BUCKET_NAME = os.getenv("COUCHBASE_BUCKET_NAME", "") @@ -136,6 +138,7 @@ def create_scope_and_collection( from couchbase.exceptions import ( ScopeAlreadyExistsException, CollectionAlreadyExistsException, + QueryIndexAlreadyExistsException, ) bucket = cluster.bucket(bucket_name) @@ -154,6 +157,13 @@ def create_scope_and_collection( except CollectionAlreadyExistsException: pass + try: + bucket.scope(scope_name).collection( + collection_name + ).query_indexes().create_primary_index() + except QueryIndexAlreadyExistsException: + pass + except Exception as e: # Log the error but don't fail - collection might already exist pass @@ -291,6 +301,9 @@ def setup_method(self) -> None: bucket_name=BUCKET_NAME, scope_name=SCOPE_NAME, collection_name=COLLECTION_NAME, + search_type=QueryVectorSearchType.ANN, + similarity=QueryVectorSearchSimilarity.DOT, + nprobes=50, ) def test_initialization_default_params(self) -> None: @@ -300,11 +313,14 @@ def test_initialization_default_params(self) -> None: bucket_name=BUCKET_NAME, scope_name=SCOPE_NAME, collection_name=COLLECTION_NAME, + search_type=QueryVectorSearchType.ANN, + similarity=QueryVectorSearchSimilarity.COSINE, + nprobes=50, ) assert vector_store._search_type == QueryVectorSearchType.ANN - assert vector_store._dimension == 1536 - assert vector_store._similarity == "cosine" + assert vector_store._similarity == QueryVectorSearchSimilarity.COSINE + assert vector_store._nprobes == 50 assert vector_store._text_key == "text" assert vector_store._embedding_key == "embedding" assert vector_store._metadata_key == "metadata" @@ -318,21 +334,19 @@ def test_initialization_custom_params(self) -> None: scope_name=SCOPE_NAME, collection_name=COLLECTION_NAME, search_type=QueryVectorSearchType.KNN, - dimension=768, similarity="euclidean", text_key="content", embedding_key="vector", metadata_key="meta", - query_timeout=custom_timeout, + query_options=QueryOptions(timeout=custom_timeout), ) assert vector_store._search_type == QueryVectorSearchType.KNN - assert vector_store._dimension == 768 - assert vector_store._similarity == "euclidean" + assert vector_store._similarity == QueryVectorSearchSimilarity.EUCLIDEAN assert vector_store._text_key == "content" assert vector_store._embedding_key == "vector" assert vector_store._metadata_key == "meta" - assert vector_store._query_timeout == custom_timeout + assert vector_store._query_options["timeout"] == custom_timeout def test_initialization_with_string_search_type(self) -> None: """Test initialization with string search type.""" @@ -342,9 +356,12 @@ def test_initialization_with_string_search_type(self) -> None: scope_name=SCOPE_NAME, collection_name=COLLECTION_NAME, search_type="KNN", + similarity="EUCLIDEAN", ) assert vector_store._search_type == QueryVectorSearchType.KNN + assert vector_store._similarity == QueryVectorSearchSimilarity.EUCLIDEAN + assert vector_store._nprobes is None def test_add_documents(self, node_embeddings: List[TextNode]) -> None: """Test adding documents to Couchbase query vector store.""" @@ -387,6 +404,8 @@ def test_knn_search(self, node_embeddings: List[TextNode]) -> None: scope_name=SCOPE_NAME, collection_name=COLLECTION_NAME, search_type=QueryVectorSearchType.KNN, + similarity=QueryVectorSearchSimilarity.L2, + nprobes=50, ) # Add nodes to the couchbase vector store @@ -557,7 +576,11 @@ def 
test_different_similarity_metrics( self, node_embeddings: List[TextNode] ) -> None: """Test different similarity metrics.""" - similarity_metrics = ["cosine", "euclidean", "dot"] + similarity_metrics = [ + QueryVectorSearchSimilarity.COSINE, + QueryVectorSearchSimilarity.EUCLIDEAN, + QueryVectorSearchSimilarity.DOT, + ] for metric in similarity_metrics: # Create vector store with specific similarity metric @@ -567,6 +590,8 @@ def test_different_similarity_metrics( scope_name=SCOPE_NAME, collection_name=COLLECTION_NAME, similarity=metric, + search_type=QueryVectorSearchType.ANN, + nprobes=50, ) # Add nodes to the vector store @@ -592,6 +617,9 @@ def test_custom_field_names(self) -> None: bucket_name=BUCKET_NAME, scope_name=SCOPE_NAME, collection_name=COLLECTION_NAME, + search_type=QueryVectorSearchType.ANN, + similarity=QueryVectorSearchSimilarity.COSINE, + nprobes=50, text_key="content", embedding_key="vector", metadata_key="meta", @@ -662,10 +690,6 @@ def test_vector_index_utilization(self, node_embeddings: List[TextNode]) -> None assert result.similarities is not None assert len(result.similarities) == 2 - # Verify scores are meaningful (should be positive distances) - for score in result.similarities: - assert score >= 0 - def test_vector_search_relevance(self, node_embeddings: List[TextNode]) -> None: """Test that vector search returns relevant results.""" # Add nodes to the vector store @@ -683,12 +707,14 @@ def test_vector_search_relevance(self, node_embeddings: List[TextNode]) -> None: result = self.vector_store.query(q) assert result.nodes is not None and len(result.nodes) == 3 - # The first result should be the most similar (lowest distance for cosine) + # The first result should be the most similar (lowest distance for dot product) assert result.nodes[0].get_content(metadata_mode=MetadataMode.NONE) == "foo" # Verify scores are ordered (ascending for distance-based similarity) scores = result.similarities - assert scores[0] >= scores[1] >= scores[2] + print(f"scores: {scores}") + assert scores[0] <= scores[1] + assert scores[1] <= scores[2] def test_large_batch_processing(self) -> None: """Test handling of larger document batches.""" diff --git a/llama-index-integrations/vector_stores/llama-index-vector-stores-couchbase/tests/test_vector_stores_couchbase.py b/llama-index-integrations/vector_stores/llama-index-vector-stores-couchbase/tests/test_couchbase_search_vector_stores.py similarity index 59% rename from llama-index-integrations/vector_stores/llama-index-vector-stores-couchbase/tests/test_vector_stores_couchbase.py rename to llama-index-integrations/vector_stores/llama-index-vector-stores-couchbase/tests/test_couchbase_search_vector_stores.py index 736e8ff766..80125b5997 100644 --- a/llama-index-integrations/vector_stores/llama-index-vector-stores-couchbase/tests/test_vector_stores_couchbase.py +++ b/llama-index-integrations/vector_stores/llama-index-vector-stores-couchbase/tests/test_couchbase_search_vector_stores.py @@ -6,6 +6,7 @@ import pytest import time +import json from llama_index.core.schema import MetadataMode, TextNode, Document from llama_index.embeddings.openai import OpenAIEmbedding @@ -20,6 +21,9 @@ ) from llama_index.core.storage.storage_context import StorageContext from llama_index.core import VectorStoreIndex +from couchbase.cluster import Cluster +from couchbase.management.logic.search_index_logic import SearchIndex +from couchbase.exceptions import SearchIndexNotFoundException CONNECTION_STRING = os.getenv("COUCHBASE_CONNECTION_STRING", "") @@ -57,7 
+61,98 @@ def text_to_embedding(text: str) -> List[float]: ) -def get_cluster() -> Any: +def create_scope_and_collection( + cluster: Cluster, bucket_name: str, scope_name: str, collection_name: str +) -> None: + """Create scope and collection if they don't exist.""" + try: + from couchbase.exceptions import ( + ScopeAlreadyExistsException, + CollectionAlreadyExistsException, + QueryIndexAlreadyExistsException, + ) + + bucket = cluster.bucket(bucket_name) + + # Create scope if it doesn't exist + try: + bucket.collections().create_scope(scope_name=scope_name) + except ScopeAlreadyExistsException: + pass + + # Create collection if it doesn't exist + try: + bucket.collections().create_collection( + collection_name=collection_name, scope_name=scope_name + ) + except CollectionAlreadyExistsException: + pass + + try: + bucket.scope(scope_name).collection( + collection_name + ).query_indexes().create_primary_index() + except QueryIndexAlreadyExistsException: + pass + + except Exception as e: + # Log the error but don't fail - collection might already exist + pass + + +def create_vector_index( + cluster: Cluster, + bucket_name: str, + scope_name: str, + collection_name: str, + index_name: str, +) -> None: + """Create vector index if it doesn't exist.""" + bucket = cluster.bucket(BUCKET_NAME) + scope = bucket.scope(SCOPE_NAME) + index_definition = load_json_file(f"{os.path.dirname(__file__)}/vector_index.json") + + sim = scope.search_indexes() + try: + sim.get_index(index_name=index_definition["name"]) + except SearchIndexNotFoundException as e: + type = index_definition["params"]["mapping"]["types"][ + "____scope.collection_____" + ] + del index_definition["params"]["mapping"]["types"]["____scope.collection_____"] + index_definition["params"]["mapping"]["types"][ + f"{SCOPE_NAME}.{COLLECTION_NAME}" + ] = type + search_index = SearchIndex( + name=index_definition["name"], + source_name=BUCKET_NAME, + source_type=index_definition["sourceType"], + params=index_definition["params"], + plan_params=index_definition["planParams"], + ) + sim.upsert_index(search_index) + + # Wait for the index to be ready + max_retries = 10 + retry_interval = 2 # seconds + for attempt in range(max_retries): + try: + # Check if index exists and is ready by getting document count + sim.get_indexed_documents_count(index_definition["name"]) + # If we can get the count, the index is ready + break + except Exception as e: + pass + + time.sleep(retry_interval) + if attempt == max_retries - 1: + pytest.skip( + f"Index {index_definition['name']} not ready after {max_retries} attempts" + ) + + +@pytest.fixture(scope="session") +def cluster() -> Cluster: """Get a couchbase cluster object.""" from datetime import timedelta @@ -69,17 +164,15 @@ def get_cluster() -> Any: options = ClusterOptions(auth) connect_string = CONNECTION_STRING cluster = Cluster(connect_string, options) - + bucket = cluster.bucket(BUCKET_NAME) # Wait until the cluster is ready for use. 
cluster.wait_until_ready(timedelta(seconds=5)) + create_scope_and_collection(cluster, BUCKET_NAME, SCOPE_NAME, COLLECTION_NAME) + create_vector_index(cluster, BUCKET_NAME, SCOPE_NAME, COLLECTION_NAME, INDEX_NAME) - return cluster - - -@pytest.fixture() -def cluster() -> Any: - """Get a couchbase cluster object.""" - return get_cluster() + yield cluster + bucket.collections().drop_scope(SCOPE_NAME) + cluster.close() def delete_documents( @@ -113,13 +206,13 @@ def node_embeddings() -> list[TextNode]: embedding=text_to_embedding("bar"), ), TextNode( - text="baz", + text="cake", id_="469e9537-7bc5-4669-9ff6-baa0ed086236", metadata={ "genre": "Thriller", "pages": 20, }, - embedding=text_to_embedding("baz"), + embedding=text_to_embedding("cake"), ), ] @@ -128,34 +221,36 @@ def node_embeddings() -> list[TextNode]: not set_all_env_vars(), reason="missing Couchbase environment variables" ) class TestCouchbaseSearchVectorStore: - @classmethod - def setup_method(self) -> None: - self.cluster = get_cluster() - # Delete all the documents in the collection - delete_documents(self.cluster, BUCKET_NAME, SCOPE_NAME, COLLECTION_NAME) - self.vector_store = CouchbaseSearchVectorStore( - cluster=self.cluster, + @pytest.fixture() + def vector_store(self, cluster: Cluster) -> CouchbaseSearchVectorStore: + yield CouchbaseSearchVectorStore( + cluster=cluster, bucket_name=BUCKET_NAME, scope_name=SCOPE_NAME, collection_name=COLLECTION_NAME, index_name=INDEX_NAME, ) + delete_documents(cluster, BUCKET_NAME, SCOPE_NAME, COLLECTION_NAME) - def test_add_documents(self, node_embeddings: List[TextNode]) -> None: + def test_add_documents( + self, vector_store: CouchbaseSearchVectorStore, node_embeddings: List[TextNode] + ) -> None: """Test adding documents to Couchbase vector store.""" input_doc_ids = [node_embedding.id_ for node_embedding in node_embeddings] # Add nodes to the couchbase vector - doc_ids = self.vector_store.add(node_embeddings) + doc_ids = vector_store.add(node_embeddings) # Ensure that all nodes are returned & they are the same as input assert len(doc_ids) == len(node_embeddings) for doc_id in doc_ids: assert doc_id in input_doc_ids - def test_search(self, node_embeddings: List[TextNode]) -> None: + def test_search( + self, vector_store: CouchbaseSearchVectorStore, node_embeddings: List[TextNode] + ) -> None: """Test end to end Couchbase vector search.""" # Add nodes to the couchbase vector - self.vector_store.add(node_embeddings) + vector_store.add(node_embeddings) # Wait for the documents to be indexed time.sleep(SLEEP_DURATION) @@ -165,7 +260,7 @@ def test_search(self, node_embeddings: List[TextNode]) -> None: query_embedding=text_to_embedding("foo"), similarity_top_k=1 ) - result = self.vector_store.query(q) + result = vector_store.query(q) assert result.nodes is not None and len(result.nodes) == 1 assert ( result.nodes[0].get_content(metadata_mode=MetadataMode.NONE) @@ -173,12 +268,12 @@ def test_search(self, node_embeddings: List[TextNode]) -> None: ) assert result.similarities is not None - def test_delete_doc(self) -> None: + def test_delete_doc(self, vector_store: CouchbaseSearchVectorStore) -> None: """Test delete document from Couchbase vector store.""" - storage_context = StorageContext.from_defaults(vector_store=self.vector_store) + storage_context = StorageContext.from_defaults(vector_store=vector_store) # Add nodes to the couchbase vector - VectorStoreIndex.from_documents( + store_index = VectorStoreIndex.from_documents( [ Document( text="hello", @@ -198,33 +293,35 @@ def 
test_delete_doc(self) -> None: similarity_top_k=1, ) - result = self.vector_store.query(q) + result = vector_store.query(q) assert result.nodes is not None and len(result.nodes) == 1 # Identify the document to delete ref_id_to_delete = result.nodes[0].ref_doc_id # Delete the document - self.vector_store.delete(ref_doc_id=ref_id_to_delete) + vector_store.delete(ref_doc_id=ref_id_to_delete) # Wait for the documents to be indexed time.sleep(SLEEP_DURATION) # Ensure that no results are returned - result = self.vector_store.query(q) + result = vector_store.query(q) assert len(result.nodes) == 0 - def test_search_with_filter(self, node_embeddings: List[TextNode]) -> None: + def test_search_with_filter( + self, vector_store: CouchbaseSearchVectorStore, node_embeddings: List[TextNode] + ) -> None: """Test end to end Couchbase vector search with filter.""" # Add nodes to the couchbase vector - self.vector_store.add(node_embeddings) + vector_store.add(node_embeddings) # Wait for the documents to be indexed time.sleep(SLEEP_DURATION) # similarity search q = VectorStoreQuery( - query_embedding=text_to_embedding("baz"), + query_embedding=text_to_embedding("cake"), similarity_top_k=1, filters=MetadataFilters( filters=[ @@ -234,17 +331,19 @@ def test_search_with_filter(self, node_embeddings: List[TextNode]) -> None: ), ) - result = self.vector_store.query(q) + result = vector_store.query(q) assert result.nodes is not None and len(result.nodes) == 1 assert ( result.nodes[0].metadata.get("genre") == "Thriller" and result.nodes[0].metadata.get("pages") == 20 ) - def test_hybrid_search(self, node_embeddings: List[TextNode]) -> None: + def test_hybrid_search( + self, vector_store: CouchbaseSearchVectorStore, node_embeddings: List[TextNode] + ) -> None: """Test the hybrid search functionality.""" # Add nodes to the couchbase vector - self.vector_store.add(node_embeddings) + vector_store.add(node_embeddings) # Wait for the documents to be indexed time.sleep(SLEEP_DURATION) @@ -253,7 +352,7 @@ def test_hybrid_search(self, node_embeddings: List[TextNode]) -> None: query_embedding=text_to_embedding("baz"), similarity_top_k=1, ) - result = self.vector_store.query(query) + result = vector_store.query(query) # similarity search hybrid_query = VectorStoreQuery( @@ -261,7 +360,7 @@ def test_hybrid_search(self, node_embeddings: List[TextNode]) -> None: similarity_top_k=1, ) - hybrid_result = self.vector_store.query( + hybrid_result = vector_store.query( hybrid_query, cb_search_options={ "query": {"field": "metadata.genre", "match": "Thriller"} @@ -273,49 +372,51 @@ def test_hybrid_search(self, node_embeddings: List[TextNode]) -> None: ) == hybrid_result.nodes[0].get_content(metadata_mode=MetadataMode.NONE) assert result.similarities[0] <= hybrid_result.similarities[0] - def test_output_fields(self, node_embeddings: List[TextNode]) -> None: + def test_output_fields( + self, vector_store: CouchbaseSearchVectorStore, node_embeddings: List[TextNode] + ) -> None: """Test the output fields functionality.""" # Add nodes to the couchbase vector - self.vector_store.add(node_embeddings) + vector_store.add(node_embeddings) # Wait for the documents to be indexed time.sleep(SLEEP_DURATION) q = VectorStoreQuery( - query_embedding=text_to_embedding("baz"), + query_embedding=text_to_embedding("cake"), similarity_top_k=1, output_fields=["text", "metadata.genre"], ) - result = self.vector_store.query(q) + result = vector_store.query(q) assert result.nodes is not None and len(result.nodes) == 1 - assert 
result.nodes[0].get_content(metadata_mode=MetadataMode.NONE) == "baz" + assert result.nodes[0].get_content(metadata_mode=MetadataMode.NONE) == "cake" assert result.nodes[0].metadata.get("genre") == "Thriller" +def load_json_file(file_path): + with open(file_path, "r") as file: + return json.load(file) + + class TestCouchbaseVectorStore(TestCouchbaseSearchVectorStore): - @classmethod - def setup_method(self) -> None: - self.cluster = get_cluster() - # Delete all the documents in the collection - delete_documents(self.cluster, BUCKET_NAME, SCOPE_NAME, COLLECTION_NAME) - - # Now, actually instantiate and assign to self.vector_store so inherited tests use it. - # The warning has already been checked. - self.vector_store = CouchbaseVectorStore( - cluster=self.cluster, + @pytest.fixture() + def vector_store(self, cluster: Cluster) -> CouchbaseVectorStore: + yield CouchbaseVectorStore( + cluster=cluster, bucket_name=BUCKET_NAME, scope_name=SCOPE_NAME, collection_name=COLLECTION_NAME, index_name=INDEX_NAME, ) + delete_documents(cluster, BUCKET_NAME, SCOPE_NAME, COLLECTION_NAME) - def test_deprecation_warning(self) -> None: + def test_deprecation_warning(self, cluster: Cluster) -> None: """Test that a deprecation warning is raised when instantiating CouchbaseVectorStore.""" with pytest.warns(DeprecationWarning) as warnings_raised: CouchbaseVectorStore( - cluster=self.cluster, + cluster=cluster, bucket_name=BUCKET_NAME, scope_name=SCOPE_NAME, collection_name=COLLECTION_NAME, diff --git a/llama-index-integrations/vector_stores/llama-index-vector-stores-couchbase/tests/test_document_store_integration.py b/llama-index-integrations/vector_stores/llama-index-vector-stores-couchbase/tests/test_document_store_integration.py deleted file mode 100644 index 4d9a4343c9..0000000000 --- a/llama-index-integrations/vector_stores/llama-index-vector-stores-couchbase/tests/test_document_store_integration.py +++ /dev/null @@ -1,348 +0,0 @@ -# SPDX-FileCopyrightText: 2023-present deepset GmbH -# -# SPDX-License-Identifier: Apache-2.0 -import pytest -from datetime import datetime -from typing import List - -from haystack.testing.document_store import DocumentStoreBaseTests -from haystack.dataclasses import Document, ByteStream -from haystack.document_stores.types import DuplicatePolicy -from haystack.utils.auth import Secret -from couchbase_haystack import ( - CouchbaseQueryDocumentStore, - QueryVectorSearchType, - QueryVectorSearchFunctionParams, -) -from couchbase_haystack.document_stores.auth import CouchbasePasswordAuthenticator -from couchbase_haystack.document_stores.cluster_options import CouchbaseClusterOptions -from couchbase.options import KnownConfigProfiles -from couchbase.exceptions import ( - ScopeAlreadyExistsException, - CollectionAlreadyExistsException, -) -from couchbase.options import QueryOptions -from datetime import timedelta -from sentence_transformers import SentenceTransformer -import time -import json -from pandas import DataFrame -from uuid import uuid1 - -model = SentenceTransformer("all-MiniLM-L6-v2") - -# Test configuration -TEST_BUCKET = "test_bucket" -TEST_SCOPE = "test_scope" -TEST_COLLECTION = "test_collection" -TEST_INDEX = "test_vector_index" -VECTOR_DIMENSION = 384 - - -class TestGSIDocumentStoreIntegration(DocumentStoreBaseTests): - @pytest.fixture(scope="class") - def sample_init_documents(self) -> List[Document]: - """Create sample documents for testing.""" - return [ - Document( - id=f"doc_init_{i}", - content=f"Test document {i}", - meta={ - "field1": f"value{i}", - "field2": i, 
- "created_at": datetime.now().isoformat(), - }, - embedding=[0.001 * i] * VECTOR_DIMENSION, - ) - for i in range(2048) - ] - - @pytest.fixture - def sample_documents(self) -> List[Document]: - """Create sample documents for testing.""" - return [ - Document( - id=f"doc_{i}", - content=f"Test document {i}", - meta={ - "field1": f"value{i}", - "field2": i, - "created_at": datetime.now().isoformat(), - }, - embedding=[0.001 * i] * VECTOR_DIMENSION, - ) - for i in range(1024) - ] - - @pytest.fixture(scope="class") - def document_store_with_index_creation(self, sample_init_documents): - # Create authenticator - authenticator = CouchbasePasswordAuthenticator( - username=Secret.from_env_var("USER_NAME"), - password=Secret.from_env_var("PASSWORD"), - ) - - # Create cluster options - cluster_options = CouchbaseClusterOptions( - protocol=KnownConfigProfiles.WanDevelopment - ) - - # Create document store - store = CouchbaseQueryDocumentStore( - cluster_connection_string=Secret.from_env_var("CONNECTION_STRING"), - authenticator=authenticator, - cluster_options=cluster_options, - bucket=TEST_BUCKET, - scope=TEST_SCOPE, - collection=TEST_COLLECTION, - index_name=TEST_INDEX, - query_vector_search_params=QueryVectorSearchFunctionParams( - search_type=QueryVectorSearchType.ANN, - dimension=VECTOR_DIMENSION, - similarity="L2", - ), - vector_field="embedding", - ) - - # Create scope if it doesn't exist - try: - store.bucket.collections().create_scope(scope_name=TEST_SCOPE) - except ScopeAlreadyExistsException: - pass - - # Create collection if it doesn't exist - try: - store.bucket.collections().create_collection( - collection_name=TEST_COLLECTION, scope_name=TEST_SCOPE - ) - except CollectionAlreadyExistsException: - pass - - # Write initial documents - store.write_documents(sample_init_documents, policy=DuplicatePolicy.OVERWRITE) - - with_opts = json.dumps( - { - "dimension": VECTOR_DIMENSION, - "description": "IVF1024,PQ32x8", - "similarity": "L2", - } - ) - # Create index before tests - result = store.scope.query( - f""" - CREATE INDEX {TEST_INDEX} - ON {TEST_BUCKET}.{TEST_SCOPE}.{TEST_COLLECTION} ({store.vector_field} VECTOR) - USING GSI WITH {with_opts} - """, - QueryOptions(timeout=timedelta(seconds=300)), - ).execute() - print(result) - # time.sleep(60) - - store.delete_documents([doc.id for doc in store.filter_documents()]) - - yield store - store.bucket.collections().drop_collection( - collection_name=TEST_COLLECTION, scope_name=TEST_SCOPE - ) - # Cleanup after tests - store.bucket.close() - - @pytest.fixture() - def document_store(self, document_store_with_index_creation): - yield document_store_with_index_creation - document_store_with_index_creation.delete_documents( - [doc.id for doc in document_store_with_index_creation.filter_documents()] - ) - - def assert_documents_are_equal( - self, received: List[Document], expected: List[Document] - ): - print(received, expected) - for r in received: - r.score = None - r.embedding = None - received_dict = {doc.id: doc for doc in received} - received = [] - for doc in expected: - received.append(received_dict.get(doc.id)) - doc.embedding = None - print("================") - print(received, expected) - print(len(received), len(expected)) - # print([doc.to_dict(flatten=False) if doc else doc for doc in received]) - # print([doc.to_dict(flatten=False) for doc in expected]) - super().assert_documents_are_equal(received, expected) - - def test_write_documents_duplicate_skip(self, document_store): - pass - - def test_no_filters(self, document_store: 
CouchbaseQueryDocumentStore): - """Test filter_documents() with empty filters""" - self.assert_documents_are_equal(document_store.filter_documents(), []) - self.assert_documents_are_equal(document_store.filter_documents(filters={}), []) - docs = [Document(content="test doc")] - document_store.write_documents(docs) - self.assert_documents_are_equal(document_store.filter_documents(), docs) - self.assert_documents_are_equal( - document_store.filter_documents(filters={}), docs - ) - - def test_write_documents(self, document_store: CouchbaseQueryDocumentStore): - documents = [ - Document(id=uuid1().hex, content="Haystack is an amazing tool for search."), - Document( - id=uuid1().hex, - content="We are using pre-trained models to generate embeddings.", - ), - Document(id=uuid1().hex, content="The weather is sunny today."), - ] - for doc in documents: - embedding = model.encode(doc.content).tolist() - doc.embedding = embedding - - assert document_store.write_documents(documents) == 3 - retrieved_docs = document_store.filter_documents() - assert len(retrieved_docs) == 3 - retrieved_docs.sort(key=lambda x: x.id) - self.assert_documents_are_equal(retrieved_docs, documents) - - def test_write_blob(self, document_store: CouchbaseQueryDocumentStore): - bytestream = ByteStream( - b"test", meta={"meta_key": "meta_value"}, mime_type="mime_type" - ) - documents = [Document(blob=bytestream)] - for doc in documents: - # Assuming blob_content is in bytes, decode it to string if necessary - embedding = model.encode(bytestream.data.decode("utf-8")).tolist() - doc.embedding = embedding - assert document_store.write_documents(documents) == 1 - retrieved_docs = document_store.filter_documents() - time.sleep(30) - self.assert_documents_are_equal(retrieved_docs, documents) - - def test_write_dataframe(self, document_store: CouchbaseQueryDocumentStore): - dataframe = DataFrame({"col1": [1, 2], "col2": [3, 4]}) - docs = [Document(dataframe=dataframe)] - document_store.write_documents(docs) - retrieved_docs = document_store.filter_documents() - self.assert_documents_are_equal(retrieved_docs, docs) - - def test_comparison_in1( - self, document_store: CouchbaseQueryDocumentStore, filterable_docs - ): - """Test filter_documents() with 'in' comparator""" - document_store.write_documents(filterable_docs) - # time.sleep(2000) - result = document_store.filter_documents( - {"field": "meta.number", "operator": "in", "value": [10, -10]} - ) - assert len(result) - expected = [ - d - for d in filterable_docs - if d.meta.get("number") is not None and d.meta["number"] in [10, -10] - ] - self.assert_documents_are_equal(result, expected) - - def test_complex_filter(self, document_store, filterable_docs): - document_store.write_documents(filterable_docs) - filters = { - "operator": "OR", - "conditions": [ - { - "operator": "AND", - "conditions": [ - {"field": "meta.number", "operator": "==", "value": 100}, - {"field": "meta.chapter", "operator": "==", "value": "intro"}, - ], - }, - { - "operator": "AND", - "conditions": [ - {"field": "meta.page", "operator": "==", "value": "90"}, - { - "field": "meta.chapter", - "operator": "==", - "value": "conclusion", - }, - ], - }, - ], - } - - result = document_store.filter_documents(filters=filters) - - self.assert_documents_are_equal( - result, - [ - d - for d in filterable_docs - if (d.meta.get("number") == 100 and d.meta.get("chapter") == "intro") - or ( - d.meta.get("page") == "90" and d.meta.get("chapter") == "conclusion" - ) - ], - ) - - def test_duplicate_document_handling(self, 
document_store, sample_documents): - """Test handling of duplicate documents.""" - # Write documents first time - document_store.write_documents(sample_documents) - - # Try to write same documents again with FAIL policy - with pytest.raises(Exception): - document_store.write_documents( - sample_documents, policy=DuplicatePolicy.FAIL - ) - - # Write with OVERWRITE policy - document_store.write_documents( - sample_documents, policy=DuplicatePolicy.OVERWRITE - ) - - # Verify document count hasn't changed - documents = document_store.filter_documents() - assert len(documents) == len(sample_documents) - - def test_vector_search( - self, document_store: CouchbaseQueryDocumentStore, sample_documents - ): - """Test vector search functionality.""" - # Write documents - document_store.write_documents(sample_documents) - - # Create a query embedding - query_embedding = [0.1] * VECTOR_DIMENSION - - # Perform vector search - results = document_store.vector_search(query_embedding, top_k=3) - - # Verify results - assert len(results) == 3 - assert all(hasattr(doc, "score") for doc in results) - print(results) - assert all(doc.score is not None for doc in results) - - # TODO: ADD logic to check if the results are correct - - def test_vector_search_with_filters(self, document_store, sample_documents): - """Test vector search with filters.""" - # Write documents - document_store.write_documents(sample_documents) - - # Create a query embedding - query_embedding = [0.1] * VECTOR_DIMENSION - - # Define filters - filters = {"field": "field2", "operator": ">", "value": 2} - - # Perform vector search with filters - results = document_store.vector_search( - query_embedding, top_k=3, filters=filters - ) - - # Verify results - assert len(results) <= 3 - assert all(doc.meta["field2"] > 2 for doc in results) diff --git a/llama-index-integrations/vector_stores/llama-index-vector-stores-couchbase/tests/vector_index.json b/llama-index-integrations/vector_stores/llama-index-vector-stores-couchbase/tests/vector_index.json new file mode 100644 index 0000000000..cb9b5b9fb6 --- /dev/null +++ b/llama-index-integrations/vector_stores/llama-index-vector-stores-couchbase/tests/vector_index.json @@ -0,0 +1,89 @@ +{ + "name": "vector_search", + "type": "fulltext-index", + "sourceType": "gocbcore", + "sourceName": "test_bucket", + "planParams": { + "indexPartitions": 1, + "numReplicas": 0 + }, + "params": { + "doc_config": { + "docid_prefix_delim": "", + "docid_regexp": "", + "mode": "scope.collection.type_field", + "type_field": "type" + }, + "mapping": { + "default_analyzer": "standard", + "default_datetime_parser": "dateTimeOptional", + "index_dynamic": true, + "store_dynamic": true, + "default_mapping": { + "dynamic": true, + "enabled": false + }, + "types": { + "____scope.collection_____": { + "dynamic": false, + "enabled": true, + "properties": { + "text": { + "enabled": true, + "fields": [ + { + "docvalues": true, + "include_in_all": false, + "include_term_vectors": false, + "index": true, + "name": "text", + "store": true, + "type": "text" + } + ] + }, + "embedding": { + "enabled": true, + "dynamic": false, + "fields": [ + { + "vector_index_optimized_for": "recall", + "docvalues": true, + "dims": 1536, + "include_in_all": false, + "include_term_vectors": false, + "index": true, + "name": "embedding", + "similarity": "dot_product", + "store": true, + "type": "vector" + } + ] + }, + "metadata": { + "dynamic": true, + "enabled": true, + "properties": { + "name": { + "enabled": true, + "fields": [ + { + "docvalues": true, + 
"include_in_all": false, + "include_term_vectors": false, + "index": true, + "name": "name", + "store": true, + "analyzer": "keyword", + "type": "text" + } + ] + } + } + } + } + } + } + } + } +} From f0f0b4041369aa5524c18544d4f015d492ea32ac Mon Sep 17 00:00:00 2001 From: Shyam Venkat Date: Wed, 29 Oct 2025 11:49:01 +0530 Subject: [PATCH 6/6] docs update --- .../README.md | 157 +++++++++++++++--- .../vector_stores/couchbase/__init__.py | 2 + .../vector_stores/couchbase/base.py | 41 ++++- .../pyproject.toml | 2 +- 4 files changed, 171 insertions(+), 31 deletions(-) diff --git a/llama-index-integrations/vector_stores/llama-index-vector-stores-couchbase/README.md b/llama-index-integrations/vector_stores/llama-index-vector-stores-couchbase/README.md index 3e339ca500..f5c8c371b2 100644 --- a/llama-index-integrations/vector_stores/llama-index-vector-stores-couchbase/README.md +++ b/llama-index-integrations/vector_stores/llama-index-vector-stores-couchbase/README.md @@ -1,6 +1,6 @@ # LlamaIndex Vector Stores Integration: Couchbase -This package provides Couchbase vector store integrations for LlamaIndex, offering multiple implementation options for vector similarity search. +This package provides Couchbase vector store integrations for LlamaIndex, offering multiple implementation options for vector similarity search based on Couchbase Server's native vector indexing capabilities. ## Installation @@ -12,11 +12,16 @@ pip install llama-index-vector-stores-couchbase ### CouchbaseSearchVectorStore -Uses Couchbase Full-Text Search (FTS) with vector search capabilities. +Implements [Search Vector Indexes](https://docs.couchbase.com/server/current/vector-index/use-vector-indexes.html) using Couchbase Full-Text Search (FTS) with vector search capabilities. Ideal for hybrid searches combining vector, full-text, and geospatial searches. ### CouchbaseQueryVectorStore (Recommended) -Uses Couchbase Global Secondary Index (GSI) with BHIVE vector search support for high-performance ANN operations. +Implements both [Hyperscale Vector Indexes](https://docs.couchbase.com/server/current/vector-index/use-vector-indexes.html) and [Composite Vector Indexes](https://docs.couchbase.com/server/current/vector-index/use-vector-indexes.html) using Couchbase Query Service with SQL++ and vector search functions. Supports: + +- **Hyperscale Vector Indexes**: Purpose-built for pure vector searches at massive scale with minimal memory footprint +- **Composite Vector Indexes**: Best for combining vector similarity with scalar filters that exclude large portions of the dataset + +Can scale to billions of documents. Requires Couchbase Server 8.0+. 
### CouchbaseVectorStore (Deprecated) @@ -25,12 +30,13 @@ Uses Couchbase Global Secondary Index (GSI) with BHIVE vector search support for ## Requirements - Python >= 3.9, < 4.0 -- Couchbase Server with vector search capabilities -- couchbase >= 4.2.0, < 5 +- Couchbase Server 7.6+ for Search Vector Indexes +- Couchbase Server 8.0+ for Hyperscale and Composite Vector Indexes +- couchbase >= 4.5.0 ## Basic Usage -### Using CouchbaseSearchVectorStore (FTS-based) +### Using CouchbaseSearchVectorStore (Search Vector Indexes) ```python from llama_index.vector_stores.couchbase import CouchbaseSearchVectorStore @@ -55,53 +61,156 @@ vector_store = CouchbaseSearchVectorStore( ) ``` -### Using CouchbaseQueryVectorStore (GSI-based) +### Using CouchbaseQueryVectorStore (Hyperscale & Composite Vector Indexes) ```python from llama_index.vector_stores.couchbase import ( CouchbaseQueryVectorStore, QueryVectorSearchType, + QueryVectorSearchSimilarity, ) -# Initialize GSI-based vector store +# Initialize Query Service-based vector store +# Works with both Hyperscale Vector Indexes (pure vector search) +# and Composite Vector Indexes (vector + scalar filters) vector_store = CouchbaseQueryVectorStore( cluster=cluster, bucket_name="my_bucket", scope_name="my_scope", collection_name="my_collection", search_type=QueryVectorSearchType.ANN, # or QueryVectorSearchType.KNN - similarity="cosine", # or "euclidean", "dot_product" - nprobes=10, # Optional: number of probes for ANN search + similarity=QueryVectorSearchSimilarity.COSINE, # Can also use string: "cosine", "euclidean", "dot_product" + nprobes=10, # Optional: number of probes for ANN search (only for ANN) text_key="text", embedding_key="embedding", metadata_key="metadata", ) ``` +## Configuration Options + +### Search Types + +The `QueryVectorSearchType` enum defines the type of vector search to perform: + +- `QueryVectorSearchType.ANN` - Approximate Nearest Neighbor (recommended for large datasets) +- `QueryVectorSearchType.KNN` - K-Nearest Neighbor (exact search) + +### Similarity Metrics + +The `QueryVectorSearchSimilarity` enum provides various distance metrics: + +- `QueryVectorSearchSimilarity.COSINE` - Cosine similarity (range: -1 to 1) +- `QueryVectorSearchSimilarity.DOT` - Dot product similarity +- `QueryVectorSearchSimilarity.L2` or `EUCLIDEAN` - Euclidean distance +- `QueryVectorSearchSimilarity.L2_SQUARED` or `EUCLIDEAN_SQUARED` - Squared Euclidean distance + +You can also use lowercase strings: `"cosine"`, `"dot_product"`, `"euclidean"`, etc. 
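+
+As a usage sketch (assuming the `vector_store` configured above and, illustratively, 1536-dimensional embeddings), a filtered similarity query uses the standard LlamaIndex query types; the Query Service implementation translates the filters into a SQL++ WHERE clause:
+
+```python
+from llama_index.core.vector_stores.types import (
+    FilterOperator,
+    MetadataFilter,
+    MetadataFilters,
+    VectorStoreQuery,
+)
+
+# Illustrative query embedding; in practice this comes from your embedding model.
+query = VectorStoreQuery(
+    query_embedding=[0.1] * 1536,
+    similarity_top_k=3,
+    filters=MetadataFilters(
+        filters=[
+            MetadataFilter(key="genre", value="Thriller", operator=FilterOperator.EQ),
+            MetadataFilter(key="pages", value=100, operator=FilterOperator.LT),
+        ]
+    ),
+)
+
+result = vector_store.query(query)
+for node, score in zip(result.nodes, result.similarities):
+    print(node.node_id, score)  # scores are distances, ordered ascending
+```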
+ ## Features -- **Multiple Search Types**: Support for both GSI-based and FTS vector search -- **Flexible Similarity Metrics**: Cosine, Euclidean, and dot product similarities +- **Multiple Index Types**: Support for all three Couchbase vector index types: + - Hyperscale Vector Indexes (Query Service-based, 8.0+) + - Composite Vector Indexes (Query Service-based, 8.0+) + - Search Vector Indexes (FTS-based, 7.6+) +- **Flexible Similarity Metrics**: Multiple distance metrics including: + - COSINE (Cosine similarity) + - DOT (Dot product) + - L2 / EUCLIDEAN (Euclidean distance) + - L2_SQUARED / EUCLIDEAN_SQUARED (Squared Euclidean distance) - **Metadata Filtering**: Advanced filtering capabilities using LlamaIndex MetadataFilters - **Batch Operations**: Efficient batch insertion with configurable batch sizes -- **High Performance**: BHIVE index support for approximate nearest neighbor (ANN) search -- **Scoped Indexes**: Support for both scoped and global search indexes in FTS-based vector search +- **High Performance**: ANN and KNN search support for efficient nearest neighbor queries +- **Massive Scalability**: Hyperscale and Composite indexes can scale to billions of documents + +## Implementation Details + +### Query Service-Based Vector Indexes (`CouchbaseQueryVectorStore`) + +`CouchbaseQueryVectorStore` supports both **Hyperscale Vector Indexes** and **Composite Vector Indexes**, which use the Couchbase Query Service with SQL++ queries and vector search functions. + +#### Hyperscale Vector Indexes + +Purpose-built for pure vector searches at massive scale: + +**When to Use:** + +- Pure vector similarity searches without complex scalar filtering +- Content discovery, recommendations, reverse image search +- Chatbot context matching (e.g., RAG workflows) +- Anomaly detection in IoT sensor networks +- Datasets from tens of millions to billions of documents + +**Key Characteristics:** + +- Optimized specifically for vector searches +- Higher accuracy at lower quantizations +- Low memory footprint (most index data on disk) +- Best TCO for huge datasets +- Excellent for concurrent updates and searches +- Scalar values and vectors compared simultaneously + +#### Composite Vector Indexes + +Combine a Global Secondary Index (GSI) with vector search functions: + +**When to Use:** + +- Searches that combine vector similarity with scalar filters +- When scalar filters can exclude large portions (>20%) of the dataset +- Applications requiring compliance-based restrictions on results +- Content recommendations, job searches, supply chain management +- Datasets from tens of millions to billions of documents + +**Key Characteristics:** + +- Scalar filters are applied _before_ vector search, reducing vectors to compare +- Efficient when scalar values have low selectivity (exclude <20% of dataset) +- Can exclude nearest neighbors based on scalar values (useful for compliance) +- Can scale to billions of documents + +#### Search Types (Both Hyperscale & Composite) + +- **ANN (Approximate Nearest Neighbor)**: Faster approximate search with configurable `nprobes` parameter for accuracy/speed tradeoff +- **KNN (K-Nearest Neighbor)**: Exact nearest neighbor search for maximum accuracy + +### Search Vector Indexes (`CouchbaseSearchVectorStore`) + +Search Vector Indexes combine Full-Text Search (FTS) with vector search capabilities: + +**When to Use:** + +- Hybrid searches combining vector, full-text, and geospatial searches +- Applications like e-commerce product search, travel recommendations, or real estate 
searches +- Datasets up to tens of millions of documents + +**Key Characteristics:** + +- Combines semantic search with keyword and geospatial searches in a single query +- Supports both scoped and global indexes +- Ideal for multi-modal search scenarios + +### Metadata Filtering -## Search Types +Both implementations support metadata filtering: -### ANN (Approximate Nearest Neighbor) +- Filter by document attributes using standard LlamaIndex `MetadataFilters` +- Supports operators: `==`, `!=`, `>`, `<`, `>=`, `<=`, `IN`, `NIN` +- Combine filters with `AND`/`OR` conditions -- Uses BHIVE indexes for high-performance approximate search -- Configurable nprobes parameter for accuracy/speed tradeoff -- Recommended for large-scale deployments +### Choosing the Right Index Type -### KNN (K-Nearest Neighbor) +The same `CouchbaseQueryVectorStore` class works with both Hyperscale and Composite Vector Indexes. The choice of which underlying index type to use is determined by the index you create on your Couchbase collection. -- Exact nearest neighbor search -- Higher accuracy but potentially slower for large datasets -- Good for smaller datasets or when exact results are required +| Feature | Hyperscale (via QueryVectorStore) | Composite (via QueryVectorStore) | Search (via SearchVectorStore) | +| ------------------- | ------------------------------------ | -------------------------------- | ---------------------------------- | +| **Index Type** | Hyperscale Vector Index | Composite Vector Index | Search Vector Index | +| **Best For** | Pure vector searches | Vector + scalar filters | Vector + full-text + geospatial | +| **Available Since** | Couchbase Server 8.0 | Couchbase Server 8.0 | Couchbase Server 7.6 | +| **Scalar Handling** | Compared with vectors simultaneously | Pre-filters before vector search | Searches in parallel | +| **Use Cases** | Content discovery, RAG, image search | Job search, compliance filtering | E-commerce, travel recommendations | -For more information, refer to: [Couchbase Vector Search Concepts](https://preview.docs-test.couchbase.com/docs-server-DOC-12565_vector_search_concepts/server/current/vector-index/use-vector-indexes.html) +For more information, refer to: [Couchbase Vector Search Documentation](https://docs.couchbase.com/server/current/vector-index/use-vector-indexes.html) ## License diff --git a/llama-index-integrations/vector_stores/llama-index-vector-stores-couchbase/llama_index/vector_stores/couchbase/__init__.py b/llama-index-integrations/vector_stores/llama-index-vector-stores-couchbase/llama_index/vector_stores/couchbase/__init__.py index 718a10c568..4dc79b15c6 100644 --- a/llama-index-integrations/vector_stores/llama-index-vector-stores-couchbase/llama_index/vector_stores/couchbase/__init__.py +++ b/llama-index-integrations/vector_stores/llama-index-vector-stores-couchbase/llama_index/vector_stores/couchbase/__init__.py @@ -6,6 +6,7 @@ CouchbaseQueryVectorStore, # GSI-based with BHIVE support CouchbaseVectorStoreBase, # Base class QueryVectorSearchType, # Enum for search types + QueryVectorSearchSimilarity, # Enum for similarity metrics ) __all__ = [ @@ -14,4 +15,5 @@ "CouchbaseQueryVectorStore", "CouchbaseVectorStoreBase", "QueryVectorSearchType", + "QueryVectorSearchSimilarity", ] diff --git a/llama-index-integrations/vector_stores/llama-index-vector-stores-couchbase/llama_index/vector_stores/couchbase/base.py b/llama-index-integrations/vector_stores/llama-index-vector-stores-couchbase/llama_index/vector_stores/couchbase/base.py index 
3bcd6b556f..30b3bd1e00 100644 --- a/llama-index-integrations/vector_stores/llama-index-vector-stores-couchbase/llama_index/vector_stores/couchbase/base.py +++ b/llama-index-integrations/vector_stores/llama-index-vector-stores-couchbase/llama_index/vector_stores/couchbase/base.py @@ -521,10 +521,19 @@ def _format_metadata(self, row_fields: Dict[str, Any]) -> Dict[str, Any]: class CouchbaseSearchVectorStore(CouchbaseVectorStoreBase): """ - Couchbase Vector Store using Full-Text Search (FTS). + Couchbase Vector Store using Search Vector Indexes (FTS-based). + + This implementation uses Couchbase's Search Vector Indexes, which combine + Full-Text Search (FTS) with vector search capabilities. Ideal for hybrid + searches combining vector similarity, full-text search, and geospatial queries. + + Supports datasets up to tens of millions of documents. + Requires Couchbase Server 7.6 or later. To use, you should have the ``couchbase`` python package installed. + For more information, see: + https://docs.couchbase.com/server/current/vector-index/use-vector-indexes.html """ _index_name: str = PrivateAttr() @@ -723,12 +732,32 @@ def _check_index_exists(self) -> bool: class CouchbaseQueryVectorStore(CouchbaseVectorStoreBase): """ - Couchbase Vector Store using Global Secondary Index (GSI) with vector search capabilities. + Couchbase Vector Store using Query Service with vector search capabilities. + + This implementation supports both Hyperscale Vector Indexes and Composite Vector + Indexes, which use the Couchbase Query Service with SQL++ and vector search functions. + + Hyperscale Vector Indexes: + - Purpose-built for pure vector searches at massive scale + - Lowest memory footprint (most index data on disk) + - Higher accuracy at lower quantizations + - Best for content discovery, RAG workflows, image search, anomaly detection + + Composite Vector Indexes: + - Combine Global Secondary Index (GSI) with vector search functions + - Scalar filters applied BEFORE vector search (reduces vectors to compare) + - Best for searches combining vector similarity with scalar filters + - Useful for compliance requirements (can exclude results based on scalars) + + Key features: + - Supports both ANN (Approximate) and KNN (Exact) nearest neighbor searches + - Can scale to billions of documents + - Various similarity metrics (COSINE, DOT, L2/EUCLIDEAN, L2_SQUARED) + + Requires Couchbase Server 8.0 or later. - This implementation supports: - - BHIVE indexes for high-performance ANN vector search - - Composite Secondary Indexes with vector search functions - - Various similarity metrics (cosine, euclidean, dot_product) + For more information, see: + https://docs.couchbase.com/server/current/vector-index/use-vector-indexes.html """ _search_type: QueryVectorSearchType = PrivateAttr() diff --git a/llama-index-integrations/vector_stores/llama-index-vector-stores-couchbase/pyproject.toml b/llama-index-integrations/vector_stores/llama-index-vector-stores-couchbase/pyproject.toml index 646d8531ae..71373c946a 100644 --- a/llama-index-integrations/vector_stores/llama-index-vector-stores-couchbase/pyproject.toml +++ b/llama-index-integrations/vector_stores/llama-index-vector-stores-couchbase/pyproject.toml @@ -26,7 +26,7 @@ dev = [ [project] name = "llama-index-vector-stores-couchbase" -version = "0.5.0" +version = "0.6.0" description = "llama-index vector_stores couchbase integration" authors = [{name = "Couchbase", email = "devadvocates@couchbase.com"}] requires-python = ">=3.9,<4.0"