Test queue perf #2758
Conversation
Walkthrough

The pull request introduces several modifications across multiple services in the data sink worker application. The changes primarily focus on error handling, queue processing, and data interface restructuring. Key updates include introducing a new `UnrepeatableError` class.

Changes
Sequence Diagram

```mermaid
sequenceDiagram
    participant Worker as WorkerQueueReceiver
    participant Service as DataSinkService
    participant Repo as DataSinkRepository
    Worker->>Service: Process Result
    Service->>Repo: Get Integration Info
    Repo-->>Service: Return Integration Data
    Service->>Service: Create Activity Result
    alt Error Occurs
        Service->>Service: Check Error Type
        alt Is UnrepeatableError
            Service->>Service: Abort Processing
        else
            Service->>Service: Retry Processing
        end
    end
```
Actionable comments posted: 0
🧹 Nitpick comments (10)
services/apps/data_sink_worker/src/service/activity.service.ts (5)
588-588: Simplify condition with optional chaining

You can simplify the condition `if (dbMember && dbMember.attributes[attName])` by using optional chaining. Apply this diff:

```diff
- if (dbMember && dbMember.attributes[attName]) {
+ if (dbMember?.attributes[attName]) {
```

🧰 Tools: 🪛 Biome (1.9.4) — [error] 588-588: Change to an optional chain. (lint/complexity/useOptionalChain)
598-598: Simplify condition with optional chaining

Similarly, the condition `if (member.attributes[attName] && member.attributes[attName][platform])` can be simplified using optional chaining. Apply this diff:

```diff
- if (member.attributes[attName] && member.attributes[attName][platform]) {
+ if (member.attributes[attName]?.[platform]) {
```

🧰 Tools: 🪛 Biome (1.9.4) — [error] 598-598: Change to an optional chain. (lint/complexity/useOptionalChain)
605-605: Simplify condition with optional chaining

The condition `} else if (member.attributes[attName] && member.attributes[attName][platform]) {` can be refactored for clarity. Apply this diff:

```diff
- } else if (member.attributes[attName] && member.attributes[attName][platform]) {
+ } else if (member.attributes[attName]?.[platform]) {
```

🧰 Tools: 🪛 Biome (1.9.4) — [error] 605-605: Change to an optional chain. (lint/complexity/useOptionalChain)
674-674: Simplify condition with optional chaining

The condition `if (dbActivity && dbActivity?.deletedAt) {` can be simplified, since the optional chaining already handles the null check. Apply this diff:

```diff
- if (dbActivity && dbActivity?.deletedAt) {
+ if (dbActivity?.deletedAt) {
```

🧰 Tools: 🪛 Biome (1.9.4) — [error] 674-674: Change to an optional chain. (lint/complexity/useOptionalChain)
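The attribute checks above all follow the same pattern. A small standalone sketch (the `attributes` shape here is hypothetical, not the service's actual member type) illustrates why the optional-chaining rewrite is behavior-preserving for these guards:

```typescript
// Hypothetical nested-attributes shape, loosely mirroring member.attributes[attName][platform].
const attributes: Record<string, Record<string, string> | undefined> = {
  location: { github: 'Berlin' },
}

const attName = 'location'
const platform = 'github'

// Before: explicit guard, as in the original condition.
const attr = attributes[attName]
const before = attr && attr[platform]

// After: optional chaining, as suggested by Biome.
const after = attributes[attName]?.[platform]

// Both forms yield the same value for present and missing keys.
console.log(before === after) // true
console.log(attributes['missing']?.[platform]) // undefined
```

Both expressions short-circuit to a falsy value when the outer object is missing, which is exactly what the original `&&` guards relied on.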
445-445: Consider using `UnrepeatableError` for consistency

Currently, an `Error` is thrown when an activity lacks a username or member. For consistency in handling unrepeatable errors, consider throwing `UnrepeatableError` instead. Apply this diff:

```diff
- throw new Error('Activity does not have a username or member.')
+ throw new UnrepeatableError('Activity does not have a username or member.')
```

This ensures that unrepeatable errors are handled uniformly across the application.
services/apps/data_sink_worker/src/service/common.ts (1)
1-6: Ensure proper prototype chain when extending built-in classes

When extending the built-in `Error` class, it's recommended to set the prototype explicitly to maintain the correct prototype chain, especially when targeting ES5 or earlier. Apply this diff:

```diff
 export class UnrepeatableError extends Error {
   constructor(message: string) {
     super(message)
     this.name = 'UnrepeatableError'
+    Object.setPrototypeOf(this, UnrepeatableError.prototype)
   }
 }
```

This change ensures that `instanceof` checks work correctly for `UnrepeatableError`.
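A minimal standalone sketch of the suggested pattern, showing that `instanceof` and `name` behave as expected once the prototype is restored:

```typescript
// Sketch only: mirrors the reviewed class, with the suggested
// Object.setPrototypeOf fix applied. On ES5-and-earlier targets,
// subclassing Error can otherwise break the prototype chain.
class UnrepeatableError extends Error {
  constructor(message: string) {
    super(message)
    this.name = 'UnrepeatableError'
    Object.setPrototypeOf(this, UnrepeatableError.prototype)
  }
}

const err: Error = new UnrepeatableError('cannot retry')
console.log(err instanceof UnrepeatableError) // true
console.log(err instanceof Error) // true
console.log(err.name) // UnrepeatableError
```

With a modern `target` (ES2015+) the explicit `setPrototypeOf` is redundant but harmless, so it is a safe defensive addition.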
services/apps/data_sink_worker/src/service/dataSink.service.ts (1)
138-149: Enhanced result processing with optional parameter

Good optimization to avoid redundant database calls when the result is already available. However, consider adding a debug log when skipping the database fetch:

```diff
 if (!resultInfo) {
   resultInfo = await this.repo.getResultInfo(resultId)
+} else {
+  this.log.debug({ resultId }, 'Using provided result data, skipping database fetch.')
 }
```
services/libs/data-access-layer/src/old/apps/data_sink_worker/repo/dataSink.repo.ts (1)
36-47: Add error handling to getIntegrationInfo

The method implementation is clean, but it should include error handling for database operations:

```diff
 public async getIntegrationInfo(integrationId: string): Promise<IIntegrationData | null> {
+  try {
     const result = await this.db().oneOrNone(
       `select id as "integrationId", platform from integrations where id = $(integrationId)`,
       {
         integrationId,
       },
     )
     return result
+  } catch (err) {
+    this.log.error(err, 'Failed to get integration info!')
+    throw err
+  }
 }
```

services/libs/queue/src/vendors/kafka/client.ts (2)
326-327: Consider using exponential backoff for polling

While the polling implementation is good, using a fixed 10ms delay might not be optimal under high load:

```diff
+private async waitForAvailability(maxConcurrentMessageProcessing: number, attempt = 1): Promise<void> {
+  const maxAttempts = 10
+  const baseDelay = 10
+  while (!this.isAvailable(maxConcurrentMessageProcessing) && attempt <= maxAttempts) {
+    const delay = Math.min(baseDelay * Math.pow(2, attempt - 1), 1000)
+    await timeout(delay)
+    attempt++
+  }
+  if (!this.isAvailable(maxConcurrentMessageProcessing)) {
+    throw new Error('Max wait attempts reached for message processing availability')
+  }
+}

 // In the run method:
-while (!this.isAvailable(maxConcurrentMessageProcessing)) {
-  await timeout(10)
-}
+await this.waitForAvailability(maxConcurrentMessageProcessing)
```
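For illustration, the delay schedule that the suggested helper would produce (base 10ms, doubling per attempt, capped at 1s) can be computed standalone:

```typescript
// Sketch of the capped exponential backoff formula from the suggestion:
// delay = min(base * 2^(attempt - 1), cap).
function backoffDelay(attempt: number, baseMs = 10, capMs = 1000): number {
  return Math.min(baseMs * Math.pow(2, attempt - 1), capMs)
}

// Delays for the 10 attempts the suggestion allows before giving up.
const schedule = Array.from({ length: 10 }, (_, i) => backoffDelay(i + 1))
console.log(schedule) // [10, 20, 40, 80, 160, 320, 640, 1000, 1000, 1000]
```

Compared with the fixed 10ms poll, this backs off quickly under sustained saturation while still reacting within tens of milliseconds when capacity frees up early.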
325-345: Use optional chaining for message value check

The current null check can be simplified using optional chaining:

```diff
- if (message && message.value) {
+ if (message?.value) {
```

Also, consider wrapping the JSON.parse in a try-catch block to handle malformed messages gracefully:

```diff
- const data = JSON.parse(message.value.toString())
+ try {
+   const data = JSON.parse(message.value.toString())
+   // ... rest of the processing
+ } catch (err) {
+   this.log.error(err, 'Failed to parse message value as JSON')
+   return
+ }
```

🧰 Tools: 🪛 Biome (1.9.4) — [error] 325-325: Change to an optional chain. (lint/complexity/useOptionalChain)
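Combining both suggestions, defensive message parsing can be sketched in isolation (the `KafkaMessageLike` shape and function name are hypothetical stand-ins for the consumer's actual types):

```typescript
// Assumed minimal shape of an incoming Kafka message for this sketch.
interface KafkaMessageLike {
  value: Buffer | null
}

// Returns the parsed payload, or undefined for empty/malformed messages,
// so a bad payload is skipped instead of crashing the consumer loop.
function parseMessage(message: KafkaMessageLike | null): unknown {
  if (!message?.value) return undefined
  try {
    return JSON.parse(message.value.toString())
  } catch (err) {
    console.error('Failed to parse message value as JSON', err)
    return undefined
  }
}

console.log(parseMessage({ value: Buffer.from('{"ok":true}') })) // { ok: true }
console.log(parseMessage({ value: Buffer.from('not json') })) // undefined
console.log(parseMessage(null)) // undefined
```

The key design point is that a single malformed payload only costs one log line, never the whole consumer.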
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📒 Files selected for processing (7)
- services/apps/data_sink_worker/src/queue/index.ts (1 hunks)
- services/apps/data_sink_worker/src/service/activity.service.ts (4 hunks)
- services/apps/data_sink_worker/src/service/common.ts (1 hunks)
- services/apps/data_sink_worker/src/service/dataSink.service.ts (3 hunks)
- services/libs/data-access-layer/src/old/apps/data_sink_worker/repo/dataSink.data.ts (1 hunks)
- services/libs/data-access-layer/src/old/apps/data_sink_worker/repo/dataSink.repo.ts (2 hunks)
- services/libs/queue/src/vendors/kafka/client.ts (1 hunks)
🧰 Additional context used

🪛 Biome (1.9.4)

services/apps/data_sink_worker/src/service/activity.service.ts
- [error] 588-588: Change to an optional chain. (lint/complexity/useOptionalChain)
- [error] 598-598: Change to an optional chain. (lint/complexity/useOptionalChain)
- [error] 605-605: Change to an optional chain. (lint/complexity/useOptionalChain)
- [error] 674-674: Change to an optional chain. (lint/complexity/useOptionalChain)

services/libs/queue/src/vendors/kafka/client.ts
- [error] 325-325: Change to an optional chain. (lint/complexity/useOptionalChain)
🔇 Additional comments (7)
services/apps/data_sink_worker/src/service/activity.service.ts (2)
Line range hint 1-1162: Overall, the changes enhance error handling and code clarity

The introduction of `UnrepeatableError` and the refactoring of identity checks improve the robustness of the `ActivityService`. The use of optional chaining simplifies conditional statements, and the updates to the member processing logic enhance maintainability.

🧰 Tools: 🪛 Biome (1.9.4) — [error] 588-588, 598-598, 605-605, 674-674: Change to an optional chain. (lint/complexity/useOptionalChain)
510-544: Handle unverified identities requested for erasure appropriately

The logic for filtering out unverified identities requested for erasure may leave members without any identities, potentially causing issues in downstream processes. Please verify that members are not left without any identities after filtering, and that the process either prevents such members or handles them gracefully.

✅ Verification successful

Identity handling during activity processing is appropriate. The code correctly handles cases where members might be left without identities after filtering:
- It preserves verified identities even when marked for erasure
- It explicitly skips activity processing when no valid identities remain
- It logs appropriate warnings for monitoring

🏁 Scripts executed

The following scripts were executed for the analysis:

```shell
#!/bin/bash
# Description: Find members without identities after erasure filtering
# Expected result: No members without identities
psql -d your_database -c "
SELECT m.id
FROM members m
LEFT JOIN member_identities mi ON mi.member_id = m.id
WHERE mi.id IS NULL;
"
```

Length of output: 235

```shell
#!/bin/bash
# Find similar identity checks and member handling
ast-grep --pattern 'member.identities.filter'
# Find activity service usage
rg -A 3 "class ActivityService"
# Look for member creation patterns
ast-grep --pattern 'new Member($$$)'
```

Length of output: 3752
services/libs/data-access-layer/src/old/apps/data_sink_worker/repo/dataSink.data.ts (2)

Line range hint 13-25: Interface restructuring improves clarity

The introduction of `IIntegrationData` and its extension by `IResultData` enhances the structure and clarity of the interfaces. This change promotes better code organization.
8-11: Verify removal of properties does not impact the codebase

Properties `hasSampleData`, `plan`, `isTrialPlan`, and `name` have been removed from `IResultData`. Ensure that these properties are not used elsewhere in the codebase to prevent runtime errors. Search for usages of the removed properties to confirm that removing them won't introduce breaking changes.
services/apps/data_sink_worker/src/queue/index.ts (1)
33-33: Verify performance impact of reduced concurrency

The concurrent message processing limit has been significantly reduced from 20 to 3. While this might help with system stability, it could impact throughput. Please run load tests to ensure this change doesn't create a processing bottleneck. Consider:
- Message processing latency
- Queue backlog metrics
- System resource utilization
services/apps/data_sink_worker/src/service/dataSink.service.ts (2)
59-62: Improved error handling with UnrepeatableError

Good addition of the `UnrepeatableError` check. This prevents unnecessary retries for errors that we know cannot be resolved through retries.
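The retry-vs-abort decision described above can be sketched standalone (class and function names here are illustrative, not the service's actual code):

```typescript
// Errors known to be permanently unrecoverable: retrying cannot help.
class UnrepeatableError extends Error {}

// A transient error (e.g. network timeout) should be retried;
// an UnrepeatableError should abort processing immediately.
function shouldRetry(err: unknown): boolean {
  return !(err instanceof UnrepeatableError)
}

console.log(shouldRetry(new Error('transient network failure'))) // true
console.log(shouldRetry(new UnrepeatableError('malformed payload'))) // false
```

This is why the prototype-chain fix in common.ts matters: the whole retry policy hinges on `instanceof` correctly identifying the subclass.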
107-135: Well-structured parallel execution optimization

Excellent optimization using Promise.all to parallelize integration info fetching and result creation. The code is now more efficient while maintaining readability.
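A schematic sketch of the pattern (stub functions, not the actual service calls): both lookups start immediately, so total latency approaches the slower of the two rather than their sum.

```typescript
// Hypothetical stand-ins for the two independent async operations.
async function fetchIntegrationInfo(): Promise<string> {
  return 'integration-info'
}

async function createActivityResult(): Promise<string> {
  return 'created-result'
}

// Both promises are created before the await, so they run concurrently;
// Promise.all resolves once both have settled successfully.
async function processResult(): Promise<[string, string]> {
  return Promise.all([fetchIntegrationInfo(), createActivityResult()])
}

processResult().then(([info, result]) => console.log(info, result))
```

One caveat with this pattern: if either promise rejects, `Promise.all` rejects immediately, so both branches must be safe to fail together (or `Promise.allSettled` used instead).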
Changes proposed ✍️
What
copilot:summary
copilot:poem
Why
How
copilot:walkthrough
Checklist ✅
- Label the pull request as `Feature`, `Improvement`, or `Bug`.

Summary by CodeRabbit
Release Notes
New Features
- Introduced `UnrepeatableError` for improved error handling in activity processing

Bug Fixes

Refactor

Chores