-
Notifications
You must be signed in to change notification settings - Fork 729
Data sink worker processing changes #2754
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
…atching for kafka msg processing
WalkthroughThis pull request introduces modifications across multiple services, focusing on error handling and message processing improvements. The changes primarily affect the data sink worker, data access layer, and Kafka queue service. The modifications include updating method signatures, enhancing error data management, and transitioning from individual message processing to batch processing in the Kafka queue service. Changes
Sequence DiagramsequenceDiagram
participant DataSinkService
participant DataSinkRepository
participant KafkaQueueService
DataSinkService->>DataSinkService: Create structured error data
DataSinkService->>DataSinkRepository: Call delayResult with error
DataSinkRepository-->>DataSinkService: Store error information
KafkaQueueService->>KafkaQueueService: Process message batch
KafkaQueueService->>KafkaQueueService: Manage concurrent processing
Possibly related PRs
Poem
Finishing Touches
🪧 TipsChatThere are 3 ways to chat with CodeRabbit:
Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments. CodeRabbit Commands (Invoked using PR comments)
Other keywords and placeholders
CodeRabbit Configuration File (
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 2
🧹 Nitpick comments (2)
services/libs/queue/src/vendors/kafka/client.ts (1)
328-328
: Simplify null check using optional chainingThe condition
if (message && message.value)
can be simplified using optional chaining for better readability.Apply this diff:
- if (message && message.value) { + if (message?.value) {🧰 Tools
🪛 Biome (1.9.4)
[error] 328-328: Change to an optional chain.
Unsafe fix: Change to an optional chain.
(lint/complexity/useOptionalChain)
services/apps/data_sink_worker/src/service/dataSink.service.ts (1)
48-55
: Ensure safe serialization of theerror
objectUsing
JSON.stringify(error)
may throw an error if theerror
object contains circular references or non-enumerable properties. Consider using a safer serialization method likeutil.inspect(error)
.Add the import at the top of the file:
+import util from 'util'
Modify the
errorData
assignment:const errorData = { location, message, metadata, errorMessage: error?.message, errorStack: error?.stack, - errorString: error ? JSON.stringify(error) : undefined, + errorString: error ? util.inspect(error) : undefined, }
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (3)
services/apps/data_sink_worker/src/service/dataSink.service.ts
(2 hunks)services/libs/data-access-layer/src/old/apps/data_sink_worker/repo/dataSink.repo.ts
(1 hunks)services/libs/queue/src/vendors/kafka/client.ts
(1 hunks)
🧰 Additional context used
🪛 Biome (1.9.4)
services/libs/queue/src/vendors/kafka/client.ts
[error] 328-328: Change to an optional chain.
Unsafe fix: Change to an optional chain.
(lint/complexity/useOptionalChain)
⏰ Context from checks skipped due to timeout of 90000ms (2)
- GitHub Check: lint-format
- GitHub Check: lint-format-services
🔇 Additional comments (1)
services/libs/data-access-layer/src/old/apps/data_sink_worker/repo/dataSink.repo.ts (1)
261-273
: Verify safe serialization of theerror
object when updating the databaseWhen updating the
integration.results
table,JSON.stringify(error)
is used to serialize theerror
object. Ensure that theerror
object does not contain circular references or properties that cannot be serialized, which could causeJSON.stringify
to throw an error.Confirm that the
error
object passed todelayResult
is safe for JSON serialization or consider using a safer serialization method.
|
||
this.addJob() | ||
const data = JSON.parse(message.value.toString()) | ||
const now = performance.now() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Import performance
from the perf_hooks
module
The performance.now()
method is used without importing performance
. In Node.js, performance
is available via the perf_hooks
module.
Add the following import at the top of the file:
+import { performance } from 'perf_hooks'
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
const now = performance.now() | |
import { performance } from 'perf_hooks' | |
const now = performance.now() |
// if (resultInfo.state !== IntegrationResultState.PENDING) { | ||
// this.log.warn({ actualState: resultInfo.state }, 'Result is not pending.') | ||
// if (resultInfo.state === IntegrationResultState.PROCESSED) { | ||
// this.log.warn('Result has already been processed. Skipping...') | ||
// return false | ||
// } | ||
|
||
await this.repo.resetResults([resultId]) | ||
return false | ||
} | ||
// await this.repo.resetResults([resultId]) | ||
// return false | ||
// } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Review the removal of state checks in processResult
Commenting out the state checks for resultInfo.state
may lead to processing results that are not in the PENDING
state, potentially causing duplicate processing or inconsistent behavior.
Consider restoring these checks to ensure that only results in the PENDING
state are processed. If the removal is intentional, please verify that it won't lead to unintended consequences.
Changes proposed ✍️
What
copilot:summary
copilot:poem
Why
How
copilot:walkthrough
Checklist ✅
Feature
,Improvement
, orBug
.Summary by CodeRabbit
New Features
Improvements
Technical Updates