From 0209d919aa2a06120a05997e5a97f3f031309271 Mon Sep 17 00:00:00 2001 From: root Date: Mon, 7 Apr 2025 14:21:23 -0700 Subject: [PATCH 1/6] cd /home/fco/redislabs/vector-db-benchmark && git status cd /home/fco/redislabs/vector-db-benchmark && git add engine/base_client/search.py engine/base_client/client.py chunk up the iterable before starting the processes --- engine/base_client/search.py | 36 +++++++++++++++++++++++++++++------- 1 file changed, 29 insertions(+), 7 deletions(-) diff --git a/engine/base_client/search.py b/engine/base_client/search.py index 32964192..6bb1ec68 100644 --- a/engine/base_client/search.py +++ b/engine/base_client/search.py @@ -2,6 +2,7 @@ import time from multiprocessing import get_context from typing import Iterable, List, Optional, Tuple +from itertools import islice import numpy as np import tqdm @@ -112,22 +113,31 @@ def search_all( else: ctx = get_context(self.get_mp_start_method()) - with ctx.Pool( - processes=parallel, - initializer=self.__class__.init_client, - initargs=( + def process_initializer(): + """Initialize each process before starting the search.""" + self.__class__.init_client( self.host, distance, self.connection_params, self.search_params, - ), + ) + self.setup_search() + + # Dynamically chunk the generator + query_chunks = list(chunked_iterable(used_queries, max(1, len(used_queries) // parallel))) + + with ctx.Pool( + processes=parallel, + initializer=process_initializer, ) as pool: if parallel > 10: time.sleep(15) # Wait for all processes to start start = time.perf_counter() - precisions, latencies = list( - zip(*pool.imap_unordered(search_one, iterable=tqdm.tqdm(used_queries))) + results = pool.starmap( + process_chunk, + [(chunk, search_one) for chunk in query_chunks], ) + precisions, latencies = zip(*[result for chunk in results for result in chunk]) total_time = time.perf_counter() - start @@ -157,3 +167,15 @@ def post_search(self): @classmethod def delete_client(cls): pass + + +def chunked_iterable(iterable, size): + """Yield successive chunks of a given size from an iterable.""" + it = iter(iterable) + while chunk := list(islice(it, size)): + yield chunk + + +def process_chunk(chunk, search_one): + """Process a chunk of queries using the search_one function.""" + return [search_one(query) for query in chunk] From 534de8cc98948d5575d1942f905831f4d9d9abb3 Mon Sep 17 00:00:00 2001 From: fcostaoliveira Date: Tue, 6 May 2025 23:17:36 +0100 Subject: [PATCH 2/6] Restore performance optimizations from PR #16 (85a6bc7) --- engine/base_client/client.py | 9 +++-- engine/base_client/search.py | 65 +++++++++++++++++++++++------------- 2 files changed, 48 insertions(+), 26 deletions(-) diff --git a/engine/base_client/client.py b/engine/base_client/client.py index c07f3d64..879a6e7d 100644 --- a/engine/base_client/client.py +++ b/engine/base_client/client.py @@ -40,8 +40,9 @@ def save_search_results( ): now = datetime.now() timestamp = now.strftime("%Y-%m-%d-%H-%M-%S") + pid = os.getpid() # Get the current process ID experiments_file = ( - f"{self.name}-{dataset_name}-search-{search_id}-{timestamp}.json" + f"{self.name}-{dataset_name}-search-{search_id}-{pid}-{timestamp}.json" ) result_path = RESULTS_DIR / experiments_file with open(result_path, "w") as out: @@ -99,7 +100,8 @@ def run_experiment( reader = dataset.get_reader(execution_params.get("normalize", False)) if skip_if_exists: - glob_pattern = f"{self.name}-{dataset.config.name}-search-*-*.json" + pid = os.getpid() # Get the current process ID + glob_pattern = 
f"{self.name}-{dataset.config.name}-search-*-{pid}-*.json" existing_results = list(RESULTS_DIR.glob(glob_pattern)) if len(existing_results) == len(self.searchers): print( @@ -137,8 +139,9 @@ def run_experiment( print("Experiment stage: Search") for search_id, searcher in enumerate(self.searchers): if skip_if_exists: + pid = os.getpid() # Get the current process ID glob_pattern = ( - f"{self.name}-{dataset.config.name}-search-{search_id}-*.json" + f"{self.name}-{dataset.config.name}-search-{search_id}-{pid}-*.json" ) existing_results = list(RESULTS_DIR.glob(glob_pattern)) print("Pattern", glob_pattern, "Results:", existing_results) diff --git a/engine/base_client/search.py b/engine/base_client/search.py index 6bb1ec68..44bb195f 100644 --- a/engine/base_client/search.py +++ b/engine/base_client/search.py @@ -1,6 +1,6 @@ import functools import time -from multiprocessing import get_context +from multiprocessing import Process, Queue from typing import Iterable, List, Optional, Tuple from itertools import islice @@ -106,15 +106,17 @@ def search_all( used_queries = queries_list if parallel == 1: + # Single-threaded execution start = time.perf_counter() - precisions, latencies = list( - zip(*[search_one(query) for query in tqdm.tqdm(used_queries)]) - ) + results = [search_one(query) for query in tqdm.tqdm(used_queries)] + total_time = time.perf_counter() - start else: - ctx = get_context(self.get_mp_start_method()) + # Dynamically calculate chunk size + chunk_size = max(1, len(used_queries) // parallel) + query_chunks = list(chunked_iterable(used_queries, chunk_size)) - def process_initializer(): - """Initialize each process before starting the search.""" + # Function to be executed by each worker process + def worker_function(chunk, result_queue): self.__class__.init_client( self.host, distance, @@ -122,24 +124,36 @@ def process_initializer(): self.search_params, ) self.setup_search() + results = process_chunk(chunk, search_one) + result_queue.put(results) - # Dynamically chunk the generator - query_chunks = list(chunked_iterable(used_queries, max(1, len(used_queries) // parallel))) - - with ctx.Pool( - processes=parallel, - initializer=process_initializer, - ) as pool: - if parallel > 10: - time.sleep(15) # Wait for all processes to start - start = time.perf_counter() - results = pool.starmap( - process_chunk, - [(chunk, search_one) for chunk in query_chunks], - ) - precisions, latencies = zip(*[result for chunk in results for result in chunk]) + # Create a queue to collect results + result_queue = Queue() + + # Create and start worker processes + processes = [] + for chunk in query_chunks: + process = Process(target=worker_function, args=(chunk, result_queue)) + processes.append(process) + process.start() + + # Start measuring time for the critical work + start = time.perf_counter() - total_time = time.perf_counter() - start + # Collect results from all worker processes + results = [] + for _ in processes: + results.extend(result_queue.get()) + + # Wait for all worker processes to finish + for process in processes: + process.join() + + # Stop measuring time for the critical work + total_time = time.perf_counter() - start + + # Extract precisions and latencies (outside the timed section) + precisions, latencies = zip(*results) self.__class__.delete_client() @@ -179,3 +193,8 @@ def chunked_iterable(iterable, size): def process_chunk(chunk, search_one): """Process a chunk of queries using the search_one function.""" return [search_one(query) for query in chunk] + + +def 
process_chunk_wrapper(chunk, search_one): + """Wrapper to process a chunk of queries.""" + return process_chunk(chunk, search_one) From 71c1f3b2b7c0a1080413b1cdaae2f0cd0a13191f Mon Sep 17 00:00:00 2001 From: fcostaoliveira Date: Tue, 6 May 2025 23:25:21 +0100 Subject: [PATCH 3/6] Add real-time progress bar with throughput display for parallel execution --- engine/base_client/search.py | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/engine/base_client/search.py b/engine/base_client/search.py index 44bb195f..b1e62641 100644 --- a/engine/base_client/search.py +++ b/engine/base_client/search.py @@ -140,10 +140,20 @@ def worker_function(chunk, result_queue): # Start measuring time for the critical work start = time.perf_counter() + # Create a progress bar for the total number of queries + total_queries = len(used_queries) + pbar = tqdm.tqdm(total=total_queries, desc="Processing queries", unit="queries") + # Collect results from all worker processes results = [] for _ in processes: - results.extend(result_queue.get()) + chunk_results = result_queue.get() + results.extend(chunk_results) + # Update the progress bar with the number of processed queries in this chunk + pbar.update(len(chunk_results)) + + # Close the progress bar + pbar.close() # Wait for all worker processes to finish for process in processes: @@ -192,6 +202,7 @@ def chunked_iterable(iterable, size): def process_chunk(chunk, search_one): """Process a chunk of queries using the search_one function.""" + # No progress bar in worker processes to avoid cluttering the output return [search_one(query) for query in chunk] From 2eda5b752dbd1bb74dc23b85763f33027a65563d Mon Sep 17 00:00:00 2001 From: fcostaoliveira Date: Tue, 6 May 2025 23:28:14 +0100 Subject: [PATCH 4/6] Add test script for multiprocessing with progress bar --- test_multiprocessing.py | 36 ++++++++++++++++++++++++++++++++++++ 1 file changed, 36 insertions(+) create mode 100644 test_multiprocessing.py diff --git a/test_multiprocessing.py b/test_multiprocessing.py new file mode 100644 index 00000000..40384946 --- /dev/null +++ b/test_multiprocessing.py @@ -0,0 +1,36 @@ +from engine.base_client.search import BaseSearcher +from dataset_reader.base_reader import Query +import time + +class TestSearcher(BaseSearcher): + @classmethod + def init_client(cls, host, distance, connection_params, search_params): + pass + + @classmethod + def search_one(cls, vector, meta_conditions, top): + return [] + + @classmethod + def _search_one(cls, query, top=None): + # Add a small delay to simulate real work + time.sleep(0.001) + return 1.0, 0.1 + + def setup_search(self): + pass + +# Create test queries +queries = [Query(vector=[0.1]*10, meta_conditions=None, expected_result=None) for _ in range(1000)] + +# Create a searcher with parallel=10 +searcher = TestSearcher('localhost', {}, {'parallel': 10}) + +# Run the search_all method +start = time.perf_counter() +results = searcher.search_all('cosine', queries) +total_time = time.perf_counter() - start + +print(f'Number of queries: {len(results["latencies"])}') +print(f'Total time: {total_time:.6f} seconds') +print(f'Throughput: {results["rps"]:.2f} queries/sec') From 6127e847306e3506469dd2f7bf196a9870128ae5 Mon Sep 17 00:00:00 2001 From: fcostaoliveira Date: Tue, 6 May 2025 23:33:10 +0100 Subject: [PATCH 5/6] Optimize query cycling for large query counts using generators --- engine/base_client/search.py | 70 ++++++++++++++++++++++++++---------- 1 file changed, 51 insertions(+), 19 deletions(-) diff --git 
a/engine/base_client/search.py b/engine/base_client/search.py index b1e62641..6a512722 100644 --- a/engine/base_client/search.py +++ b/engine/base_client/search.py @@ -84,36 +84,69 @@ def search_all( # Handle num_queries parameter if num_queries > 0: - # If we need more queries than available, cycle through the list + # If we need more queries than available, use a cycling generator if num_queries > len(queries_list) and len(queries_list) > 0: print(f"Requested {num_queries} queries but only {len(queries_list)} are available.") - print(f"Extending queries by cycling through the available ones.") - # Calculate how many complete cycles and remaining items we need - complete_cycles = num_queries // len(queries_list) - remaining = num_queries % len(queries_list) - - # Create the extended list - extended_queries = [] - for _ in range(complete_cycles): - extended_queries.extend(queries_list) - extended_queries.extend(queries_list[:remaining]) - - used_queries = extended_queries + print(f"Using a cycling generator to efficiently process queries.") + + # Create a cycling generator function + def cycling_query_generator(queries, total_count): + """Generate queries by cycling through the available ones.""" + count = 0 + while count < total_count: + for query in queries: + if count < total_count: + yield query + count += 1 + else: + break + + # Use the generator instead of creating a full list + used_queries = cycling_query_generator(queries_list, num_queries) + # We need to know the total count for the progress bar + total_query_count = num_queries else: used_queries = queries_list[:num_queries] + total_query_count = len(used_queries) print(f"Using {num_queries} queries") else: used_queries = queries_list + total_query_count = len(used_queries) if parallel == 1: # Single-threaded execution start = time.perf_counter() - results = [search_one(query) for query in tqdm.tqdm(used_queries)] + + # Create a progress bar with the correct total + pbar = tqdm.tqdm(total=total_query_count, desc="Processing queries", unit="queries") + + # Process queries with progress updates + results = [] + for query in used_queries: + results.append(search_one(query)) + pbar.update(1) + + # Close the progress bar + pbar.close() + total_time = time.perf_counter() - start else: - # Dynamically calculate chunk size - chunk_size = max(1, len(used_queries) // parallel) - query_chunks = list(chunked_iterable(used_queries, chunk_size)) + # Dynamically calculate chunk size based on total_query_count + chunk_size = max(1, total_query_count // parallel) + + # If used_queries is a generator, we need to handle it differently + if hasattr(used_queries, '__next__'): + # For generators, we'll create chunks on-the-fly + query_chunks = [] + remaining = total_query_count + while remaining > 0: + current_chunk_size = min(chunk_size, remaining) + chunk = [next(used_queries) for _ in range(current_chunk_size)] + query_chunks.append(chunk) + remaining -= current_chunk_size + else: + # For lists, we can use the chunked_iterable function + query_chunks = list(chunked_iterable(used_queries, chunk_size)) # Function to be executed by each worker process def worker_function(chunk, result_queue): @@ -141,8 +174,7 @@ def worker_function(chunk, result_queue): start = time.perf_counter() # Create a progress bar for the total number of queries - total_queries = len(used_queries) - pbar = tqdm.tqdm(total=total_queries, desc="Processing queries", unit="queries") + pbar = tqdm.tqdm(total=total_query_count, desc="Processing queries", unit="queries") # Collect 
results from all worker processes results = [] From 343e906f9d7a78afc6321b745902dec515fc37ef Mon Sep 17 00:00:00 2001 From: fcostaoliveira Date: Tue, 6 May 2025 23:33:53 +0100 Subject: [PATCH 6/6] Update test script to test query cycling optimization --- test_multiprocessing.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/test_multiprocessing.py b/test_multiprocessing.py index 40384946..341289ec 100644 --- a/test_multiprocessing.py +++ b/test_multiprocessing.py @@ -20,15 +20,15 @@ def _search_one(cls, query, top=None): def setup_search(self): pass -# Create test queries -queries = [Query(vector=[0.1]*10, meta_conditions=None, expected_result=None) for _ in range(1000)] +# Create a small set of test queries +queries = [Query(vector=[0.1]*10, meta_conditions=None, expected_result=None) for _ in range(10)] # Create a searcher with parallel=10 searcher = TestSearcher('localhost', {}, {'parallel': 10}) -# Run the search_all method +# Run the search_all method with a large num_queries parameter start = time.perf_counter() -results = searcher.search_all('cosine', queries) +results = searcher.search_all('cosine', queries, num_queries=1000) total_time = time.perf_counter() - start print(f'Number of queries: {len(results["latencies"])}')
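
For reference, a minimal standalone sketch of the chunked Process/Queue fan-out that patches 1-3 introduce in search_all: the query list is split into one chunk per worker up front, each worker pushes its per-query (precision, latency) results onto a shared Queue, and the parent drains the queue before joining, timing only the dispatch-and-collect section. This is an illustration under stated assumptions, not the benchmark's real client: search_one below is a dummy that sleeps ~1 ms (as in test_multiprocessing.py), and the query values, worker() helper, and parallel=10 setting are placeholders.

import time
from itertools import islice
from multiprocessing import Process, Queue


def chunked_iterable(iterable, size):
    """Yield successive chunks of a given size from an iterable."""
    it = iter(iterable)
    while chunk := list(islice(it, size)):
        yield chunk


def search_one(query):
    """Dummy per-query search: pretend every query takes ~1 ms and is exact."""
    time.sleep(0.001)
    return 1.0, 0.001  # (precision, latency)


def worker(chunk, result_queue):
    """Process one chunk of queries and push the per-query results back."""
    result_queue.put([search_one(query) for query in chunk])


if __name__ == "__main__":
    parallel = 10                      # placeholder worker count
    queries = list(range(1000))        # placeholder queries

    # Split the workload up front, roughly one chunk per worker process.
    chunk_size = max(1, len(queries) // parallel)
    query_chunks = list(chunked_iterable(queries, chunk_size))

    result_queue = Queue()
    processes = [Process(target=worker, args=(chunk, result_queue))
                 for chunk in query_chunks]
    for process in processes:
        process.start()

    # Time only the dispatch/collection, mirroring the patched search_all.
    start = time.perf_counter()
    results = []
    for _ in processes:
        # Each worker calls put() exactly once, so one get() per worker
        # drains the queue; drain before join() to avoid blocking.
        results.extend(result_queue.get())
    for process in processes:
        process.join()
    total_time = time.perf_counter() - start

    precisions, latencies = zip(*results)
    print(f"{len(latencies)} queries in {total_time:.3f}s "
          f"({len(latencies) / total_time:.1f} queries/sec)")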
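
Patch 5 replaces the eager list extension with a cycling generator whenever num_queries exceeds the number of available queries. Below is a small sketch of that generator with placeholder query values, shown alongside the equivalent standard-library spelling with itertools.cycle plus islice; the sample data and variable names are illustrative only.

from itertools import cycle, islice


def cycling_query_generator(queries, total_count):
    """Generate queries by cycling through the available ones."""
    count = 0
    while count < total_count:
        for query in queries:
            if count < total_count:
                yield query
                count += 1
            else:
                break


queries = ["q0", "q1", "q2"]  # placeholder queries

expanded = list(cycling_query_generator(queries, 7))
expanded_stdlib = list(islice(cycle(queries), 7))

assert expanded == expanded_stdlib == ["q0", "q1", "q2", "q0", "q1", "q2", "q0"]
print(expanded)

Because a generator can only be consumed once, patch 5 drains it into per-worker chunks up front (the hasattr(used_queries, '__next__') branch) before any process is started, while plain lists still go through chunked_iterable.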