@@ -2,7 +2,6 @@
 import time
 from multiprocessing import get_context
 from typing import Iterable, List, Optional, Tuple
-from itertools import islice

 import numpy as np
 import tqdm
@@ -84,34 +83,26 @@ def search_all(

         # Handle num_queries parameter
         if num_queries > 0:
-            # If we need more queries than available, use a cycling generator
+            # If we need more queries than available, cycle through the list
             if num_queries > len(queries_list) and len(queries_list) > 0:
                 print(f"Requested {num_queries} queries but only {len(queries_list)} are available.")
-                print(f"Using a cycling generator to efficiently process queries.")
-
-                # Create a cycling generator function
-                def cycling_query_generator(queries, total_count):
-                    """Generate queries by cycling through the available ones."""
-                    count = 0
-                    while count < total_count:
-                        for query in queries:
-                            if count < total_count:
-                                yield query
-                                count += 1
-                            else:
-                                break
-
-                # Use the generator instead of creating a full list
-                used_queries = cycling_query_generator(queries_list, num_queries)
-                # We need to know the total count for the progress bar
-                total_query_count = num_queries
+                print(f"Extending queries by cycling through the available ones.")
+                # Calculate how many complete cycles and remaining items we need
+                complete_cycles = num_queries // len(queries_list)
+                remaining = num_queries % len(queries_list)
+
+                # Create the extended list
+                extended_queries = []
+                for _ in range(complete_cycles):
+                    extended_queries.extend(queries_list)
+                extended_queries.extend(queries_list[:remaining])
+
+                used_queries = extended_queries
             else:
                 used_queries = queries_list[:num_queries]
-                total_query_count = len(used_queries)
             print(f"Using {num_queries} queries")
         else:
             used_queries = queries_list
-            total_query_count = len(used_queries)

         if parallel == 1:
             start = time.perf_counter()
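
Side note on the hunk above: building the extended list from whole cycles plus a remainder is equivalent to taking the first num_queries items from an endless repetition of queries_list. A minimal sketch using only the standard library (assuming queries_list is non-empty):

    from itertools import cycle, islice

    # Repeat queries_list indefinitely, then take exactly num_queries items;
    # this produces the same list as the complete_cycles/remaining construction.
    used_queries = list(islice(cycle(queries_list), num_queries))

Unlike the removed cycling generator, this still materializes a concrete list, so len(used_queries) works for progress reporting.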
@@ -121,32 +112,22 @@ def cycling_query_generator(queries, total_count):
         else:
             ctx = get_context(self.get_mp_start_method())

-            def process_initializer():
-                """Initialize each process before starting the search."""
-                self.__class__.init_client(
+            with ctx.Pool(
+                processes=parallel,
+                initializer=self.__class__.init_client,
+                initargs=(
                     self.host,
                     distance,
                     self.connection_params,
                     self.search_params,
-                )
-                self.setup_search()
-
-            # Dynamically chunk the generator
-            query_chunks = list(chunked_iterable(used_queries, max(1, len(used_queries) // parallel)))
-
-            with ctx.Pool(
-                processes=parallel,
-                initializer=process_initializer,
+                ),
             ) as pool:
                 if parallel > 10:
                     time.sleep(15)  # Wait for all processes to start
                 start = time.perf_counter()
-                results = pool.starmap(
-                    process_chunk,
-                    [(chunk, search_one) for chunk in query_chunks],
+                precisions, latencies = list(
+                    zip(*pool.imap_unordered(search_one, iterable=tqdm.tqdm(used_queries)))
                 )
-                precisions, latencies = zip(*[result for chunk in results for result in chunk])
-
         total_time = time.perf_counter() - start

         self.__class__.delete_client()
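
For context on the pool setup above: multiprocessing.Pool calls initializer(*initargs) once in each worker process as it starts, so every worker builds its own process-local client before imap_unordered begins handing out queries. A self-contained sketch of that pattern, with hypothetical names (init_worker, handle_one) not taken from this repo:

    from multiprocessing import get_context

    _client = None

    def init_worker(host):
        # Runs once per worker process; sets up process-local state.
        global _client
        _client = f"connection to {host}"  # stand-in for a real client object

    def handle_one(query):
        # Executed in a worker; uses the client created by the initializer.
        return (_client, query)

    if __name__ == "__main__":
        ctx = get_context("spawn")
        with ctx.Pool(processes=2, initializer=init_worker, initargs=("localhost",)) as pool:
            print(list(pool.imap_unordered(handle_one, range(4))))

Note that imap_unordered yields results as workers finish, so the precisions/latencies pairs above are not in query order; that is fine for aggregate statistics but would matter if per-query alignment were needed.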
@@ -175,16 +156,3 @@ def post_search(self):
     @classmethod
     def delete_client(cls):
         pass
-
-
-def chunked_iterable(iterable, size):
-    """Yield successive chunks of a given size from an iterable."""
-    it = iter(iterable)
-    while chunk := list(islice(it, size)):
-        yield chunk
-
-
-def process_chunk(chunk, search_one):
-    """Process a chunk of queries using the search_one function."""
-    # No progress bar in worker processes to avoid cluttering the output
-    return [search_one(query) for query in chunk]