299 changes: 266 additions & 33 deletions backend/kernelCI_app/management/commands/helpers/kcidbng_ingester.py
@@ -13,9 +13,11 @@
from typing import Any, Literal, Optional
import yaml
import kcidb_io
from django.db import transaction
from kernelCI_app.models import Issues, Checkouts, Builds, Tests, Incidents

from kernelCI_app.management.commands.helpers.process_submissions import (
insert_submission_data,
build_instances_from_submission,
)

VERBOSE = 0
@@ -35,11 +37,43 @@

logger = logging.getLogger("ingester")

# Batching and backpressure controls
try:
INGEST_BATCH_SIZE = int(os.environ.get("INGEST_BATCH_SIZE", "10000"))
except (ValueError, TypeError):
logger.warning("Invalid INGEST_BATCH_SIZE, using default 10000")
INGEST_BATCH_SIZE = 10000

try:
INGEST_FLUSH_TIMEOUT_SEC = float(os.environ.get("INGEST_FLUSH_TIMEOUT_SEC", "2.0"))
except (ValueError, TypeError):
logger.warning("Invalid INGEST_FLUSH_TIMEOUT_SEC, using default 2.0")
    INGEST_FLUSH_TIMEOUT_SEC = 2.0

try:
INGEST_QUEUE_MAXSIZE = int(os.environ.get("INGEST_QUEUE_MAXSIZE", "5000"))
except (ValueError, TypeError):
logger.warning("Invalid INGEST_QUEUE_MAXSIZE, using default 5000")
INGEST_QUEUE_MAXSIZE = 5000

# Thread-safe queue for database operations (bounded for backpressure)
db_queue = Queue(maxsize=INGEST_QUEUE_MAXSIZE)
db_lock = threading.Lock()
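
# Illustration (not part of this PR) of the backpressure the bounded queue
# above provides: queue.Queue.put() blocks once maxsize items are pending,
# so producer threads naturally slow to the DB writer's pace. For example:
#
#     q = Queue(maxsize=2)
#     q.put("a"); q.put("b")      # queue is now full
#     q.put("c", timeout=0.5)     # blocks ~0.5s, then raises queue.Full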


def _ts() -> str:
return time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())


def _out(msg: str) -> None:
"""Write debug/perf output to stdout"""
# logger was unreliable in some environments
try:
print(f"[{_ts()}] {msg}", flush=True)
except Exception:
pass


def move_file_to_failed_dir(filename, failed_dir):
try:
os.rename(filename, os.path.join(failed_dir, os.path.basename(filename)))
@@ -259,16 +293,7 @@ def prepare_file_data(filename, trees_name, spool_dir):
}


def db_worker(stop_event: threading.Event): # noqa: C901
"""
Worker thread that processes the database queue.
This is the only thread that interacts with the database.
@@ -277,33 +302,167 @@ def db_worker(stop_event: threading.Event):
stop_event: threading.Event (flag) to signal the worker to stop processing
"""

# Local buffers for batching
issues_buf = []
checkouts_buf = []
builds_buf = []
tests_buf = []
incidents_buf = []

last_flush_ts = time.time()

def buffered_total():
return (
len(issues_buf)
+ len(checkouts_buf)
+ len(builds_buf)
+ len(tests_buf)
+ len(incidents_buf)
)

    def flush_buffers():
        nonlocal last_flush_ts
        total = buffered_total()
        if total == 0:
            # Reset the timer on an empty flush too, so the idle check
            # does not re-fire continuously while the buffers stay empty
            last_flush_ts = time.time()
            return

# Insert in dependency-safe order
flush_start = time.time()
try:
# Single transaction for all tables in the flush
with transaction.atomic():
[Review comment — Collaborator]
I think that it could work if you made a single shared function like

    def bulk_create_items(table, item_buffer):
        t0 = time.time()
        # table is the model class: Issues, Checkouts, etc.
        table.objects.bulk_create(
            item_buffer, batch_size=INGEST_BATCH_SIZE, ignore_conflicts=True
        )
        _out(
            "bulk_create %s: n=%d in %.3fs"
            % (table.__name__.lower(), len(item_buffer), time.time() - t0)
        )

and then just call it like bulk_create_items(Issues, issues_buf), and similarly for the other tables. This would reduce the code a lot.

[Review comment — Collaborator]
If it doesn't work, please just move all this insertion to a separate function to lower the complexity of db_worker.
if issues_buf:
t0 = time.time()
Issues.objects.bulk_create(
issues_buf, batch_size=INGEST_BATCH_SIZE, ignore_conflicts=True
)
_out(
"bulk_create issues: n=%d in %.3fs"
% (len(issues_buf), time.time() - t0)
)
if checkouts_buf:
t0 = time.time()
Checkouts.objects.bulk_create(
checkouts_buf,
batch_size=INGEST_BATCH_SIZE,
ignore_conflicts=True,
)
_out(
"bulk_create checkouts: n=%d in %.3fs"
% (len(checkouts_buf), time.time() - t0)
)
if builds_buf:
t0 = time.time()
Builds.objects.bulk_create(
builds_buf, batch_size=INGEST_BATCH_SIZE, ignore_conflicts=True
)
_out(
"bulk_create builds: n=%d in %.3fs"
% (len(builds_buf), time.time() - t0)
)
if tests_buf:
t0 = time.time()
Tests.objects.bulk_create(
tests_buf, batch_size=INGEST_BATCH_SIZE, ignore_conflicts=True
)
_out(
"bulk_create tests: n=%d in %.3fs"
% (len(tests_buf), time.time() - t0)
)
if incidents_buf:
t0 = time.time()
Incidents.objects.bulk_create(
incidents_buf,
batch_size=INGEST_BATCH_SIZE,
ignore_conflicts=True,
)
_out(
"bulk_create incidents: n=%d in %.3fs"
% (len(incidents_buf), time.time() - t0)
)
except Exception as e:
logger.error("Error during bulk_create flush: %s", e)
finally:
flush_dur = time.time() - flush_start
rate = total / flush_dur if flush_dur > 0 else 0.0
msg = (
"Flushed batch in %.3fs (%.1f items/s): "
"issues=%d checkouts=%d builds=%d tests=%d incidents=%d"
% (
flush_dur,
rate,
len(issues_buf),
len(checkouts_buf),
len(builds_buf),
len(tests_buf),
len(incidents_buf),
)
)
_out(msg)
issues_buf.clear()
checkouts_buf.clear()
builds_buf.clear()
tests_buf.clear()
incidents_buf.clear()
last_flush_ts = time.time()

while not stop_event.is_set() or not db_queue.empty():
try:
item = db_queue.get(timeout=0.1)
if item is None:
                db_queue.task_done()
break
try:
data, metadata = item
if data is not None:
inst = build_instances_from_submission(data)
issues_buf.extend(inst["issues"])
checkouts_buf.extend(inst["checkouts"])
builds_buf.extend(inst["builds"])
tests_buf.extend(inst["tests"])
incidents_buf.extend(inst["incidents"])

if buffered_total() >= INGEST_BATCH_SIZE:
flush_buffers()

if VERBOSE:
msg = (
"Queued from %s: "
"issues=%d checkouts=%d builds=%d tests=%d incidents=%d"
% (
metadata.get("filename", "unknown"),
len(inst["issues"]),
len(inst["checkouts"]),
len(inst["builds"]),
len(inst["tests"]),
len(inst["incidents"]),
)
)

_out(msg)
except Exception as e:
logger.error("Error processing item in db_worker: %s", e)
finally:
                db_queue.task_done()

except Empty:
# Time-based flush when idle
if (time.time() - last_flush_ts) >= INGEST_FLUSH_TIMEOUT_SEC:
if VERBOSE:
_out(
"Idle flush after %.1fs without new items (buffered=%d)"
% (
INGEST_FLUSH_TIMEOUT_SEC,
buffered_total(),
)
)
flush_buffers()
continue
except Exception as e:
logger.error("Unexpected error in db_worker: %s", e)

# Final flush after loop ends
flush_buffers()


def process_file(filename, trees_name, spool_dir):
"""
Expand Down Expand Up @@ -339,7 +498,7 @@ def process_file(filename, trees_name, spool_dir):
return True


def ingest_submissions_parallel( # noqa: C901 - orchestrator with IO + threading
spool_dir: str, trees_name: dict[str, str], max_workers: int = 5
):
"""
@@ -356,7 +515,23 @@
if not json_files:
return

logger.info("Found %d files to process", len(json_files))
total_bytes = 0
for f in json_files:
try:
total_bytes += os.path.getsize(os.path.join(spool_dir, f))
except Exception:
pass

_out(
"Spool status: %d .json files queued (%.2f MB)"
% (
len(json_files),
total_bytes / (1024 * 1024) if total_bytes else 0.0,
)
)

cycle_start = time.time()
total_files_count = len(json_files)

# Start database worker thread
# This thread will constantly consume the db_queue and send data to the database
Expand All @@ -367,6 +542,11 @@ def ingest_submissions_parallel(
stat_ok = 0
stat_fail = 0

processed = 0
last_progress = cycle_start
progress_every_n = 200
progress_every_sec = 2.0

try:
# Process files in parallel
with ThreadPoolExecutor(max_workers=max_workers) as executor:
@@ -376,7 +556,7 @@
for filename in json_files
}

# Collect results progressively
for future in as_completed(future_to_file):
filename = future_to_file[future]
try:
@@ -388,25 +568,78 @@
except Exception as e:
logger.error("Exception processing %s: %s", filename, e)
stat_fail += 1

finally:
processed += 1
now = time.time()
if (
processed % progress_every_n == 0
or (now - last_progress) >= progress_every_sec
):
elapsed = now - cycle_start
rate = processed / elapsed if elapsed > 0 else 0.0
remaining = total_files_count - processed
eta = remaining / rate if rate > 0 else float("inf")
try:
qsz = db_queue.qsize()
except Exception:
qsz = -1
msg = (
"Progress: %d/%d files (ok=%d, fail=%d) | "
"%.2fs elapsed | %.1f files/s | ETA %.1fs | db_queue=%d"
% (
processed,
total_files_count,
stat_ok,
stat_fail,
elapsed,
rate,
eta,
qsz,
)
)
_out(msg)
last_progress = now

_out("Waiting for DB queue to drain... size=%d" % db_queue.qsize())
# Wait for all database operations to complete
db_queue.join()

except KeyboardInterrupt:
_out("KeyboardInterrupt: stopping ingestion and flushing...")
try:
# Attempt to cancel remaining futures and exit early
# Note: this only cancels tasks not yet started
pass
[Comment on lines +609 to +612 — Collaborator]
Was there some code supposed to be here?

[Reply — nuclearcat (Member, Author), Sep 19, 2025]
More graceful shutdown if the process is interrupted. I decided it is not critical for now: in general it is better not to interrupt this ingester, especially in this part of the code (due to potential data loss).
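
For reference, a minimal sketch of what such a graceful shutdown could look like (hypothetical, not part of this PR; cancel_futures requires Python 3.9+, all other names are from this file):

    executor = ThreadPoolExecutor(max_workers=max_workers)
    futures = {
        executor.submit(process_file, f, trees_name, spool_dir): f
        for f in json_files
    }
    try:
        for future in as_completed(futures):
            future.result()
    except KeyboardInterrupt:
        # Tasks already running finish their current file; tasks still
        # queued are dropped, so no submission is left half-ingested.
        executor.shutdown(wait=True, cancel_futures=True)
        raise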

finally:
raise
finally:
# Signal database worker to stop
stop_event.set()
db_queue.put(None) # Poison pill
db_thread.join()

elapsed = time.time() - cycle_start
total_files = stat_ok + stat_fail
if total_files > 0:
files_per_sec = total_files / elapsed if elapsed > 0 else 0.0
mb = total_bytes / (1024 * 1024)
mb_per_sec = mb / elapsed if elapsed > 0 else 0.0
msg = (
"Ingest cycle: %d files (ok=%d, fail=%d) in %.2fs | "
"%.2f files/s | %.2f MB processed (%.2f MB/s)"
% (
total_files,
stat_ok,
stat_fail,
elapsed,
files_per_sec,
mb,
mb_per_sec,
)
)
_out(msg)
else:
logger.info("No files processed, nothing to do")
_out("No files processed, nothing to do")


def verify_dir(dir):