
Conversation

monishkadas-ms
Collaborator

No description provided.

monishkadas-ms and others added 30 commits April 30, 2025 15:33
Added the feature for size- and time-based flushing of the buffer. Added config options for max_interval and max_size; once either one is reached, the events stored in the buffer are flushed.

Updated kusto.rb
Updated kusto.rb and ingestor.rb with the implementation of the memory-based buffer and flushing, and removed the file-based buffer.
The max_size config now refers to the size of the buffer (defaults to 10MB) instead of the number of events in the buffer.
Updated the kusto_spec.rb test
Removed the temp file buffer and added retries to prevent data loss. Added exponential backoff to the retries and removed max_retries.
Updated upload_async and upload to raise errors during network downtime, so the rescue block in the buffer flush is triggered, the data sent for flushing can be restored, and the flush can be attempted later.

Added a file buffer to store data ONLY when a flush fails due to network issues. Once the network is back online, each file in the buffer is flushed and deleted first, and the regular in-memory buffer is used after that.
Updated buffer_flush and upload().
Updated failed_items_path to be a required config parameter. If set to "nil", the failed items are not persisted to local storage; otherwise the items are stored in the provided file path after max_retries.
#@logger.info("Ingestion result: #{ingest_result}")
}
.rescue{ |e|
@logger.error("Ingestion failed: #{e.message}")

Multiple threads can fail simultaneously and call persist_batch without synchronization, which can lead to directory-creation race conditions and file write conflicts.

Collaborator Author

The directory is created when the @file_persistence instance is initialized in kusto.rb, so it should not be an issue. I did add a write mutex to persist_batch to ensure thread-safe file saving.
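A minimal sketch of that change, assuming @write_mutex is a Mutex created once in initialize (the method shape is illustrative, not the exact plugin code; the filename pattern matches the snippet quoted below):

require 'json'
require 'securerandom'

# Serialize file writes across flush threads. @write_mutex is
# assumed to be created once, in initialize.
def persist_batch(batch)
  @write_mutex.synchronize do
    filename = ::File.join(@failed_dir, "failed_batch_#{Time.now.to_i}_#{SecureRandom.uuid}.json")
    ::File.write(filename, JSON.dump(batch))
  end
end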


module LogStash; module Outputs; class KustoOutputInternal
class FilePersistence
attr_reader :failed_dir


The @failed_dir instance variable is shared across all threads without synchronization; we may need mutex protection. One solution: directory creation can be moved to initialize to prevent race conditions when multiple threads create the directory.

Collaborator Author

The directory creation is already in initialize, and we only create one instance of @file_persistence in our code, so I don't think a mutex for directory creation would be useful. It would help to add one for persist_batch, though, since multiple threads can try to save failed files.
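For context, a sketch of that single-initialization path, assuming the constructor receives the directory path (constructor shape is illustrative):

require 'fileutils'

# Directory creation happens exactly once, when the single
# @file_persistence instance is constructed in kusto.rb, so
# per-write directory races cannot occur. mkdir_p is idempotent.
def initialize(failed_dir)
  @failed_dir  = failed_dir
  @write_mutex = Mutex.new   # guards persist_batch (see above)
  ::FileUtils.mkdir_p(@failed_dir)
end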

::File.write(filename, JSON.dump(batch))
end

def load_batches


In load_batches, each file is read entirely into memory and parsed as JSON inside a map, which means all contents end up in memory at once since map returns an array containing all the results. This can lead to OOM errors if the number of failed files is large.

Collaborator Author

Good point! We are now loading only one batch at a time.
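A sketch of the one-at-a-time variant, assuming callers consume batches through a block rather than an array (the method name each_batch is hypothetical):

require 'json'

# Yield one (file, batch) pair at a time instead of materializing
# every parsed batch into a single array with map.
def each_batch
  return unless Dir.exist?(@failed_dir)
  Dir.glob(::File.join(@failed_dir, 'failed_batch_*.json')).each do |file|
    yield file, JSON.parse(::File.read(file))  # one batch in memory at a time
  end
end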


def persist_batch(batch)
filename = ::File.join(@failed_dir, "failed_batch_#{Time.now.to_i}_#{SecureRandom.uuid}.json")
::File.write(filename, JSON.dump(batch))


Correct me if I am wrong, but File.write is not atomic, which means that if the process crashes in the middle of a write you get a corrupted JSON file. When load_batches tries to read it later, JSON.load will fail.

Collaborator Author

Good point! I am writing to a temp file first and using a file rename to atomically move it to the correct location. Added error handling and retries (default 3 at the moment) to keep trying to persist the failed batch in case of transient errors.
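A sketch of that temp-file-then-rename approach; the rescued error class and the @logger call are assumptions, and the retry count matches the stated default of 3:

require 'json'
require 'securerandom'

# Write to a temp file in the target directory, then rename it into
# place. File.rename is atomic on the same filesystem, so readers
# never observe a half-written JSON file.
def persist_batch(batch, retries = 3)
  filename = ::File.join(@failed_dir, "failed_batch_#{Time.now.to_i}_#{SecureRandom.uuid}.json")
  tmpname  = "#{filename}.tmp"
  attempts = 0
  begin
    ::File.write(tmpname, JSON.dump(batch))
    ::File.rename(tmpname, filename)  # atomic move into place
  rescue SystemCallError => e
    attempts += 1
    retry if attempts < retries
    @logger.error("Failed to persist batch after #{retries} attempts: #{e.message}")  # logger assumed
  end
end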


def load_batches
return [] unless Dir.exist?(@failed_dir)
Dir.glob(::File.join(@failed_dir, 'failed_batch_*.json')).map do |file|


Continuing from the previous comment, we need to add error handling for corrupted/partial JSON files. Also, how are we handling files that are deleted between the glob and the read?

Collaborator Author

Yeah. Added error handling for deleted files, and also for corrupted files (they are moved to a quarantine dir so they don't block future processing, instead of being ignored/deleted immediately).
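A sketch of that error handling, assuming a quarantine subdirectory under the failed-items path (the helper name read_batch and the @logger call are hypothetical):

require 'fileutils'
require 'json'

# Skip files deleted between glob and read; move files with corrupt
# JSON to a quarantine dir instead of deleting them outright.
def read_batch(file)
  JSON.parse(::File.read(file))
rescue Errno::ENOENT
  nil  # deleted by another thread between glob and read; skip
rescue JSON::ParserError => e
  quarantine = ::File.join(@failed_dir, 'quarantine')
  ::FileUtils.mkdir_p(quarantine)
  ::FileUtils.mv(file, quarantine)
  @logger.warn("Quarantined corrupted batch file #{file}: #{e.message}")
  nil
end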

def process_failed_batches
@file_persistence.load_batches.each do |file, batch|
begin
@buffer_state[:flush_mutex].lock
tanmaya-panda1 Jul 1, 2025

The flush_mutex only protects the flush operation, but the race condition happens before the mutex is acquired.

Consider the following:
T1 calls process_failed_batches
T1 executes load_batches and gets the list of files
T2 calls process_failed_batches
T2 executes load_batches and also gets the same list of files
T1 acquires flush_mutex and processes file1
T2 waits for flush_mutex
T1 deletes file1 and releases the mutex
T2 acquires flush_mutex and tries to process file1, but it's already deleted.

Collaborator Author

The process_failed_batches fn is only called once, on startup, from buffer_initialize, which itself is only called once at the beginning, so multiple threads would not call it. Still, in case we end up using multiple threads later, I did add a rename-to-'filename.processing' step to make sure multiple threads don't try to load/delete the same file. There is also error handling in place for corrupted files and for failed file loads. I removed the load_batches fn in file_persistence.rb and added the logic directly in process_failed_batches, because Ruby was still trying to fetch the entire list of files when enumerating instead of loading one at a time. That is fixed now.
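A sketch of that rename-to-claim step, assuming read_batch from the quarantine sketch above and a flush entry point (both hypothetical names):

# Claim each file by renaming it before processing, so two threads
# can never load or delete the same batch.
def process_failed_batches
  Dir.glob(::File.join(@failed_dir, 'failed_batch_*.json')).each do |file|
    claimed = "#{file}.processing"
    begin
      ::File.rename(file, claimed)  # atomic claim; raises if already taken
    rescue Errno::ENOENT
      next                          # another thread claimed it first
    end
    batch = read_batch(claimed)     # see the quarantine sketch above
    next if batch.nil?
    flush_batch(batch)              # hypothetical flush entry point
    ::File.delete(claimed)
  end
end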

return items_flushed
end

def process_failed_batches


Also check the case where process_failed_batches is called from multiple places, e.g. on startup and from a timer: both threads can retrieve the same file and try to flush it, leading to file-already-deleted errors or duplicate flushes to Kusto.

Collaborator Author

As mentioned above, that fn is only called once, on startup, at the moment. If we do add a feature to run background recovery on a timer, it would make sense to add another mutex, but ideally it should still run on a single timer thread.
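If that timer feature ever lands, a minimal sketch of the guard might look like this (entirely hypothetical; nothing like it exists in this PR):

# A dedicated mutex keeps a startup call and a timer thread from
# processing the failed-items directory at the same time.
@recovery_mutex = Mutex.new  # created once, e.g. in initialize

def start_recovery_timer(interval = 60)
  Thread.new do
    loop do
      sleep interval
      next unless @recovery_mutex.try_lock  # skip a cycle instead of queueing
      begin
        process_failed_batches
      ensure
        @recovery_mutex.unlock
      end
    end
  end
end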
