
Commit c97b59b

ingester: Port batch ingester to django
Also added some verbose logs to monitor ingestion rate, queue, etc.

Signed-off-by: Denys Fedoryshchenko <[email protected]>
1 parent e9f3cb4 commit c97b59b

File tree

3 files changed: +368 -66 lines changed

backend/kernelCI_app/management/commands/helpers/kcidbng_ingester.py

Lines changed: 269 additions & 33 deletions
@@ -13,9 +13,11 @@
 from typing import Any, Literal, Optional
 import yaml
 import kcidb_io
+from django.db import transaction
+from kernelCI_app.models import Issues, Checkouts, Builds, Tests, Incidents

 from kernelCI_app.management.commands.helpers.process_submissions import (
-    insert_submission_data,
+    build_instances_from_submission,
 )

 VERBOSE = 0
@@ -35,11 +37,43 @@

 logger = logging.getLogger("ingester")

-# Thread-safe queue for database operations
-db_queue = Queue()
+# Batching and backpressure controls
+try:
+    INGEST_BATCH_SIZE = int(os.environ.get("INGEST_BATCH_SIZE", "10000"))
+except (ValueError, TypeError):
+    logger.warning("Invalid INGEST_BATCH_SIZE, using default 10000")
+    INGEST_BATCH_SIZE = 10000
+
+try:
+    INGEST_FLUSH_TIMEOUT_SEC = float(os.environ.get("INGEST_FLUSH_TIMEOUT_SEC", "2.0"))
+except (ValueError, TypeError):
+    logger.warning("Invalid INGEST_FLUSH_TIMEOUT_SEC, using default 2.0")
+    INGEST_FLUSH_TIMEOUT_SEC = 5.0
+
+try:
+    INGEST_QUEUE_MAXSIZE = int(os.environ.get("INGEST_QUEUE_MAXSIZE", "5000"))
+except (ValueError, TypeError):
+    logger.warning("Invalid INGEST_QUEUE_MAXSIZE, using default 5000")
+    INGEST_QUEUE_MAXSIZE = 5000
+
+# Thread-safe queue for database operations (bounded for backpressure)
+db_queue = Queue(maxsize=INGEST_QUEUE_MAXSIZE)
 db_lock = threading.Lock()


+def _ts() -> str:
+    return time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
+
+
+def _out(msg: str) -> None:
+    """Write debug/perf output to stdout"""
+    # logger was unreliable in some environments
+    try:
+        print(msg, flush=True)
+    except Exception:
+        pass
+
+
 def move_file_to_failed_dir(filename, failed_dir):
     try:
         os.rename(filename, os.path.join(failed_dir, os.path.basename(filename)))
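Note on the bounded queue introduced above: with maxsize set, Queue.put() blocks the producer threads whenever INGEST_QUEUE_MAXSIZE parsed submissions are already waiting for the single DB worker, so memory stays bounded when parsing outruns the database. A minimal standalone sketch of that backpressure behaviour (the queue size, sleep and worker names are illustrative, not taken from the patch):

import threading
import time
from queue import Queue

q = Queue(maxsize=3)  # bounded, like db_queue with INGEST_QUEUE_MAXSIZE


def consumer() -> None:
    while True:
        item = q.get()
        if item is None:  # poison pill, same shutdown pattern as db_worker
            q.task_done()
            break
        time.sleep(0.1)  # stand-in for a slow bulk flush
        q.task_done()


t = threading.Thread(target=consumer)
t.start()

for i in range(10):
    q.put(i)  # blocks whenever 3 items are already queued: backpressure
q.put(None)  # poison pill
q.join()  # wait until every queued item has been marked done
t.join()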
@@ -259,16 +293,7 @@ def prepare_file_data(filename, trees_name, spool_dir):
     }


-def process_submission_item(data: dict[str, Any], metadata: dict[str, Any]):
-    with db_lock:
-        insert_submission_data(data, metadata)
-
-    if VERBOSE and "processing_time" in metadata:
-        ing_speed = metadata["fsize"] / metadata["processing_time"] / 1024
-        logger.info("Ingested %s in %.2f KB/s", metadata["filename"], ing_speed)
-
-
-def db_worker(stop_event: threading.Event):
+def db_worker(stop_event: threading.Event):  # noqa: C901
     """
     Worker thread that processes the database queue.
     This is the only thread that interacts with the database.
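The per-file path (process_submission_item() holding db_lock around a synchronous insert_submission_data() call) is removed; db_worker now expects build_instances_from_submission() to return unsaved model instances grouped per table so they can be buffered and flushed in bulk. That helper lives in process_submissions.py and is not part of this diff view; the sketch below is only an assumption about its return shape, inferred from the inst["issues"], inst["checkouts"], etc. accesses further down, not the actual implementation:

# Hypothetical sketch of the helper's shape; the real code in
# process_submissions.py almost certainly maps KCIDB fields explicitly.
from kernelCI_app.models import Builds, Checkouts, Incidents, Issues, Tests


def build_instances_from_submission(data: dict) -> dict[str, list]:
    """Turn one parsed submission into unsaved model instances per table."""
    return {
        "issues": [Issues(**row) for row in data.get("issues", [])],
        "checkouts": [Checkouts(**row) for row in data.get("checkouts", [])],
        "builds": [Builds(**row) for row in data.get("builds", [])],
        "tests": [Tests(**row) for row in data.get("tests", [])],
        "incidents": [Incidents(**row) for row in data.get("incidents", [])],
    }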
@@ -277,33 +302,165 @@ def db_worker(stop_event: threading.Event):
         stop_event: threading.Event (flag) to signal the worker to stop processing
     """

+    # Local buffers for batching
+    issues_buf = []
+    checkouts_buf = []
+    builds_buf = []
+    tests_buf = []
+    incidents_buf = []
+
+    last_flush_ts = time.time()
+
+    def buffered_total():
+        return (
+            len(issues_buf)
+            + len(checkouts_buf)
+            + len(builds_buf)
+            + len(tests_buf)
+            + len(incidents_buf)
+        )
+
+    def flush_buffers():
+        nonlocal last_flush_ts
+        total = buffered_total()
+        if total == 0:
+            return
+
+        # Insert in dependency-safe order
+        flush_start = time.time()
+        try:
+            # Single transaction for all tables in the flush
+            with transaction.atomic():
+                if issues_buf:
+                    t0 = time.time()
+                    Issues.objects.bulk_create(
+                        issues_buf, batch_size=INGEST_BATCH_SIZE, ignore_conflicts=True
+                    )
+                    _out(
+                        "[%s] bulk_create issues: n=%d in %.3fs"
+                        % (_ts(), len(issues_buf), time.time() - t0)
+                    )
+                if checkouts_buf:
+                    t0 = time.time()
+                    Checkouts.objects.bulk_create(
+                        checkouts_buf, batch_size=INGEST_BATCH_SIZE, ignore_conflicts=True
+                    )
+                    _out(
+                        "[%s] bulk_create checkouts: n=%d in %.3fs"
+                        % (_ts(), len(checkouts_buf), time.time() - t0)
+                    )
+                if builds_buf:
+                    t0 = time.time()
+                    Builds.objects.bulk_create(
+                        builds_buf, batch_size=INGEST_BATCH_SIZE, ignore_conflicts=True
+                    )
+                    _out(
+                        "[%s] bulk_create builds: n=%d in %.3fs"
+                        % (_ts(), len(builds_buf), time.time() - t0)
+                    )
+                if tests_buf:
+                    t0 = time.time()
+                    Tests.objects.bulk_create(
+                        tests_buf, batch_size=INGEST_BATCH_SIZE, ignore_conflicts=True
+                    )
+                    _out(
+                        "[%s] bulk_create tests: n=%d in %.3fs"
+                        % (_ts(), len(tests_buf), time.time() - t0)
+                    )
+                if incidents_buf:
+                    t0 = time.time()
+                    Incidents.objects.bulk_create(
+                        incidents_buf, batch_size=INGEST_BATCH_SIZE, ignore_conflicts=True
+                    )
+                    _out(
+                        "[%s] bulk_create incidents: n=%d in %.3fs"
+                        % (_ts(), len(incidents_buf), time.time() - t0)
+                    )
+        except Exception as e:
+            logger.error("Error during bulk_create flush: %s", e)
+        finally:
+            flush_dur = time.time() - flush_start
+            rate = total / flush_dur if flush_dur > 0 else 0.0
+            msg = (
+                "[%s] Flushed batch in %.3fs (%.1f items/s): "
+                "issues=%d checkouts=%d builds=%d tests=%d incidents=%d"
+                % (
+                    _ts(),
+                    flush_dur,
+                    rate,
+                    len(issues_buf),
+                    len(checkouts_buf),
+                    len(builds_buf),
+                    len(tests_buf),
+                    len(incidents_buf),
+                )
+            )
+            _out(msg)
+            issues_buf.clear()
+            checkouts_buf.clear()
+            builds_buf.clear()
+            tests_buf.clear()
+            incidents_buf.clear()
+            last_flush_ts = time.time()
+
     while not stop_event.is_set() or not db_queue.empty():
         try:
             item = db_queue.get(timeout=0.1)
             if item is None:
-                db_queue.task_done()  # Important: mark the poison pill as done
+                db_queue.task_done()
                 break
             try:
                 data, metadata = item
                 if data is not None:
-                    process_submission_item(data, metadata)
+                    inst = build_instances_from_submission(data)
+                    issues_buf.extend(inst["issues"])
+                    checkouts_buf.extend(inst["checkouts"])
+                    builds_buf.extend(inst["builds"])
+                    tests_buf.extend(inst["tests"])
+                    incidents_buf.extend(inst["incidents"])
+
+                    if buffered_total() >= INGEST_BATCH_SIZE:
+                        flush_buffers()
+
                 if VERBOSE:
-                    logger.info(
-                        "Processed file %s with size %s bytes",
-                        metadata["filename"],
-                        metadata["fsize"],
+                    msg = (
+                        "Queued from %s: "
+                        "issues=%d checkouts=%d builds=%d tests=%d incidents=%d"
+                        % (
+                            metadata.get("filename", "unknown"),
+                            len(inst["issues"]),
+                            len(inst["checkouts"]),
+                            len(inst["builds"]),
+                            len(inst["tests"]),
+                            len(inst["incidents"]),
+                        )
                     )
-
+                    _out(msg)
             except Exception as e:
                 logger.error("Error processing item in db_worker: %s", e)
             finally:
-                db_queue.task_done()  # Always mark task as done, even if processing failed
+                db_queue.task_done()

         except Empty:
-            continue  # Timeout occurred, continue to check stop_event
+            # Time-based flush when idle
+            if (time.time() - last_flush_ts) >= INGEST_FLUSH_TIMEOUT_SEC:
+                if VERBOSE:
+                    _out(
+                        "[%s] Idle flush after %.1fs without new items (buffered=%d)"
+                        % (
+                            _ts(),
+                            INGEST_FLUSH_TIMEOUT_SEC,
+                            buffered_total(),
+                        )
+                    )
+                flush_buffers()
+            continue
         except Exception as e:
             logger.error("Unexpected error in db_worker: %s", e)

+    # Final flush after loop ends
+    flush_buffers()
+

 def process_file(filename, trees_name, spool_dir):
     """
@@ -339,7 +496,7 @@ def process_file(filename, trees_name, spool_dir):
     return True


-def ingest_submissions_parallel(
+def ingest_submissions_parallel(  # noqa: C901 - orchestrator with IO + threading
     spool_dir: str, trees_name: dict[str, str], max_workers: int = 5
 ):
     """
@@ -356,7 +513,24 @@
     if not json_files:
         return

-    logger.info("Found %d files to process", len(json_files))
+    total_bytes = 0
+    for f in json_files:
+        try:
+            total_bytes += os.path.getsize(os.path.join(spool_dir, f))
+        except Exception:
+            pass
+
+    _out(
+        "[%s] Spool status: %d .json files queued (%.2f MB)"
+        % (
+            _ts(),
+            len(json_files),
+            total_bytes / (1024 * 1024) if total_bytes else 0.0,
+        )
+    )
+
+    cycle_start = time.time()
+    total_files_count = len(json_files)

     # Start database worker thread
     # This thread will constantly consume the db_queue and send data to the database
@@ -367,6 +541,11 @@
     stat_ok = 0
     stat_fail = 0

+    processed = 0
+    last_progress = cycle_start
+    progress_every_n = 200
+    progress_every_sec = 2.0
+
     try:
         # Process files in parallel
         with ThreadPoolExecutor(max_workers=max_workers) as executor:
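The progress_every_n / progress_every_sec knobs added above throttle the per-file progress line: one is printed whenever the completed count hits a multiple of 200 or at least 2 seconds have passed since the previous line. A standalone sketch of the same throttle (the work loop is illustrative; only the two thresholds come from the patch):

import time

progress_every_n = 200
progress_every_sec = 2.0
last_progress = time.time()

for processed in range(1, 1001):
    time.sleep(0.001)  # stand-in for waiting on a completed future
    now = time.time()
    if (
        processed % progress_every_n == 0
        or (now - last_progress) >= progress_every_sec
    ):
        print(f"progress: {processed}/1000")
        last_progress = now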
@@ -376,7 +555,7 @@
                 for filename in json_files
             }

-            # Collect results
+            # Collect results progressively
             for future in as_completed(future_to_file):
                 filename = future_to_file[future]
                 try:
@@ -388,25 +567,82 @@
                 except Exception as e:
                     logger.error("Exception processing %s: %s", filename, e)
                     stat_fail += 1
-
+                finally:
+                    processed += 1
+                    now = time.time()
+                    if (
+                        processed % progress_every_n == 0
+                        or (now - last_progress) >= progress_every_sec
+                    ):
+                        elapsed = now - cycle_start
+                        rate = processed / elapsed if elapsed > 0 else 0.0
+                        remaining = total_files_count - processed
+                        eta = remaining / rate if rate > 0 else float("inf")
+                        try:
+                            qsz = db_queue.qsize()
+                        except Exception:
+                            qsz = -1
+                        msg = (
+                            "[%s] Progress: %d/%d files (ok=%d, fail=%d) | "
+                            "%.2fs elapsed | %.1f files/s | ETA %.1fs | db_queue=%d"
+                            % (
+                                _ts(),
+                                processed,
+                                total_files_count,
+                                stat_ok,
+                                stat_fail,
+                                elapsed,
+                                rate,
+                                eta,
+                                qsz,
+                            )
+                        )
+                        _out(msg)
+                        last_progress = now
+
+            _out(
+                "[%s] Waiting for DB queue to drain... size=%d" % (_ts(), db_queue.qsize())
+            )
             # Wait for all database operations to complete
             db_queue.join()

+    except KeyboardInterrupt:
+        _out("[%s] KeyboardInterrupt: stopping ingestion and flushing..." % _ts())
+        try:
+            # Attempt to cancel remaining futures and exit early
+            # Note: this only cancels tasks not yet started
+            pass
+        finally:
+            raise
     finally:
         # Signal database worker to stop
         stop_event.set()
         db_queue.put(None)  # Poison pill
         db_thread.join()

-        if stat_ok + stat_fail > 0:
-            logger.info(
-                "Processed %d files: %d succeeded, %d failed",
-                stat_ok + stat_fail,
-                stat_ok,
-                stat_fail,
+        elapsed = time.time() - cycle_start
+        total_files = stat_ok + stat_fail
+        if total_files > 0:
+            files_per_sec = total_files / elapsed if elapsed > 0 else 0.0
+            mb = total_bytes / (1024 * 1024)
+            mb_per_sec = mb / elapsed if elapsed > 0 else 0.0
+            msg = (
+                "[%s] Ingest cycle: %d files (ok=%d, fail=%d) in %.2fs | "
+                "%.2f files/s | %.2f MB processed (%.2f MB/s)"
+                % (
+                    _ts(),
+                    total_files,
+                    stat_ok,
+                    stat_fail,
+                    elapsed,
+                    files_per_sec,
+                    mb,
+                    mb_per_sec,
+                )
             )
+            _out(msg)
         else:
-            logger.info("No files processed, nothing to do")
+            _out("[%s] No files processed, nothing to do" % _ts())


 def verify_dir(dir):
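The progress and cycle-summary lines are plain rate arithmetic: files/s is processed / elapsed, ETA is remaining / rate, and MB/s divides the spooled byte count (computed with 1024 * 1024) by the elapsed time. A worked example with made-up numbers, just to show what the verbose output reports:

processed = 400
total_files = 1000
elapsed = 20.0              # seconds since cycle_start
total_bytes = 52_428_800    # 50 MB of spooled .json files

rate = processed / elapsed                            # 20.0 files/s
eta = (total_files - processed) / rate                # 30.0 s remaining
mb_per_sec = (total_bytes / (1024 * 1024)) / elapsed  # 2.5 MB/s

print(f"{rate:.1f} files/s | ETA {eta:.1f}s | {mb_per_sec:.2f} MB/s")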
