Conversation

nuclearcat
Member

Also added some verbose logs to monitor ingestion rate, queue, etc.

@nuclearcat nuclearcat force-pushed the ingester-port branch 4 times, most recently from aa39103 to 9e3cd40 Compare September 15, 2025 16:33
Comment on lines 75 to 80
json_files = [
    f
    for f in os.listdir(spool_dir)
    if os.path.isfile(os.path.join(spool_dir, f))
    and f.endswith(".json")
]

@tales-aparecida tales-aparecida Sep 15, 2025

ChatGPT suggested scandir, which is a bit cleaner around is_file, and https://stackoverflow.com/questions/59268696/why-is-os-scandir-as-slow-as-os-listdir suggests it is as fast as listdir (apparently because it uses C structs instead of Python objects).

with os.scandir(spool_dir) as it:
    json_files = [
        entry.name
        for entry in it
        if entry.is_file() and entry.name.endswith(".json")
    ]
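As a side note, scandir also caches stat results on the DirEntry objects, so if the spool should ever be drained oldest-first, the sort comes almost for free. A sketch of that (a hypothetical extension, not part of the patch; `list_spool_json` is an illustrative name):

```python
import os

def list_spool_json(spool_dir):
    """Return .json spool file names sorted oldest-first,
    using scandir's cached stat results."""
    with os.scandir(spool_dir) as it:
        entries = [
            e for e in it
            if e.is_file() and e.name.endswith(".json")
        ]
    # DirEntry.stat() is typically served from the scandir cache,
    # so sorting by mtime avoids extra stat() syscalls on most platforms.
    entries.sort(key=lambda e: e.stat().st_mtime)
    return [e.name for e in entries]
```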

Member Author


Done

# Batching and backpressure controls
try:
    INGEST_BATCH_SIZE = int(os.environ.get("INGEST_BATCH_SIZE", "1000"))
except Exception:


nit: prefer being more specific (applies to lines below)

Suggested change
except Exception:
except (ValueError, TypeError):

Comment on lines 67 to 70
# Write debug/perf output to stdout
# logger was unreliable in some environments


nit

Suggested change
# Write debug/perf output to stdout
# logger was unreliable in some environments
"""Write debug/perf output to stdout."""
# logger was unreliable in some environments

Comment on lines 304 to 306
def db_worker(  # noqa: C901
    stop_event: threading.Event,
):


nit, I'm almost sure you could've put the noqa in front.

Suggested change
def db_worker( # noqa: C901
stop_event: threading.Event,
):
def db_worker(stop_event: threading.Event): # noqa: C901

But I'd argue that the C901 alert was accurate, and we should try to refactor this function in the future.

@tales-aparecida

Don't worry too much about the nitpicking, it's just how I walk through the patch while I understand the work.

I appreciate the batching and the need to call to bulk_create. There are a few "caveats" to using it, but I believe we can find alternatives whenever they become necessary.

I think the code could be drier... but I also think the functionality was already thoroughly tested manually. So we could merge this critical piece, write unit tests and performance tests over it, and only then think about refining the code.

@tales-aparecida

In other words, I'm looking at this PR as meeting your performance requirements and matching the functionality we had in the kcidb ingester, and we need to work on top of it.

flush_start = time.time()
try:
    # Single transaction for all tables in the flush
    with transaction.atomic():
Collaborator


I think that it could work if you made a single shared function like

def bulk_create_items(table, item_buffer, batch_size=1000):
    t0 = time.time()
    # table should be Issues, Checkouts, etc.
    table.objects.bulk_create(
        item_buffer, batch_size=batch_size, ignore_conflicts=True
    )
    _out(
        "[%s] bulk_create %s: n=%d in %.3fs"
        % (_ts(), table.__name__, len(item_buffer), time.time() - t0)
    )

and then just call it like bulk_create_items(Issues, issue_buf), and similarly for the other tables. That would reduce the code a lot.
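To see the shape of that helper outside Django, here is a self-contained sketch with a stub manager standing in for `Model.objects` and `print` standing in for the patch's `_out`/`_ts` helpers (all names illustrative):

```python
import time

def bulk_create_items(table, item_buffer, batch_size=1000):
    """Generic bulk insert with timing, per the review suggestion.

    `table` is expected to be a Django model class (Issues, Checkouts, ...);
    the log line takes its label from the class name instead of hardcoding it.
    """
    t0 = time.time()
    table.objects.bulk_create(
        item_buffer, batch_size=batch_size, ignore_conflicts=True
    )
    # print stands in for the patch's _out/_ts helpers in this sketch
    print(
        "bulk_create %s: n=%d in %.3fs"
        % (table.__name__, len(item_buffer), time.time() - t0)
    )
```

One call site per table (`bulk_create_items(Issues, issue_buf)`, then `issue_buf.clear()`) replaces the repeated per-table blocks.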

Collaborator


If it doesn't work, just move all this insertion to a separate function to lower the complexity of db_worker, please

@nuclearcat
Member Author

This is just the initial stage, to get things working and to disable the old ingester.
Then, after some (short) monitoring, we might disable the verbose logging for a while and look for a better strategy, unless we hit performance issues.
It has many caveats; for example, if it crashes it might lose some of the JSON data, because a file is already considered processed while its data is in reality still in the queue to the db worker.
I also think the batching strategy is suboptimal; I am not yet used to the Django-specific functions. I might need to look at how the raw SQL looks during ingestion.
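The data-loss window described above can be narrowed by marking a spool file as processed only after its records have actually been handed off (or, stricter, after the flush commits). A sketch with hypothetical names, not the patch's code:

```python
import json
import os

def process_spool_file(spool_dir, name, enqueue):
    """Hypothetical crash-safer hand-off for one spool file.

    The file is renamed out of the spool only after enqueue() returns,
    so a crash before that point leaves it in place to be retried on
    restart (rename is atomic on POSIX filesystems).
    """
    path = os.path.join(spool_dir, name)
    with open(path, "r", encoding="utf-8") as fh:
        data = json.load(fh)
    enqueue(data)  # e.g. queue.put(...) toward the db worker
    os.rename(path, path + ".done")
```

For full durability the rename would have to wait for the db worker's commit, e.g. via an acknowledgement callback, at the cost of tracking which files each flush covered.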

@nuclearcat nuclearcat force-pushed the ingester-port branch 2 times, most recently from b90b8cf to 29c10e3 Compare September 16, 2025 18:19
Collaborator

@MarceloRobert MarceloRobert left a comment


I don't see why we should add clutter to the code when it can simply be refactored without much effort. I understand the reasoning, but this ingester is not even currently in use; we don't need to keep "legacy" code.

@nuclearcat
Member Author

nuclearcat commented Sep 19, 2025

@MarceloRobert I removed the legacy code now.

@nuclearcat nuclearcat force-pushed the ingester-port branch 3 times, most recently from 3be7455 to 338ad56 Compare September 19, 2025 14:58
Collaborator

@MarceloRobert MarceloRobert left a comment


There could still be some improvements, but it looks good enough to me.

try:
    # Attempt to cancel remaining futures and exit early
    # Note: this only cancels tasks not yet started
    pass
Collaborator


Was there some code supposed to be here?

Member Author

@nuclearcat nuclearcat Sep 19, 2025


It was meant to be a more graceful shutdown if the process is interrupted. I decided it is not critical for now; in general it is better not to interrupt this ingester at all, especially in this part of the code (due to potential data loss).
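For reference, on Python 3.9+ the executor can do that cancellation itself via `Executor.shutdown(cancel_futures=True)`; a minimal sketch (the helper name is hypothetical):

```python
import time
from concurrent.futures import ThreadPoolExecutor

def stop_early(executor):
    """Best-effort graceful shutdown.

    cancel_futures=True (Python 3.9+) drops queued tasks that have not
    started yet; tasks already running are still allowed to finish,
    which matches the "only cancels tasks not yet started" note above.
    """
    executor.shutdown(wait=True, cancel_futures=True)
```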

@nuclearcat nuclearcat force-pushed the ingester-port branch 3 times, most recently from c97b59b to 8750e8b Compare September 19, 2025 17:29
Also added some verbose logs to monitor ingestion rate,
queue, etc.

Signed-off-by: Denys Fedoryshchenko <[email protected]>