From 312547da733ff438e2e84a2a49a39d1b93d2aaab Mon Sep 17 00:00:00 2001
From: root
Date: Mon, 7 Apr 2025 13:39:03 -0700
Subject: [PATCH 1/8] adding laion dataset

---
 datasets/datasets.json | 9 +++++++++
 1 file changed, 9 insertions(+)

diff --git a/datasets/datasets.json b/datasets/datasets.json
index 7a165eb6..c776917d 100644
--- a/datasets/datasets.json
+++ b/datasets/datasets.json
@@ -322,5 +322,14 @@
         "type": "tar",
         "link": "https://storage.googleapis.com/ann-filtered-benchmark/datasets/random_keywords_1m_vocab_10_no_filters.tgz",
         "path": "random-100-match-kw-small-vocab/random_keywords_1m_vocab_10_no_filters"
+    },
+    {
+        "name": "laion-img-emb-512-100M-cosine",
+        "vector_size": 512,
+        "distance": "cosine",
+        "type": "h5",
+        "path": "laion-img-emb-512/laion-img-emb-512-100M-cosine.hdf5",
+        "link": "http://benchmarks.redislabs.s3.amazonaws.com/vecsim/laion400m/laion-img-emb-512-100M-cosine.hdf5"
     }
+
 ]

From 0c3670980f525efe0aefa56abd0527d5386a668f Mon Sep 17 00:00:00 2001
From: root
Date: Mon, 7 Apr 2025 13:41:34 -0700
Subject: [PATCH 2/8] adding laion dataset 1M

---
 datasets/datasets.json | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/datasets/datasets.json b/datasets/datasets.json
index c776917d..ee7ce131 100644
--- a/datasets/datasets.json
+++ b/datasets/datasets.json
@@ -324,11 +324,11 @@
         "path": "random-100-match-kw-small-vocab/random_keywords_1m_vocab_10_no_filters"
     },
     {
-        "name": "laion-img-emb-512-100M-cosine",
+        "name": "laion-img-emb-512-1M-cosine",
         "vector_size": 512,
         "distance": "cosine",
         "type": "h5",
-        "path": "laion-img-emb-512/laion-img-emb-512-100M-cosine.hdf5",
+        "path": "laion-img-emb-512/laion-img-emb-512-1M-cosine.hdf5",
         "link": "http://benchmarks.redislabs.s3.amazonaws.com/vecsim/laion400m/laion-img-emb-512-100M-cosine.hdf5"
     }
 

From 3b5e2c95563152f377e59e4917a4c4ae8ffcc0fd Mon Sep 17 00:00:00 2001
From: root
Date: Mon, 7 Apr 2025 14:21:23 -0700
Subject: [PATCH 3/8] chunk up the iterable before starting the processes

---
 engine/base_client/search.py | 36 +++++++++++++++++++++++++++++-------
 1 file changed, 29 insertions(+), 7 deletions(-)

diff --git a/engine/base_client/search.py b/engine/base_client/search.py
index 93368a3f..93575420 100644
--- a/engine/base_client/search.py
+++ b/engine/base_client/search.py
@@ -2,6 +2,7 @@
 import time
 from multiprocessing import get_context
 from typing import Iterable, List, Optional, Tuple
+from itertools import islice
 
 import numpy as np
 import tqdm
@@ -79,22 +80,31 @@ def search_all(
         else:
             ctx = get_context(self.get_mp_start_method())
 
-            with ctx.Pool(
-                processes=parallel,
-                initializer=self.__class__.init_client,
-                initargs=(
+            def process_initializer():
+                """Initialize each process before starting the search."""
+                self.__class__.init_client(
                     self.host,
                     distance,
                     self.connection_params,
                     self.search_params,
-                ),
+                )
+                self.setup_search()
+
+            # Dynamically chunk the generator
+            query_chunks = list(chunked_iterable(queries, max(1, parallel)))
+
+            with ctx.Pool(
+                processes=parallel,
+                initializer=process_initializer,
             ) as pool:
                 if parallel > 10:
                     time.sleep(15)  # Wait for all processes to start
                 start = time.perf_counter()
-                precisions, latencies = list(
-                    zip(*pool.imap_unordered(search_one, iterable=tqdm.tqdm(queries)))
+                results = pool.starmap(
+                    process_chunk,
+                    [(chunk, search_one) for chunk in query_chunks],
                 )
+                precisions, latencies = zip(*[result for chunk in results for result in chunk])
 
         total_time = time.perf_counter() - start
@@ -123,3 +133,15 @@ def post_search(self):
     @classmethod
     def delete_client(cls):
         pass
+
+
+def chunked_iterable(iterable, size):
+    """Yield successive chunks of a given size from an iterable."""
+    it = iter(iterable)
+    while chunk := list(islice(it, size)):
+        yield chunk
+
+
+def process_chunk(chunk, search_one):
+    """Process a chunk of queries using the search_one function."""
+    return [search_one(query) for query in chunk]
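A minimal, self-contained sketch of the chunking helpers introduced in PATCH 3, runnable outside the benchmark; fake_search_one is a made-up stand-in for the real per-query search call, not a function from this repository. Note that PATCH 3 passes max(1, parallel) as the chunk size, so each chunk holds only "parallel" queries; PATCH 5 later switches to len(queries) // parallel so that roughly one chunk is produced per worker.

    from itertools import islice


    def chunked_iterable(iterable, size):
        """Yield successive chunks of a given size from an iterable (as in PATCH 3)."""
        it = iter(iterable)
        while chunk := list(islice(it, size)):
            yield chunk


    def process_chunk(chunk, search_one):
        """Process a chunk of queries with the given search function (as in PATCH 3)."""
        return [search_one(query) for query in chunk]


    if __name__ == "__main__":
        queries = list(range(10))                 # stand-in for real query objects
        fake_search_one = lambda q: (1.0, 0.001)  # hypothetical (precision, latency) pair
        chunks = list(chunked_iterable(queries, 4))
        print(chunks)                             # [[0, 1, 2, 3], [4, 5, 6, 7], [8, 9]]
        print(process_chunk(chunks[0], fake_search_one))
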
From 92b4ddb7dddba8496695cc17a1a565887e13d49a Mon Sep 17 00:00:00 2001
From: Martin Dimitrov
Date: Mon, 7 Apr 2025 14:30:27 -0700
Subject: [PATCH 4/8] replace arbitrary 15 sec wait with barrier

---
 engine/base_client/search.py | 8 +++++---
 1 file changed, 5 insertions(+), 3 deletions(-)

diff --git a/engine/base_client/search.py b/engine/base_client/search.py
index 93575420..30732650 100644
--- a/engine/base_client/search.py
+++ b/engine/base_client/search.py
@@ -1,6 +1,6 @@
 import functools
 import time
-from multiprocessing import get_context
+from multiprocessing import get_context, Barrier
 from typing import Iterable, List, Optional, Tuple
 from itertools import islice
 
@@ -80,6 +80,9 @@ def search_all(
         else:
             ctx = get_context(self.get_mp_start_method())
 
+            # Create a Barrier to synchronize processes
+            barrier = Barrier(parallel)
+
             def process_initializer():
                 """Initialize each process before starting the search."""
                 self.__class__.init_client(
@@ -89,6 +92,7 @@ def process_initializer():
                     self.search_params,
                 )
                 self.setup_search()
+                barrier.wait()  # Wait for all processes to be ready
 
             # Dynamically chunk the generator
             query_chunks = list(chunked_iterable(queries, max(1, parallel)))
@@ -97,8 +101,6 @@ def process_initializer():
                 processes=parallel,
                 initializer=process_initializer,
             ) as pool:
-                if parallel > 10:
-                    time.sleep(15)  # Wait for all processes to start
                 start = time.perf_counter()
                 results = pool.starmap(
                     process_chunk,

From deaf5abaca3513ff24ff40e59e12be565acb3f40 Mon Sep 17 00:00:00 2001
From: Martin Dimitrov
Date: Tue, 8 Apr 2025 11:06:54 -0700
Subject: [PATCH 5/8] fixed chunk to correct size

---
 engine/base_client/search.py | 8 ++++++--
 1 file changed, 6 insertions(+), 2 deletions(-)

diff --git a/engine/base_client/search.py b/engine/base_client/search.py
index 30732650..1ba7d51e 100644
--- a/engine/base_client/search.py
+++ b/engine/base_client/search.py
@@ -64,6 +64,9 @@ def search_all(
         parallel = self.search_params.get("parallel", 1)
         top = self.search_params.get("top", None)
 
+        # Convert queries to a list to calculate its length
+        queries = list(queries)  # This allows us to calculate len(queries)
+
         # setup_search may require initialized client
         self.init_client(
             self.host, distance, self.connection_params, self.search_params
@@ -94,8 +97,9 @@ def process_initializer():
                 self.setup_search()
                 barrier.wait()  # Wait for all processes to be ready
 
-            # Dynamically chunk the generator
-            query_chunks = list(chunked_iterable(queries, max(1, parallel)))
+            # Dynamically calculate chunk size
+            chunk_size = max(1, len(queries) // parallel)
+            query_chunks = list(chunked_iterable(queries, chunk_size))
 
             with ctx.Pool(
                 processes=parallel,
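A small worked example of the chunk-size arithmetic from PATCH 5, independent of the benchmark code. Because len(queries) // parallel rounds down, chunked_iterable can produce more chunks than there are workers; with a Pool the extra chunk simply waits for a free worker.

    from itertools import islice


    def chunked_iterable(iterable, size):
        """Yield successive chunks of a given size from an iterable."""
        it = iter(iterable)
        while chunk := list(islice(it, size)):
            yield chunk


    queries = list(range(10))                      # e.g. 10 queries
    parallel = 4                                   # e.g. 4 worker processes
    chunk_size = max(1, len(queries) // parallel)  # 10 // 4 == 2
    chunks = list(chunked_iterable(queries, chunk_size))
    print(len(chunks), [len(c) for c in chunks])   # 5 chunks of 2 queries each, one more than the workers
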
From 4c1d080cf960cdcc5b52ae45ebf85e3595eb1e43 Mon Sep 17 00:00:00 2001
From: Martin Dimitrov
Date: Tue, 8 Apr 2025 14:18:55 -0700
Subject: [PATCH 6/8] implemented custom process management, instead of using pool

---
 datasets/datasets.json       |  8 ++++++++
 engine/base_client/client.py |  9 ++++---
 engine/base_client/search.py | 59 ++++++++++++++++++++++---------------
 3 files changed, 51 insertions(+), 25 deletions(-)

diff --git a/datasets/datasets.json b/datasets/datasets.json
index ee7ce131..453e8c6b 100644
--- a/datasets/datasets.json
+++ b/datasets/datasets.json
@@ -330,6 +330,14 @@
         "type": "h5",
         "path": "laion-img-emb-512/laion-img-emb-512-1M-cosine.hdf5",
         "link": "http://benchmarks.redislabs.s3.amazonaws.com/vecsim/laion400m/laion-img-emb-512-100M-cosine.hdf5"
+    },
+    {
+        "name": "laion-img-emb-512-1M-100ktrain-cosine",
+        "vector_size": 512,
+        "distance": "cosine",
+        "type": "h5",
+        "path": "laion-img-emb-512/laion-img-emb-512-1M-100ktrain-cosine.hdf5",
+        "link": "http://benchmarks.redislabs.s3.amazonaws.com/vecsim/laion400m/laion-img-emb-512-100M-cosine.hdf5"
     }
 
 ]

diff --git a/engine/base_client/client.py b/engine/base_client/client.py
index 0f262d34..26d2a0e5 100644
--- a/engine/base_client/client.py
+++ b/engine/base_client/client.py
@@ -36,8 +36,9 @@ def save_search_results(
     ):
         now = datetime.now()
         timestamp = now.strftime("%Y-%m-%d-%H-%M-%S")
+        pid = os.getpid()  # Get the current process ID
         experiments_file = (
-            f"{self.name}-{dataset_name}-search-{search_id}-{timestamp}.json"
+            f"{self.name}-{dataset_name}-search-{search_id}-{pid}-{timestamp}.json"
         )
         result_path = RESULTS_DIR / experiments_file
         with open(result_path, "w") as out:
@@ -89,7 +90,8 @@ def run_experiment(
         reader = dataset.get_reader(execution_params.get("normalize", False))
 
         if skip_if_exists:
-            glob_pattern = f"{self.name}-{dataset.config.name}-search-*-*.json"
+            pid = os.getpid()  # Get the current process ID
+            glob_pattern = f"{self.name}-{dataset.config.name}-search-{pid}-*-*.json"
             existing_results = list(RESULTS_DIR.glob(glob_pattern))
             if len(existing_results) == len(self.searchers):
                 print(
@@ -124,8 +126,9 @@ def run_experiment(
 
         for search_id, searcher in enumerate(self.searchers):
             if skip_if_exists:
+                pid = os.getpid()  # Get the current process ID
                 glob_pattern = (
-                    f"{self.name}-{dataset.config.name}-search-{search_id}-*.json"
+                    f"{self.name}-{dataset.config.name}-search-{search_id}-{pid}-*.json"
                 )
                 existing_results = list(RESULTS_DIR.glob(glob_pattern))
                 print("Pattern", glob_pattern, "Results:", existing_results)

diff --git a/engine/base_client/search.py b/engine/base_client/search.py
index 1ba7d51e..c1408b29 100644
--- a/engine/base_client/search.py
+++ b/engine/base_client/search.py
@@ -1,6 +1,6 @@
 import functools
 import time
-from multiprocessing import get_context, Barrier
+from multiprocessing import get_context, Barrier, Process, Queue
 from typing import Iterable, List, Optional, Tuple
 from itertools import islice
 
@@ -75,19 +75,21 @@ def search_all(
 
         search_one = functools.partial(self.__class__._search_one, top=top)
 
+        # Initialize the start time
+        start = time.perf_counter()
+
         if parallel == 1:
-            start = time.perf_counter()
+            # Single-threaded execution
             precisions, latencies = list(
                 zip(*[search_one(query) for query in tqdm.tqdm(queries)])
            )
         else:
-            ctx = get_context(self.get_mp_start_method())
-
-            # Create a Barrier to synchronize processes
-            barrier = Barrier(parallel)
+            # Dynamically calculate chunk size
+            chunk_size = max(1, len(queries) // parallel)
+            query_chunks = list(chunked_iterable(queries, chunk_size))
 
-            def process_initializer():
-                """Initialize each process before starting the search."""
+            # Function to be executed by each worker process
+            def worker_function(chunk, result_queue):
                 self.__class__.init_client(
                     self.host,
                     distance,
@@ -95,22 +97,30 @@ def process_initializer():
                     self.search_params,
                 )
                 self.setup_search()
-                barrier.wait()  # Wait for all processes to be ready
+                results = process_chunk(chunk, search_one)
+                result_queue.put(results)
 
-            # Dynamically calculate chunk size
-            chunk_size = max(1, len(queries) // parallel)
-            query_chunks = list(chunked_iterable(queries, chunk_size))
+            # Create a queue to collect results
+            result_queue = Queue()
 
-            with ctx.Pool(
-                processes=parallel,
-                initializer=process_initializer,
-            ) as pool:
-                start = time.perf_counter()
-                results = pool.starmap(
-                    process_chunk,
-                    [(chunk, search_one) for chunk in query_chunks],
-                )
-                precisions, latencies = zip(*[result for chunk in results for result in chunk])
+            # Create and start worker processes
+            processes = []
+            for chunk in query_chunks:
+                process = Process(target=worker_function, args=(chunk, result_queue))
+                processes.append(process)
+                process.start()
+
+            # Collect results from all worker processes
+            results = []
+            for _ in processes:
+                results.extend(result_queue.get())
+
+            # Wait for all worker processes to finish
+            for process in processes:
+                process.join()
+
+            # Extract precisions and latencies
+            precisions, latencies = zip(*results)
 
         total_time = time.perf_counter() - start
@@ -151,3 +161,8 @@
 def process_chunk(chunk, search_one):
     """Process a chunk of queries using the search_one function."""
     return [search_one(query) for query in chunk]
+
+
+def process_chunk_wrapper(chunk, search_one):
+    """Wrapper to process a chunk of queries."""
+    return process_chunk(chunk, search_one)
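A stripped-down, runnable sketch of the Process/Queue pattern that PATCH 6 adopts in place of the Pool. The worker below is a dummy standing in for the real client initialization and search; only the fan-out, collect, and join structure mirrors the patch.

    from multiprocessing import Process, Queue


    def worker(chunk, result_queue):
        # Stand-in for init_client()/setup_search() plus process_chunk() in the real code
        results = [(q, q * q) for q in chunk]
        result_queue.put(results)


    if __name__ == "__main__":
        chunks = [[1, 2, 3], [4, 5], [6]]
        result_queue = Queue()

        # Fan out: one process per chunk
        processes = []
        for chunk in chunks:
            p = Process(target=worker, args=(chunk, result_queue))
            processes.append(p)
            p.start()

        # Drain the queue once per worker before joining, as PATCH 6 does;
        # collecting before join avoids blocking on a queue that is still full.
        results = []
        for _ in processes:
            results.extend(result_queue.get())

        for p in processes:
            p.join()

        print(sorted(results))
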
From 0d513c26683181c90f88afed279ab29baf55256b Mon Sep 17 00:00:00 2001
From: Martin Dimitrov
Date: Tue, 8 Apr 2025 14:31:40 -0700
Subject: [PATCH 7/8] measure time only during the critical work

---
 engine/base_client/search.py | 19 ++++++++++---------
 1 file changed, 10 insertions(+), 9 deletions(-)

diff --git a/engine/base_client/search.py b/engine/base_client/search.py
index c1408b29..3eae6539 100644
--- a/engine/base_client/search.py
+++ b/engine/base_client/search.py
@@ -75,14 +75,11 @@ def search_all(
 
         search_one = functools.partial(self.__class__._search_one, top=top)
 
-        # Initialize the start time
-        start = time.perf_counter()
-
         if parallel == 1:
             # Single-threaded execution
-            precisions, latencies = list(
-                zip(*[search_one(query) for query in tqdm.tqdm(queries)])
-            )
+            start = time.perf_counter()
+            results = [search_one(query) for query in tqdm.tqdm(queries)]
+            total_time = time.perf_counter() - start
         else:
             # Dynamically calculate chunk size
             chunk_size = max(1, len(queries) // parallel)
@@ -110,6 +107,9 @@ def worker_function(chunk, result_queue):
                 processes.append(process)
                 process.start()
 
+            # Start measuring time for the critical work
+            start = time.perf_counter()
+
             # Collect results from all worker processes
             results = []
             for _ in processes:
@@ -119,10 +119,11 @@ def worker_function(chunk, result_queue):
 
             for process in processes:
                 process.join()
 
-            # Extract precisions and latencies
-            precisions, latencies = zip(*results)
+            # Stop measuring time for the critical work
+            total_time = time.perf_counter() - start
 
-        total_time = time.perf_counter() - start
+        # Extract precisions and latencies (outside the timed section)
+        precisions, latencies = zip(*results)
 
         self.__class__.delete_client()
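After PATCH 7, total_time covers only the collection of results and the joining of the workers, while precisions and latencies are extracted outside the timed section. One way those values might then be summarized is sketched below; the numbers and the reporting itself are illustrative only, not the repository's actual code.

    import statistics

    # Placeholder numbers standing in for the values gathered by search_all
    latencies = [0.012, 0.015, 0.011, 0.013]  # per-query latencies from the workers
    precisions = [1.0, 0.9, 1.0, 0.8]         # per-query precisions from the workers
    total_time = 0.02                         # wall-clock time of the timed section

    throughput = len(latencies) / total_time  # queries per second over the timed window
    print(f"rps={throughput:.1f}")
    print(f"mean_precision={statistics.mean(precisions):.3f}")
    print(f"p50_latency_ms={statistics.median(latencies) * 1000:.1f}")
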
From 2c592a09b3ea094034bd3e015bdbb8589e8da344 Mon Sep 17 00:00:00 2001
From: Marcin Poźniak
Date: Wed, 9 Apr 2025 09:22:19 -0700
Subject: [PATCH 8/8] Add itertools before islice function

---
 engine/base_client/search.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/engine/base_client/search.py b/engine/base_client/search.py
index b0c153c3..7cde4e6e 100644
--- a/engine/base_client/search.py
+++ b/engine/base_client/search.py
@@ -166,7 +166,7 @@ def delete_client(cls):
 
 
 def chunked_iterable(iterable, size):
     """Yield successive chunks of a given size from an iterable."""
     it = iter(iterable)
-    while chunk := list(islice(it, size)):
+    while chunk := list(itertools.islice(it, size)):
         yield chunk
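PATCH 8 switches the call to itertools.islice, which resolves only if the itertools module itself is imported; the series so far only shows "from itertools import islice" being added in PATCH 3. Assuming no unshown change added the plain import, the helper would need it as in the sketch below.

    import itertools  # required for the itertools.islice(...) call


    def chunked_iterable(iterable, size):
        """Yield successive chunks of a given size from an iterable."""
        it = iter(iterable)
        while chunk := list(itertools.islice(it, size)):
            yield chunk


    print(list(chunked_iterable(range(7), 3)))  # [[0, 1, 2], [3, 4, 5], [6]]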