
Commit aa39103

ingester: Port batch ingester to django
Also added some verbose logs to monitor ingestion rate, queue, etc.

Signed-off-by: Denys Fedoryshchenko <[email protected]>
1 parent e9f3cb4 commit aa39103

File tree: 3 files changed (+385 -23 lines)


backend/kernelCI_app/management/commands/helpers/kcidbng_ingester.py

Lines changed: 271 additions & 23 deletions
@@ -13,9 +13,12 @@
 from typing import Any, Literal, Optional
 import yaml
 import kcidb_io
+from django.db import transaction
+from kernelCI_app.models import Issues, Checkouts, Builds, Tests, Incidents
 
 from kernelCI_app.management.commands.helpers.process_submissions import (
     insert_submission_data,
+    build_instances_from_submission,
 )
 
 VERBOSE = 0
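Note: build_instances_from_submission() is imported here from process_submissions.py, one of the other files touched by this commit but not shown in this excerpt. From the way db_worker() consumes it below, it is expected to turn one KCIDB submission dict into unsaved Django model instances grouped per table, roughly along these lines (a simplified sketch under that assumption, not the actual helper):

    # Sketch only: the real helper lives in process_submissions.py.
    from kernelCI_app.models import Builds, Checkouts, Incidents, Issues, Tests

    def build_instances_from_submission(data: dict) -> dict[str, list]:
        """Map a KCIDB submission to unsaved model instances, one list per table."""
        return {
            "issues": [Issues(**row) for row in data.get("issues", [])],
            "checkouts": [Checkouts(**row) for row in data.get("checkouts", [])],
            "builds": [Builds(**row) for row in data.get("builds", [])],
            "tests": [Tests(**row) for row in data.get("tests", [])],
            "incidents": [Incidents(**row) for row in data.get("incidents", [])],
        }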
@@ -35,11 +38,40 @@
 
 logger = logging.getLogger("ingester")
 
-# Thread-safe queue for database operations
-db_queue = Queue()
+# Batching and backpressure controls
+try:
+    INGEST_BATCH_SIZE = int(os.environ.get("INGEST_BATCH_SIZE", "1000"))
+except Exception:
+    INGEST_BATCH_SIZE = 10000
+
+try:
+    INGEST_FLUSH_TIMEOUT_SEC = float(os.environ.get("INGEST_FLUSH_TIMEOUT_SEC", "2.0"))
+except Exception:
+    INGEST_FLUSH_TIMEOUT_SEC = 5.0
+
+try:
+    INGEST_QUEUE_MAXSIZE = int(os.environ.get("INGEST_QUEUE_MAXSIZE", "5000"))
+except Exception:
+    INGEST_QUEUE_MAXSIZE = 5000
+
+# Thread-safe queue for database operations (bounded for backpressure)
+db_queue = Queue(maxsize=INGEST_QUEUE_MAXSIZE)
 db_lock = threading.Lock()
 
 
+def _ts() -> str:
+    return time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
+
+
+def _out(msg: str) -> None:
+    # Write debug/perf output to stdout
+    # logger was unreliable in some environments
+    try:
+        print(msg, flush=True)
+    except Exception:
+        pass
+
+
 def move_file_to_failed_dir(filename, failed_dir):
     try:
         os.rename(filename, os.path.join(failed_dir, os.path.basename(filename)))
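The bounded db_queue above is the backpressure mechanism: once INGEST_QUEUE_MAXSIZE items are waiting, the file-reader threads block in put() until the database worker catches up. A self-contained sketch of that pattern (toy sizes and sleep, unrelated to the real models):

    from queue import Empty, Queue
    import threading
    import time

    q = Queue(maxsize=4)  # producers block once 4 items are pending

    def worker():
        while True:
            try:
                item = q.get(timeout=0.1)
            except Empty:
                continue  # nothing queued yet; keep polling
            if item is None:  # poison pill, mirroring the db_worker shutdown path
                q.task_done()
                break
            time.sleep(0.05)  # stand-in for a slow bulk insert
            q.task_done()

    t = threading.Thread(target=worker)
    t.start()
    for i in range(20):
        q.put(i)  # blocks whenever the queue is full
    q.put(None)
    q.join()
    t.join()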
@@ -260,6 +292,7 @@ def prepare_file_data(filename, trees_name, spool_dir):
 
 
 def process_submission_item(data: dict[str, Any], metadata: dict[str, Any]):
+    # Legacy path retained for potential direct (non-batched) insertions
     with db_lock:
         insert_submission_data(data, metadata)
 
@@ -268,7 +301,9 @@ def process_submission_item(data: dict[str, Any], metadata: dict[str, Any]):
     logger.info("Ingested %s in %.2f KB/s", metadata["filename"], ing_speed)
 
 
-def db_worker(stop_event: threading.Event):
+def db_worker(
+    stop_event: threading.Event,
+):  # noqa: C901 - complex but performance-critical
     """
     Worker thread that processes the database queue.
     This is the only thread that interacts with the database.
@@ -277,33 +312,167 @@ def db_worker(stop_event: threading.Event):
         stop_event: threading.Event (flag) to signal the worker to stop processing
     """
 
+    # Local buffers for batching
+    issues_buf = []
+    checkouts_buf = []
+    builds_buf = []
+    tests_buf = []
+    incidents_buf = []
+
+    last_flush_ts = time.time()
+
+    def buffered_total():
+        return (
+            len(issues_buf)
+            + len(checkouts_buf)
+            + len(builds_buf)
+            + len(tests_buf)
+            + len(incidents_buf)
+        )
+
+    def flush_buffers():
+        nonlocal last_flush_ts
+        total = buffered_total()
+        if total == 0:
+            return
+        batch_size = INGEST_BATCH_SIZE
+
+        # Insert in dependency-safe order
+        flush_start = time.time()
+        try:
+            # Single transaction for all tables in the flush
+            with transaction.atomic():
+                if issues_buf:
+                    t0 = time.time()
+                    Issues.objects.bulk_create(
+                        issues_buf, batch_size=batch_size, ignore_conflicts=True
+                    )
+                    _out(
+                        "[%s] bulk_create issues: n=%d in %.3fs"
+                        % (_ts(), len(issues_buf), time.time() - t0)
+                    )
+                if checkouts_buf:
+                    t0 = time.time()
+                    Checkouts.objects.bulk_create(
+                        checkouts_buf, batch_size=batch_size, ignore_conflicts=True
+                    )
+                    _out(
+                        "[%s] bulk_create checkouts: n=%d in %.3fs"
+                        % (_ts(), len(checkouts_buf), time.time() - t0)
+                    )
+                if builds_buf:
+                    t0 = time.time()
+                    Builds.objects.bulk_create(
+                        builds_buf, batch_size=batch_size, ignore_conflicts=True
+                    )
+                    _out(
+                        "[%s] bulk_create builds: n=%d in %.3fs"
+                        % (_ts(), len(builds_buf), time.time() - t0)
+                    )
+                if tests_buf:
+                    t0 = time.time()
+                    Tests.objects.bulk_create(
+                        tests_buf, batch_size=batch_size, ignore_conflicts=True
+                    )
+                    _out(
+                        "[%s] bulk_create tests: n=%d in %.3fs"
+                        % (_ts(), len(tests_buf), time.time() - t0)
+                    )
+                if incidents_buf:
+                    t0 = time.time()
+                    Incidents.objects.bulk_create(
+                        incidents_buf, batch_size=batch_size, ignore_conflicts=True
+                    )
+                    _out(
+                        "[%s] bulk_create incidents: n=%d in %.3fs"
+                        % (_ts(), len(incidents_buf), time.time() - t0)
+                    )
+        except Exception as e:
+            logger.error("Error during bulk_create flush: %s", e)
+        finally:
+            flush_dur = time.time() - flush_start
+            rate = total / flush_dur if flush_dur > 0 else 0.0
+            msg = (
+                "[%s] Flushed batch in %.3fs (%.1f items/s): "
+                "issues=%d checkouts=%d builds=%d tests=%d incidents=%d"
+                % (
+                    _ts(),
+                    flush_dur,
+                    rate,
+                    len(issues_buf),
+                    len(checkouts_buf),
+                    len(builds_buf),
+                    len(tests_buf),
+                    len(incidents_buf),
+                )
+            )
+            _out(msg)
+            issues_buf.clear()
+            checkouts_buf.clear()
+            builds_buf.clear()
+            tests_buf.clear()
+            incidents_buf.clear()
+            last_flush_ts = time.time()
+
     while not stop_event.is_set() or not db_queue.empty():
         try:
             item = db_queue.get(timeout=0.1)
             if item is None:
-                db_queue.task_done()  # Important: mark the poison pill as done
+                db_queue.task_done()
                 break
             try:
                 data, metadata = item
                 if data is not None:
-                    process_submission_item(data, metadata)
+                    inst = build_instances_from_submission(data)
+                    issues_buf.extend(inst["issues"])
+                    checkouts_buf.extend(inst["checkouts"])
+                    builds_buf.extend(inst["builds"])
+                    tests_buf.extend(inst["tests"])
+                    incidents_buf.extend(inst["incidents"])
+
+                    # Flush if batch size reached
+                    if buffered_total() >= INGEST_BATCH_SIZE:
+                        flush_buffers()
+
                 if VERBOSE:
-                    logger.info(
-                        "Processed file %s with size %s bytes",
-                        metadata["filename"],
-                        metadata["fsize"],
+                    msg = (
+                        "Queued from %s: "
+                        "issues=%d checkouts=%d builds=%d tests=%d incidents=%d"
+                        % (
+                            metadata.get("filename", "unknown"),
+                            len(inst["issues"]),
+                            len(inst["checkouts"]),
+                            len(inst["builds"]),
+                            len(inst["tests"]),
+                            len(inst["incidents"]),
+                        )
                     )
-
+                    _out(msg)
             except Exception as e:
                 logger.error("Error processing item in db_worker: %s", e)
             finally:
-                db_queue.task_done()  # Always mark task as done, even if processing failed
+                db_queue.task_done()
 
         except Empty:
-            continue  # Timeout occurred, continue to check stop_event
+            # Time-based flush when idle
+            if (time.time() - last_flush_ts) >= INGEST_FLUSH_TIMEOUT_SEC:
+                if VERBOSE:
+                    _out(
+                        "[%s] Idle flush after %.1fs without new items (buffered=%d)"
+                        % (
+                            _ts(),
+                            INGEST_FLUSH_TIMEOUT_SEC,
+                            buffered_total(),
+                        )
+                    )
+                flush_buffers()
+            continue
         except Exception as e:
             logger.error("Unexpected error in db_worker: %s", e)
 
+    # Final flush after loop ends
+    flush_buffers()
+
 
 def process_file(filename, trees_name, spool_dir):
     """
@@ -339,7 +508,7 @@ def process_file(filename, trees_name, spool_dir):
     return True
 
 
-def ingest_submissions_parallel(
+def ingest_submissions_parallel(  # noqa: C901 - orchestrator with IO + threading
     spool_dir: str, trees_name: dict[str, str], max_workers: int = 5
 ):
     """
@@ -356,7 +525,24 @@ def ingest_submissions_parallel(
     if not json_files:
         return
 
-    logger.info("Found %d files to process", len(json_files))
+    total_bytes = 0
+    for f in json_files:
+        try:
+            total_bytes += os.path.getsize(os.path.join(spool_dir, f))
+        except Exception:
+            pass
+
+    _out(
+        "[%s] Spool status: %d .json files queued (%.2f MB)"
+        % (
+            _ts(),
+            len(json_files),
+            total_bytes / (1024 * 1024) if total_bytes else 0.0,
+        )
+    )
+
+    cycle_start = time.time()
+    total_files_count = len(json_files)
 
     # Start database worker thread
     # This thread will constantly consume the db_queue and send data to the database
@@ -367,6 +553,11 @@ def ingest_submissions_parallel(
     stat_ok = 0
     stat_fail = 0
 
+    processed = 0
+    last_progress = cycle_start
+    progress_every_n = 200
+    progress_every_sec = 2.0
+
     try:
         # Process files in parallel
         with ThreadPoolExecutor(max_workers=max_workers) as executor:
@@ -376,7 +567,7 @@ def ingest_submissions_parallel(
                 for filename in json_files
             }
 
-            # Collect results
+            # Collect results progressively
             for future in as_completed(future_to_file):
                 filename = future_to_file[future]
                 try:
@@ -388,25 +579,82 @@ def ingest_submissions_parallel(
                 except Exception as e:
                     logger.error("Exception processing %s: %s", filename, e)
                     stat_fail += 1
-
+                finally:
+                    processed += 1
+                    now = time.time()
+                    if (
+                        processed % progress_every_n == 0
+                        or (now - last_progress) >= progress_every_sec
+                    ):
+                        elapsed = now - cycle_start
+                        rate = processed / elapsed if elapsed > 0 else 0.0
+                        remaining = total_files_count - processed
+                        eta = remaining / rate if rate > 0 else float("inf")
+                        try:
+                            qsz = db_queue.qsize()
+                        except Exception:
+                            qsz = -1
+                        msg = (
+                            "[%s] Progress: %d/%d files (ok=%d, fail=%d) | "
+                            "%.2fs elapsed | %.1f files/s | ETA %.1fs | db_queue=%d"
+                            % (
+                                _ts(),
+                                processed,
+                                total_files_count,
+                                stat_ok,
+                                stat_fail,
+                                elapsed,
+                                rate,
+                                eta,
+                                qsz,
+                            )
+                        )
+                        _out(msg)
+                        last_progress = now
+
+        _out(
+            "[%s] Waiting for DB queue to drain... size=%d" % (_ts(), db_queue.qsize())
+        )
         # Wait for all database operations to complete
         db_queue.join()
 
+    except KeyboardInterrupt:
+        _out("[%s] KeyboardInterrupt: stopping ingestion and flushing..." % _ts())
+        try:
+            # Attempt to cancel remaining futures and exit early
+            # Note: this only cancels tasks not yet started
+            pass
+        finally:
+            raise
     finally:
         # Signal database worker to stop
         stop_event.set()
         db_queue.put(None)  # Poison pill
         db_thread.join()
 
-    if stat_ok + stat_fail > 0:
-        logger.info(
-            "Processed %d files: %d succeeded, %d failed",
-            stat_ok + stat_fail,
-            stat_ok,
-            stat_fail,
+    elapsed = time.time() - cycle_start
+    total_files = stat_ok + stat_fail
+    if total_files > 0:
+        files_per_sec = total_files / elapsed if elapsed > 0 else 0.0
+        mb = total_bytes / (1024 * 1024)
+        mb_per_sec = mb / elapsed if elapsed > 0 else 0.0
+        msg = (
+            "[%s] Ingest cycle: %d files (ok=%d, fail=%d) in %.2fs | "
+            "%.2f files/s | %.2f MB processed (%.2f MB/s)"
+            % (
+                _ts(),
+                total_files,
+                stat_ok,
+                stat_fail,
+                elapsed,
+                files_per_sec,
+                mb,
+                mb_per_sec,
+            )
         )
+        _out(msg)
     else:
-        logger.info("No files processed, nothing to do")
+        _out("[%s] No files processed, nothing to do" % _ts())
 
 
 def verify_dir(dir):
