
Commit 01dfafe

added periodic printing of interval results to file
1 parent 7751ccd commit 01dfafe

File tree

1 file changed (+190 -80 lines changed)

engine/base_client/search.py

Lines changed: 190 additions & 80 deletions
@@ -109,6 +109,9 @@ def search_all(
         )
         self.setup_search()
 
+        # Reset the doc_id counter to prevent any initialization during client setup
+        self.__class__._doc_id_counter = None
+
         search_one = functools.partial(self.__class__._search_one, top=top)
         insert_one = functools.partial(self.__class__._insert_one)
 
@@ -155,88 +158,184 @@ def cycling_query_generator(queries, total_count):
         used_queries = queries_list
         total_query_count = len(used_queries)
 
-        if parallel == 1:
-            # Single-threaded execution
-            start = time.perf_counter()
-
-            # Process queries
-            results = []
-            total_insert_count = 0
-            total_search_count = 0
-            all_insert_latencies = []
-            all_search_latencies = []
-
-            for query in used_queries:
-                if random.random() < insert_fraction:
-                    precision, latency = insert_one(query)
-                    total_insert_count += 1
-                    all_insert_latencies.append(latency)
-                    results.append(('insert', precision, latency))
-                else:
-                    precision, latency = search_one(query)
-                    total_search_count += 1
-                    all_search_latencies.append(latency)
-                    results.append(('search', precision, latency))
-
-            total_time = time.perf_counter() - start
+        # Interval reporting setup
+        interval_size = 10000  # Report every 10K operations
+        need_interval_reporting = total_query_count >= interval_size
+        interval_counter = 0
+        overall_start_time = time.perf_counter()
+
+        # Calculate total number of intervals for progress tracking
+        total_intervals = (total_query_count + interval_size - 1) // interval_size  # Ceiling division
+
+        # Initialize progress bar for intervals if needed (only if output is to terminal)
+        if need_interval_reporting and os.isatty(1):  # Check if stdout is a terminal
+            interval_pbar = tqdm.tqdm(total=total_intervals, desc="Intervals", unit="interval")
         else:
-            # Dynamically calculate chunk size based on total_query_count
-            chunk_size = max(1, total_query_count // parallel)
-
-            # If used_queries is a generator, we need to handle it differently
-            if hasattr(used_queries, '__next__'):
-                # For generators, we'll create chunks on-the-fly
-                query_chunks = []
-                remaining = total_query_count
-                while remaining > 0:
-                    current_chunk_size = min(chunk_size, remaining)
-                    chunk = [next(used_queries) for _ in range(current_chunk_size)]
-                    query_chunks.append(chunk)
-                    remaining -= current_chunk_size
+            interval_pbar = None
+
+        # Initialize global doc_id offset to ensure uniqueness across intervals
+        global_doc_id_offset = 0
+
+        # Overall accumulators
+        overall_results = []
+        overall_insert_count = 0
+        overall_search_count = 0
+        overall_insert_latencies = []
+        overall_search_latencies = []
+
+        # Interval statistics for output file
+        interval_stats = []
+
+        # Convert generator to iterator for interval processing
+        query_iterator = iter(used_queries)
+
+        # Process queries in intervals of 10K
+        while True:
+            # Get next interval chunk (up to 10K queries)
+            interval_queries = list(islice(query_iterator, interval_size))
+            if not interval_queries:
+                break  # No more queries
+
+            interval_counter += 1
+            current_interval_size = len(interval_queries)
+
+            if parallel == 1:
+                # Single-threaded execution for this interval
+                interval_start = time.perf_counter()
+
+                # Force reset and set doc_id counter offset for single-threaded execution
+                # This ensures we override any previous initialization
+                self.__class__._doc_id_counter = itertools.count(global_doc_id_offset)
+
+                # Process queries for this interval
+                interval_results = []
+                interval_insert_count = 0
+                interval_search_count = 0
+                interval_insert_latencies = []
+                interval_search_latencies = []
+
+                for query in interval_queries:
+                    if random.random() < insert_fraction:
+                        precision, latency = insert_one(query)
+                        interval_insert_count += 1
+                        interval_insert_latencies.append(latency)
+                        interval_results.append(('insert', precision, latency))
+                    else:
+                        precision, latency = search_one(query)
+                        interval_search_count += 1
+                        interval_search_latencies.append(latency)
+                        interval_results.append(('search', precision, latency))
+
+                interval_time = time.perf_counter() - interval_start
             else:
-                # For lists, we can use the chunked_iterable function
-                query_chunks = list(chunked_iterable(used_queries, chunk_size))
-
-            # Create a queue to collect results
-            result_queue = Queue()
-
-            # Create worker processes
-            processes = []
-            for chunk in query_chunks:
-                process = Process(target=worker_function, args=(self, distance, search_one, insert_one,
-                                                                chunk, result_queue, insert_fraction))
-                processes.append(process)
-
-            # Start worker processes
-            for process in processes:
-                process.start()
-
-            # Collect results from all worker processes
-            results = []
-            total_insert_count = 0
-            total_search_count = 0
-            all_insert_latencies = []
-            all_search_latencies = []
-            min_start_time = time.perf_counter()
-
-            for _ in processes:
-                proc_start_time, chunk_results, insert_count, search_count, insert_latencies, search_latencies = result_queue.get()
-                results.extend(chunk_results)
-                total_insert_count += insert_count
-                total_search_count += search_count
-                all_insert_latencies.extend(insert_latencies)
-                all_search_latencies.extend(search_latencies)
+                # Parallel execution for this interval
+                # Dynamically calculate chunk size based on current interval size
+                chunk_size = max(1, current_interval_size // parallel)
+
+                # For interval queries (always a list), use chunked_iterable
+                query_chunks = list(chunked_iterable(interval_queries, chunk_size))
+
+                # Create a queue to collect results
+                result_queue = Queue()
+
+                # Create worker processes
+                processes = []
+                for i, chunk in enumerate(query_chunks):
+                    # Calculate unique doc_id offset for this worker in this interval
+                    worker_doc_id_offset = global_doc_id_offset + (i * 1000000)
+                    process = Process(target=worker_function, args=(self, distance, search_one, insert_one,
+                                                                    chunk, result_queue, insert_fraction, worker_doc_id_offset))
+                    processes.append(process)
+
+                # Start worker processes
+                for process in processes:
+                    process.start()
+
+                # Collect results from all worker processes
+                interval_results = []
+                interval_insert_count = 0
+                interval_search_count = 0
+                interval_insert_latencies = []
+                interval_search_latencies = []
+                min_start_time = time.perf_counter()
+
+                for _ in processes:
+                    proc_start_time, chunk_results, insert_count, search_count, insert_latencies, search_latencies = result_queue.get()
+                    interval_results.extend(chunk_results)
+                    interval_insert_count += insert_count
+                    interval_search_count += search_count
+                    interval_insert_latencies.extend(insert_latencies)
+                    interval_search_latencies.extend(search_latencies)
+
+                    # Update min_start_time if necessary
+                    if proc_start_time < min_start_time:
+                        min_start_time = proc_start_time
+
+                # Stop measuring time for the critical work
+                interval_time = time.perf_counter() - min_start_time
+
+                # Wait for all worker processes to finish
+                for process in processes:
+                    process.join()
+
+            # Accumulate overall results
+            overall_results.extend(interval_results)
+            overall_insert_count += interval_insert_count
+            overall_search_count += interval_search_count
+            overall_insert_latencies.extend(interval_insert_latencies)
+            overall_search_latencies.extend(interval_search_latencies)
+
+            # Update global doc_id offset for next interval
+            if parallel == 1:
+                # For single-threaded, reserve space based on actual inserts in this interval
+                global_doc_id_offset += max(1000000, interval_insert_count * 2)  # Some buffer
+            else:
+                # Reserve space for all parallel workers in this interval
+                global_doc_id_offset += parallel * 1000000
+
+            # Report interval metrics if needed
+            if need_interval_reporting:
+                interval_search_precisions = [result[1] for result in interval_results if result[0] == 'search']
 
-                # Update min_start_time if necessary
-                if proc_start_time < min_start_time:
-                    min_start_time = proc_start_time
-
-            # Stop measuring time for the critical work
-            total_time = time.perf_counter() - min_start_time
-
-            # Wait for all worker processes to finish
-            for process in processes:
-                process.join()
+                # Create interval statistics for output file
+                interval_stat = {
+                    "interval": interval_counter,
+                    "operations": current_interval_size,
+                    "time_seconds": float(interval_time),  # Ensure it's a float
+                    "rps": float(current_interval_size / interval_time),  # Ensure it's a float
+                    "searches": interval_search_count,
+                    "inserts": interval_insert_count,
+                    "search_precision": float(np.mean(interval_search_precisions)) if interval_search_precisions else None
+                }
+                interval_stats.append(interval_stat)
+
+                # Debug: Print number of intervals collected so far
+                print(f"DEBUG: Collected {len(interval_stats)} intervals so far", flush=True)
+
+                # Update progress bar with same metrics (this goes to terminal)
+                if interval_pbar:
+                    interval_pbar.update(1)
+                    interval_pbar.set_postfix({
+                        'RPS': f"{current_interval_size / interval_time:.1f}",
+                        'Searches': interval_search_count,
+                        'Inserts': interval_insert_count,
+                        'Precision': f"{np.mean(interval_search_precisions):.4f}" if interval_search_precisions else "N/A"
+                    })
+
+        # Close progress bar when done
+        if interval_pbar:
+            interval_pbar.close()
+            print()  # Add a blank line after progress bar
+
+        # Calculate total time for overall metrics
+        total_time = time.perf_counter() - overall_start_time
+
+        # Use overall accumulated results
+        results = overall_results
+        total_insert_count = overall_insert_count
+        total_search_count = overall_search_count
+        all_insert_latencies = overall_insert_latencies
+        all_search_latencies = overall_search_latencies
 
         # Extract overall precisions and latencies
         all_precisions = [result[1] for result in results]
@@ -247,6 +346,11 @@ def cycling_query_generator(queries, total_count):
 
         self.__class__.delete_client()
 
+
+        if len(interval_stats) > 0:
+            print(f"DEBUG: First interval: {interval_stats[0]}", flush=True)
+            print(f"DEBUG: Last interval: {interval_stats[-1]}", flush=True)
+
         return {
             # Overall metrics
             "total_time": total_time,
@@ -274,6 +378,9 @@ def cycling_query_generator(queries, total_count):
             "actual_insert_fraction": total_insert_count / len(all_latencies) if len(all_latencies) > 0 else 0,
             "target_insert_fraction": insert_fraction,
 
+            # Interval statistics (only included if intervals were used)
+            "interval_stats": interval_stats if interval_stats else None,
+
             # Legacy compatibility (for existing code that expects these)
             "mean_time": np.mean(all_latencies),
             "mean_precisions": np.mean(search_precisions) if search_precisions else 1.0,  # Only search precisions
@@ -326,7 +433,7 @@ def process_chunk(chunk, search_one, insert_one, insert_fraction):
     return results, insert_count, search_count, insert_latencies, search_latencies
 
 # Function to be executed by each worker process
-def worker_function(self, distance, search_one, insert_one, chunk, result_queue, insert_fraction=0.0):
+def worker_function(self, distance, search_one, insert_one, chunk, result_queue, insert_fraction=0.0, doc_id_offset=0):
     self.init_client(
         self.host,
         distance,
@@ -335,6 +442,9 @@ def worker_function(self, distance, search_one, insert_one, chunk, result_queue,
     )
     self.setup_search()
 
+    # Force set the doc_id counter offset for this worker (overrides any previous state)
+    self.__class__._doc_id_counter = itertools.count(doc_id_offset)
+
     start_time = time.perf_counter()
     results, insert_count, search_count, insert_latencies, search_latencies = process_chunk(
         chunk, search_one, insert_one, insert_fraction
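
Note: the hunks above only accumulate the per-interval statistics and return them under the "interval_stats" key of the result dict; the write to disk implied by the commit message is expected to happen in the caller. A minimal caller-side sketch of that step, assuming a JSON output file and a hypothetical helper name (both are illustrative assumptions, not part of this commit), could look like this:

import json

def dump_interval_stats(search_results: dict, path: str = "interval_results.json") -> None:
    # Hypothetical helper: "interval_stats" is only populated when the run was long
    # enough to use intervals (>= 10K operations); otherwise search_all() returns None.
    interval_stats = search_results.get("interval_stats")
    if not interval_stats:
        return
    with open(path, "w") as f:
        json.dump(interval_stats, f, indent=2)

# Example usage, assuming `results` holds the dict returned by search_all():
# dump_interval_stats(results, "interval_results.json")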
