diff --git a/datasets/datasets.json b/datasets/datasets.json
index f9728a68..71da5d2e 100644
--- a/datasets/datasets.json
+++ b/datasets/datasets.json
@@ -1203,5 +1203,22 @@
     "type": "tar",
     "link": "https://storage.googleapis.com/ann-filtered-benchmark/datasets/random_keywords_1m_vocab_10_no_filters.tgz",
     "path": "random-100-match-kw-small-vocab/random_keywords_1m_vocab_10_no_filters"
+  },
+  {
+    "name": "laion-img-emb-512-1M-cosine",
+    "vector_size": 512,
+    "distance": "cosine",
+    "type": "h5",
+    "path": "laion-img-emb-512/laion-img-emb-512-1M-cosine.hdf5",
+    "link": "http://benchmarks.redislabs.s3.amazonaws.com/vecsim/laion400m/laion-img-emb-512-100M-cosine.hdf5"
+  },
+  {
+    "name": "laion-img-emb-512-1M-100ktrain-cosine",
+    "vector_size": 512,
+    "distance": "cosine",
+    "type": "h5",
+    "path": "laion-img-emb-512/laion-img-emb-512-1M-100ktrain-cosine.hdf5",
+    "link": "http://benchmarks.redislabs.s3.amazonaws.com/vecsim/laion400m/laion-img-emb-512-100M-cosine.hdf5"
   }
 ]
diff --git a/engine/base_client/client.py b/engine/base_client/client.py
index c78b65bc..1d0fa69b 100644
--- a/engine/base_client/client.py
+++ b/engine/base_client/client.py
@@ -40,8 +40,9 @@ def save_search_results(
     ):
         now = datetime.now()
         timestamp = now.strftime("%Y-%m-%d-%H-%M-%S")
+        pid = os.getpid()  # Get the current process ID
         experiments_file = (
-            f"{self.name}-{dataset_name}-search-{search_id}-{timestamp}.json"
+            f"{self.name}-{dataset_name}-search-{search_id}-{pid}-{timestamp}.json"
         )
         result_path = RESULTS_DIR / experiments_file
         with open(result_path, "w") as out:
@@ -97,7 +98,8 @@ def run_experiment(
         reader = dataset.get_reader(execution_params.get("normalize", False))
 
         if skip_if_exists:
-            glob_pattern = f"{self.name}-{dataset.config.name}-search-*-*.json"
+            pid = os.getpid()  # Get the current process ID
+            glob_pattern = f"{self.name}-{dataset.config.name}-search-*-{pid}-*.json"
             existing_results = list(RESULTS_DIR.glob(glob_pattern))
             if len(existing_results) == len(self.searchers):
                 print(
@@ -135,8 +137,9 @@ def run_experiment(
         print("Experiment stage: Search")
         for search_id, searcher in enumerate(self.searchers):
             if skip_if_exists:
+                pid = os.getpid()  # Get the current process ID
                 glob_pattern = (
-                    f"{self.name}-{dataset.config.name}-search-{search_id}-*.json"
+                    f"{self.name}-{dataset.config.name}-search-{search_id}-{pid}-*.json"
                 )
                 existing_results = list(RESULTS_DIR.glob(glob_pattern))
                 print("Pattern", glob_pattern, "Results:", existing_results)
diff --git a/engine/base_client/search.py b/engine/base_client/search.py
index a52ab47d..7cde4e6e 100644
--- a/engine/base_client/search.py
+++ b/engine/base_client/search.py
@@ -1,6 +1,6 @@
 import functools
 import time
-from multiprocessing import get_context
+from multiprocessing import get_context, Barrier, Process, Queue
 from typing import Iterable, List, Optional, Tuple
 
 import itertools
@@ -65,6 +65,10 @@ def search_all(
     ):
         parallel = self.search_params.get("parallel", 1)
         top = self.search_params.get("top", None)
+
+        # Convert queries to a list so its length can be used for chunk sizing
+        queries = list(queries)
+
         # setup_search may require initialized client
         self.init_client(
             self.host, distance, self.connection_params, self.search_params
@@ -80,31 +84,56 @@ def search_all(
             print(f"Limiting queries to [0:{MAX_QUERIES-1}]")
 
         if parallel == 1:
+            # Single-threaded execution
             start = time.perf_counter()
-            precisions, latencies = list(
-                zip(*[search_one(query) for query in tqdm.tqdm(used_queries)])
-            )
+
+            results = [search_one(query) for query in tqdm.tqdm(queries)]
+            total_time = time.perf_counter() - start
+
         else:
-            ctx = get_context(self.get_mp_start_method())
+            # Dynamically calculate chunk size
+            chunk_size = max(1, len(queries) // parallel)
+            query_chunks = list(chunked_iterable(queries, chunk_size))
 
-            with ctx.Pool(
-                processes=parallel,
-                initializer=self.__class__.init_client,
-                initargs=(
+            # Function to be executed by each worker process
+            def worker_function(chunk, result_queue):
+                self.__class__.init_client(
                     self.host,
                     distance,
                     self.connection_params,
                     self.search_params,
-                ),
-            ) as pool:
-                if parallel > 10:
-                    time.sleep(15)  # Wait for all processes to start
-                start = time.perf_counter()
-                precisions, latencies = list(
-                    zip(*pool.imap_unordered(search_one, iterable=tqdm.tqdm(used_queries)))
                 )
+                self.setup_search()
+                results = process_chunk(chunk, search_one)
+                result_queue.put(results)
+
+            # Create a queue to collect results
+            result_queue = Queue()
+
+            # Create and start worker processes
+            processes = []
+            for chunk in query_chunks:
+                process = Process(target=worker_function, args=(chunk, result_queue))
+                processes.append(process)
+                process.start()
+
+            # Start measuring time for the critical work
+            start = time.perf_counter()
 
-        total_time = time.perf_counter() - start
+            # Collect results from all worker processes
+            results = []
+            for _ in processes:
+                results.extend(result_queue.get())
+
+            # Wait for all worker processes to finish
+            for process in processes:
+                process.join()
+
+            # Stop measuring time for the critical work
+            total_time = time.perf_counter() - start
+
+        # Extract precisions and latencies (outside the timed section)
+        precisions, latencies = zip(*results)
 
         self.__class__.delete_client()
 
@@ -132,3 +161,20 @@ def post_search(self):
     @classmethod
     def delete_client(cls):
         pass
+
+
+def chunked_iterable(iterable, size):
+    """Yield successive chunks of a given size from an iterable."""
+    it = iter(iterable)
+    while chunk := list(itertools.islice(it, size)):
+        yield chunk
+
+
+def process_chunk(chunk, search_one):
+    """Process a chunk of queries using the search_one function."""
+    return [search_one(query) for query in chunk]
+
+
+def process_chunk_wrapper(chunk, search_one):
+    """Wrapper to process a chunk of queries."""
+    return process_chunk(chunk, search_one)
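
For reference, the client.py hunks insert the writing process's PID between the search id and the timestamp in each result filename, and the skip_if_exists globs follow the same field order. Below is a minimal sketch of that naming scheme, assuming hypothetical helper names and a stand-in RESULTS_DIR; none of this code is taken from the patch itself.

import os
from datetime import datetime
from pathlib import Path

RESULTS_DIR = Path("results")  # stand-in for the benchmark's real results directory


def result_filename(engine: str, dataset: str, search_id: int) -> str:
    """Mirror the saved layout: <engine>-<dataset>-search-<search_id>-<pid>-<timestamp>.json"""
    timestamp = datetime.now().strftime("%Y-%m-%d-%H-%M-%S")
    return f"{engine}-{dataset}-search-{search_id}-{os.getpid()}-{timestamp}.json"


def results_for_current_process(engine: str, dataset: str, search_id: int) -> list:
    """Glob with the PID in the same position it occupies in the saved name,
    so only files written by the current process are counted."""
    pattern = f"{engine}-{dataset}-search-{search_id}-{os.getpid()}-*.json"
    return list(RESULTS_DIR.glob(pattern))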
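
The search.py hunk swaps the multiprocessing Pool for one worker process per query chunk plus a Queue that collects per-chunk results. The following is a self-contained sketch of that pattern under simplified assumptions: a module-level worker and a dummy search_one stand in for the repo's client classes, so it is illustrative rather than the benchmark's actual code.

import time
from itertools import islice
from multiprocessing import Process, Queue


def chunked(iterable, size):
    """Yield successive chunks of `size` items from an iterable."""
    it = iter(iterable)
    while chunk := list(islice(it, size)):
        yield chunk


def search_one(query):
    """Dummy stand-in for the per-query search call; returns (precision, latency)."""
    started = time.perf_counter()
    _ = sum(query)  # pretend to do some work
    return 1.0, time.perf_counter() - started


def worker(chunk, result_queue):
    """Each worker searches its chunk and pushes the list of (precision, latency) back."""
    result_queue.put([search_one(q) for q in chunk])


if __name__ == "__main__":
    queries = [[i, i + 1] for i in range(1000)]
    parallel = 4
    chunk_size = max(1, len(queries) // parallel)

    result_queue = Queue()
    processes = [
        Process(target=worker, args=(chunk, result_queue))
        for chunk in chunked(queries, chunk_size)
    ]
    for p in processes:
        p.start()

    start = time.perf_counter()
    # Drain the queue before join(): a process that has put items on a Queue
    # may not exit until those items have been consumed.
    results = []
    for _ in processes:
        results.extend(result_queue.get())
    for p in processes:
        p.join()
    total_time = time.perf_counter() - start

    precisions, latencies = zip(*results)
    print(f"{len(results)} queries in {total_time:.4f}s")

The drain-before-join ordering matters because multiprocessing queues are fed by a background thread, and a child that still holds unconsumed items can block on exit; the patch collects results in the same order. The patch's worker_function is a closure defined inside search_all, which generally relies on the fork start method; a spawn-based platform needs a picklable, module-level target like the one in this sketch.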