-
Notifications
You must be signed in to change notification settings - Fork 730
Fix merging of big orgs #2694
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fix merging of big orgs #2694
Conversation
WalkthroughThis pull request introduces several changes across multiple files to enhance synchronization processes and database tracking. A new Changes
Possibly related PRs
Suggested labels
Suggested reviewers
Thank you for using CodeRabbit. We offer it for free to the OSS community and would appreciate your support in helping us grow. If you find it useful, would you consider giving us a shout-out on your favorite social media? 🪧 TipsChatThere are 3 ways to chat with CodeRabbit:
Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments. CodeRabbit Commands (Invoked using PR comments)
Other keywords and placeholders
CodeRabbit Configuration File (
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 3
🧹 Outside diff range and nitpick comments (7)
services/libs/data-access-layer/src/members/segments.ts (1)
10-16
: Consider adding explicit error handling.The implementation looks good and aligns well with the PR objectives of optimizing large org merging. However, consider adding explicit error handling to log database errors for better debugging.
export async function findLastSyncDate(qx: QueryExecutor, memberId: string): Promise<Date | null> { + try { const result = await qx.selectOneOrNone( `SELECT MAX("createdAt") AS "lastSyncDate" FROM "memberSegmentsAgg" WHERE "memberId" = $(memberId)`, { memberId }, ) return result?.lastSyncDate ? new Date(result.lastSyncDate) : null + } catch (error) { + log.error({ error, memberId }, 'Error finding last sync date for member') + throw error + } }services/apps/search_sync_api/src/routes/member.ts (1)
52-59
: Consider adding input validation for syncFrom parameter.The implementation aligns well with the PR objectives by introducing the
syncFrom
parameter to optimize member syncing. However, consider adding validation for thesyncFrom
parameter to ensure it's a valid date string before attempting conversion.- const { organizationId, syncFrom } = req.body + const { organizationId, syncFrom } = req.body + + if (syncFrom && isNaN(Date.parse(syncFrom))) { + throw new Error('Invalid syncFrom date format') + } + try {services/apps/entity_merging_worker/src/activities/organizations.ts (1)
69-75
: Consider performance optimizations for member sync.While the timeout increase and sync tracking help prevent failures, the underlying performance issue (20s per member in QuestDB) remains. Consider these long-term improvements:
- Implement batch processing for member syncs
- Add caching for frequently accessed member data
- Optimize QuestDB queries for aggregate calculations
- Consider implementing a queue-based system for large organizations
Would you like me to elaborate on any of these optimization strategies?
services/libs/opensearch/src/apiClient.ts (1)
51-51
: Consider handling date serialization explicitly.When sending dates via axios, ensure proper serialization to avoid timezone or format issues. Consider transforming the date to ISO string format before sending.
await this.searchSyncApi.post('/sync/organization/members', { organizationId, - syncFrom, + syncFrom: syncFrom?.toISOString() ?? null, })services/apps/entity_merging_worker/src/workflows/all.ts (2)
24-24
: Consider adding monitoring for long-running activities.The timeout increase from 15 to 60 minutes aligns with the PR objectives and should accommodate large organizations. However, activities running for extended periods could impact system resources.
Consider:
- Adding metrics to track activity execution times
- Setting up alerts for activities approaching the timeout
- Implementing circuit breakers for resource protection
132-134
: Consider parallel execution of sync operations.The sync operations for primary and secondary organizations are currently sequential. Since they use the same syncStart timestamp, they could potentially be executed in parallel to improve performance.
Consider using Promise.all:
const syncStart = new Date() - await syncOrganization(primaryId, syncStart) - await syncOrganization(secondaryId, syncStart) + await Promise.all([ + syncOrganization(primaryId, syncStart), + syncOrganization(secondaryId, syncStart) + ])services/libs/opensearch/src/service/member.sync.service.ts (1)
379-379
: Simplify date comparison by directly comparingDate
objectsYou can enhance readability by comparing
Date
objects directly without usinggetTime()
:- if (lastSyncDate && lastSyncDate.getTime() > opts.syncFrom.getTime()) { + if (lastSyncDate && lastSyncDate > opts.syncFrom) {JavaScript and TypeScript support direct comparison of
Date
objects, which makes the code cleaner and more intuitive.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
📒 Files selected for processing (7)
backend/src/database/migrations/V1732118484__members-segments-agg-created-at.sql
(1 hunks)services/apps/entity_merging_worker/src/activities/organizations.ts
(1 hunks)services/apps/entity_merging_worker/src/workflows/all.ts
(3 hunks)services/apps/search_sync_api/src/routes/member.ts
(1 hunks)services/libs/data-access-layer/src/members/segments.ts
(1 hunks)services/libs/opensearch/src/apiClient.ts
(1 hunks)services/libs/opensearch/src/service/member.sync.service.ts
(5 hunks)
✅ Files skipped from review due to trivial changes (1)
- backend/src/database/migrations/V1732118484__members-segments-agg-created-at.sql
🔇 Additional comments (4)
services/apps/search_sync_api/src/routes/member.ts (1)
57-59
: Verify MemberSyncService integration.
Let's verify that the MemberSyncService.syncOrganizationMembers
method properly handles the new options parameter.
✅ Verification successful
The MemberSyncService integration is properly implemented
The implementation in services/libs/opensearch/src/service/member.sync.service.ts
shows that the syncOrganizationMembers
method correctly accepts and handles the new options parameter with syncFrom
. The method properly:
- Accepts the options parameter with the exact shape:
{ syncFrom: Date | null }
- Uses this parameter in the sync logic to determine whether to skip sync based on the last sync date
- Matches the usage shown in the route handler
The integration is correctly implemented and no issues were found.
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Description: Verify MemberSyncService implementation matches the new method signature
# Search for the syncOrganizationMembers method definition
ast-grep --pattern 'class MemberSyncService {
$$$
syncOrganizationMembers(organizationId, $opts) {
$$$
}
$$$
}'
# Search for any direct calls to this method to ensure they're updated
rg -A 2 'syncOrganizationMembers.*\(' --type typescript
Length of output: 218
Script:
#!/bin/bash
# Let's try a different approach to find the implementation and usages
# Find the MemberSyncService implementation
rg -l "class MemberSyncService" --type ts
# Find the file containing the implementation
fd -e ts -e tsx -x rg -l "syncOrganizationMembers.*\("
# Look for the actual implementation with context
rg "syncOrganizationMembers.*\(" -A 10 -B 2 --type ts --type tsx
# Look for all imports of MemberSyncService
rg "import.*MemberSyncService" --type ts --type tsx
Length of output: 794
Script:
#!/bin/bash
# Let's examine the implementation in both MemberSyncService files and the actual method
# Check the implementation in opensearch lib
cat services/libs/opensearch/src/service/member.sync.service.ts
# Check the implementation in integration sync worker
cat services/apps/integration_sync_worker/src/service/member.sync.service.ts
# Let's also check the actual usage in the search sync api
cat services/apps/search_sync_api/src/routes/member.ts
Length of output: 43049
services/apps/entity_merging_worker/src/activities/organizations.ts (1)
69-75
: Verify temporal activity timeout configuration.
The PR objectives mention increasing the activity timeout from 15 minutes to 1 hour. Ensure this configuration is properly set in the temporal workflow definition.
✅ Verification successful
Let me try a different search strategy to locate the temporal workflow configurations.
Let me try one more search strategy focusing on the workflow file structure and temporal configuration patterns.
Timeout configuration is properly set to 60 minutes
The temporal activity timeout is correctly configured in services/apps/entity_merging_worker/src/workflows/all.ts
with startToCloseTimeout: '60 minutes'
for all activities including syncOrganization
. This matches the PR objective of increasing the timeout from 15 minutes to 1 hour.
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Search for temporal workflow configuration files and activity timeout settings
echo "Searching for temporal workflow configurations..."
rg -l "finishOrgMerging" | while read -r file; do
echo "=== $file ==="
rg -A 5 -B 5 "scheduleToCloseTimeout|startToCloseTimeout" "$file"
done
Length of output: 148
Script:
#!/bin/bash
# Search for workflow and activity configurations using different patterns
echo "Searching for temporal workflow and activity configurations..."
# Search for workflow definitions
echo "=== Workflow Definitions ==="
rg -A 10 -B 10 "createWorkflow|workflowOptions" services/apps/entity_merging_worker/
# Search for activity configurations
echo "=== Activity Configurations ==="
rg -A 10 -B 10 "activityOptions|ActivityOptions" services/apps/entity_merging_worker/
Length of output: 449
Script:
#!/bin/bash
# First, find relevant workflow files
echo "=== Finding workflow files ==="
fd -e ts -e js . services/apps/entity_merging_worker/src/workflows/
# Search for timeout configurations in the workflow directory
echo "=== Searching timeout configurations ==="
rg -A 5 -B 5 "timeout|Timeout" services/apps/entity_merging_worker/src/
# Look for activity registrations
echo "=== Activity registrations ==="
rg -A 5 -B 5 "proxyActivities|registerActivity" services/apps/entity_merging_worker/src/
Length of output: 2876
services/apps/entity_merging_worker/src/workflows/all.ts (2)
Line range hint 102-134
: Verify error handling in the workflow.
The workflow modifications handle the timing aspects well, but we should verify the error handling for scenarios such as:
- Partial sync failures
- Network timeouts during long-running operations
- Recovery after activity failures
✅ Verification successful
Error handling is appropriately managed through Temporal's workflow engine
The workflow is using Temporal's built-in error handling and retry mechanisms:
- Activities are configured with a 60-minute timeout via
startToCloseTimeout: '60 minutes'
- Each activity is proxied through Temporal's activity system, which provides:
- Automatic retry handling
- State persistence
- Exactly-once execution guarantees
- The workflow maintains consistency by:
- Updating state transitions through
setMergeAction
- Executing operations in a specific order
- Using synchronous operations where needed (e.g.,
recalculateActivityAffiliationsOfOrganizationSynchronous
)
- Updating state transitions through
The original concern about error handling verification is addressed by the framework's built-in capabilities, and no additional error handling is required at the workflow level.
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Search for error handling patterns in the workflow
rg -A 5 "catch|error|fail" services/apps/entity_merging_worker/src/
# Look for retry policies
ast-grep --pattern 'retryPolicy: {
$$$
}'
Length of output: 2076
Script:
#!/bin/bash
# Check for workflow configuration and error handling patterns
rg -A 10 "workflow\(" services/apps/entity_merging_worker/src/
# Look for activity configurations
ast-grep --pattern 'createActivityHandle<$_>({
$$$
})'
# Check for error handling in the main workflow file
rg -A 5 "try|catch|throw" services/apps/entity_merging_worker/src/workflows/
Length of output: 202
Script:
#!/bin/bash
# Check the workflow file content to understand the structure
cat services/apps/entity_merging_worker/src/workflows/all.ts
# Look for temporal workflow configurations
rg -l "proxyActivities|defineQuery|defineSignal" services/apps/entity_merging_worker/src/
# Check activity definitions
rg -A 5 "export async function" services/apps/entity_merging_worker/src/activities/
Length of output: 13709
102-103
: Verify sync service implementation of the syncStart parameter.
The addition of syncStart timestamp aligns with the PR objectives. However, we should verify that the sync service correctly utilizes this parameter to filter already synced members.
✅ Verification successful
Based on the search results, I can now provide a final response as we have found the implementation details showing how the syncStart parameter is used in the member sync process:
Verified: syncStart parameter is correctly implemented for filtering already synced members
The implementation in MemberSyncService correctly utilizes the syncStart parameter to avoid re-syncing already synced members:
- The syncStart timestamp is passed through to the member sync API endpoint
- MemberSyncService checks the last sync date for each member against the syncStart parameter
- Members whose last sync date is more recent than syncStart are skipped from re-syncing
- The implementation includes proper logging of skipped members with their last sync dates
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Search for the implementation of syncOrganization to verify syncStart usage
ast-grep --pattern 'async function syncOrganization($_, syncStart: Date) {
$$$
}'
# Search for related sync service implementation
rg -A 10 "syncOrganizationMembers.*syncFrom"
Length of output: 132
Script:
#!/bin/bash
# Let's try different approaches to find the implementation
# Search for syncOrganization function with any parameter pattern
ast-grep --pattern 'async function syncOrganization($_) {
$$$
}'
# Search for syncOrganization function declaration
rg -A 15 "async.*function.*syncOrganization"
# Search for any usage of syncStart parameter
rg -A 5 "syncStart.*Date"
# Search for organization sync related functions
rg -A 10 "sync.*Organization"
Length of output: 103194
Script:
#!/bin/bash
# Let's search for the implementation of syncOrganizationMembers with syncFrom parameter
ast-grep --pattern 'syncOrganizationMembers($_, $_, { syncFrom: $_ })'
# Search for member sync service implementation with context
rg -A 10 "class MemberSyncService"
# Search for member sync implementation with syncFrom parameter
rg -A 10 "syncFrom.*Date.*null"
Length of output: 6205
|
||
export async function findLastSyncDate(qx: QueryExecutor, memberId: string): Promise<Date | null> { | ||
const result = await qx.selectOneOrNone( | ||
`SELECT MAX("createdAt") AS "lastSyncDate" FROM "memberSegmentsAgg" WHERE "memberId" = $(memberId)`, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
💡 Codebase verification
Add index for memberId and createdAt columns to optimize the MAX aggregation query
The memberSegmentsAgg
table has the createdAt
column properly defined, but it lacks an index that would optimize the specific query that filters by memberId
and performs MAX(createdAt)
. Current indexes:
- A unique constraint on (
memberId
,segmentId
) - An index on
segmentId
To optimize the query performance, you should add a composite index on (memberId
, createdAt
).
🔗 Analysis chain
Verify database schema and indexing.
Let's verify the existence of the createdAt
column and check if there's an index to optimize the MAX aggregation query.
Let me generate another verification script to check for indexes specifically on the memberSegmentsAgg
table, as the previous results didn't show them clearly.
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Description: Check schema and indexes for memberSegmentsAgg table
# Check if createdAt column exists and its definition
echo "Checking createdAt column definition:"
rg -A 5 '"memberSegmentsAgg".*CREATE TABLE|ALTER TABLE.*"memberSegmentsAgg".*ADD.*"createdAt"'
# Check for indexes on memberId and createdAt
echo "Checking indexes:"
rg -A 5 'CREATE.*INDEX.*ON.*"memberSegmentsAgg".*\("memberId"|"createdAt"\)'
Length of output: 10157
Script:
#!/bin/bash
# Search for memberSegmentsAgg table creation and its indexes
echo "Checking memberSegmentsAgg table creation and indexes:"
rg -A 10 'CREATE TABLE.*"memberSegmentsAgg"|CREATE.*INDEX.*"memberSegmentsAgg"'
# Search for any migration file containing memberSegmentsAgg
echo "Checking migrations containing memberSegmentsAgg:"
fd -e sql . | xargs rg "memberSegmentsAgg"
Length of output: 2540
export async function syncOrganization(organizationId: string, syncStart: Date): Promise<void> { | ||
const syncApi = new SearchSyncApiClient({ | ||
baseUrl: process.env['CROWD_SEARCH_SYNC_API_URL'], | ||
}) | ||
|
||
await syncApi.triggerOrganizationSync(organizationId) | ||
await syncApi.triggerOrganizationMembersSync(null, organizationId) | ||
await syncApi.triggerOrganizationMembersSync(null, organizationId, syncStart) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Add error handling and parameter validation.
The function lacks error handling for API calls and parameter validation for the new syncStart
parameter. This could lead to silent failures or unexpected behavior.
Consider applying these improvements:
-export async function syncOrganization(organizationId: string, syncStart: Date): Promise<void> {
+/**
+ * Synchronizes an organization and its members
+ * @param organizationId - The ID of the organization to sync
+ * @param syncStart - The timestamp to use as the starting point for member sync
+ * @throws {Error} If the sync operations fail
+ */
+export async function syncOrganization(organizationId: string, syncStart: Date): Promise<void> {
+ if (!syncStart || !(syncStart instanceof Date)) {
+ throw new Error('syncStart must be a valid Date object');
+ }
+
const syncApi = new SearchSyncApiClient({
baseUrl: process.env['CROWD_SEARCH_SYNC_API_URL'],
})
- await syncApi.triggerOrganizationSync(organizationId)
- await syncApi.triggerOrganizationMembersSync(null, organizationId, syncStart)
+ try {
+ await syncApi.triggerOrganizationSync(organizationId)
+ await syncApi.triggerOrganizationMembersSync(null, organizationId, syncStart)
+ } catch (error) {
+ throw new Error(`Failed to sync organization ${organizationId}: ${error.message}`);
+ }
}
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
export async function syncOrganization(organizationId: string, syncStart: Date): Promise<void> { | |
const syncApi = new SearchSyncApiClient({ | |
baseUrl: process.env['CROWD_SEARCH_SYNC_API_URL'], | |
}) | |
await syncApi.triggerOrganizationSync(organizationId) | |
await syncApi.triggerOrganizationMembersSync(null, organizationId) | |
await syncApi.triggerOrganizationMembersSync(null, organizationId, syncStart) | |
/** | |
* Synchronizes an organization and its members | |
* @param organizationId - The ID of the organization to sync | |
* @param syncStart - The timestamp to use as the starting point for member sync | |
* @throws {Error} If the sync operations fail | |
*/ | |
export async function syncOrganization(organizationId: string, syncStart: Date): Promise<void> { | |
if (!syncStart || !(syncStart instanceof Date)) { | |
throw new Error('syncStart must be a valid Date object'); | |
} | |
const syncApi = new SearchSyncApiClient({ | |
baseUrl: process.env['CROWD_SEARCH_SYNC_API_URL'], | |
}) | |
try { | |
await syncApi.triggerOrganizationSync(organizationId) | |
await syncApi.triggerOrganizationMembersSync(null, organizationId, syncStart) | |
} catch (error) { | |
throw new Error(`Failed to sync organization ${organizationId}: ${error.message}`); | |
} | |
} |
public async triggerOrganizationMembersSync( | ||
tenantId: string, | ||
organizationId: string, | ||
syncFrom: Date | null = null, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
💡 Codebase verification
Update needed: triggerOrganizationMembersSync
calls need to be aligned with the new signature
The verification revealed inconsistencies in the usage of triggerOrganizationMembersSync
:
- In
backend/src/services/searchSyncService.ts
: The method is called with only two parameters (tenantId, organizationId
) and a hardcodedfalse
- In
services/apps/entity_merging_worker/src/activities/organizations.ts
: The method is called withnull
astenantId
and includessyncStart
- In
services/libs/common_services/src/services/emitters/searchSyncWorker.emitter.ts
: The method signature needs to be checked for alignment
Please update all call sites to match the new signature with the optional syncFrom
parameter.
🔗 Analysis chain
LGTM! Verify the parameter usage across the codebase.
The addition of the optional syncFrom
parameter with a default value maintains backward compatibility while enabling optimized syncing.
Let's verify the parameter usage across the codebase:
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Description: Verify all calls to triggerOrganizationMembersSync to ensure they're updated correctly
# Search for method calls
ast-grep --pattern 'triggerOrganizationMembersSync($$$)'
# Search for the route handler to verify parameter handling
rg -A 5 "'/sync/organization/members'"
Length of output: 1581
Script:
#!/bin/bash
# Let's search for actual calls to the method in TypeScript/JavaScript files
rg -t ts -t js "triggerOrganizationMembersSync" -B 2 -A 2
# Also search for any test files that might be using this method
fd -e test.ts -e spec.ts -X rg "triggerOrganizationMembersSync"
Length of output: 2357
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
looks good - ts build is failing tho
9d2aeaf
to
2358940
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 2
🧹 Outside diff range and nitpick comments (2)
services/apps/entity_merging_worker/src/workflows/all.ts (1)
Line range hint
27-146
: Consider applying syncStart pattern to member workflowsThe
syncStart
parameter has been added to organization workflows but not to member workflows (finishMemberMerging
andfinishMemberUnmerging
). For consistency and to prevent similar timing issues, consider applying the same pattern to member synchronization.This would provide consistent behavior across all entity types and prevent potential timing issues when dealing with large member merges in the future.
backend/src/services/searchSyncService.ts (1)
Line range hint
88-94
: Consider adding sync date parameter for optimization.Based on the PR objectives to avoid re-syncing already synced members, this method should accept a
syncStart
parameter to optimize the synchronization process for large organizations.Consider updating the method signature and implementation:
- async triggerOrganizationMembersSync(tenantId: string, organizationId: string) { + async triggerOrganizationMembersSync(tenantId: string, organizationId: string, syncStart?: Date) { const client = await this.getSearchSyncClient() if (client instanceof SearchSyncApiClient || client instanceof SearchSyncWorkerEmitter) { await this.logExecutionTime( - () => client.triggerOrganizationMembersSync(tenantId, organizationId), + () => client.triggerOrganizationMembersSync(tenantId, organizationId, syncStart), `triggerOrganizationMembersSync: tenant:${tenantId}, organization:${organizationId}`, )
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
📒 Files selected for processing (8)
backend/src/database/migrations/V1732118484__members-segments-agg-created-at.sql
(1 hunks)backend/src/services/searchSyncService.ts
(1 hunks)services/apps/entity_merging_worker/src/activities/organizations.ts
(1 hunks)services/apps/entity_merging_worker/src/workflows/all.ts
(3 hunks)services/apps/search_sync_api/src/routes/member.ts
(1 hunks)services/libs/data-access-layer/src/members/segments.ts
(1 hunks)services/libs/opensearch/src/apiClient.ts
(1 hunks)services/libs/opensearch/src/service/member.sync.service.ts
(5 hunks)
🚧 Files skipped from review as they are similar to previous changes (6)
- backend/src/database/migrations/V1732118484__members-segments-agg-created-at.sql
- services/apps/entity_merging_worker/src/activities/organizations.ts
- services/apps/search_sync_api/src/routes/member.ts
- services/libs/data-access-layer/src/members/segments.ts
- services/libs/opensearch/src/apiClient.ts
- services/libs/opensearch/src/service/member.sync.service.ts
🔇 Additional comments (2)
services/apps/entity_merging_worker/src/workflows/all.ts (2)
24-24
: Verify impact of increased timeout on other activities
While the increased timeout addresses the organization merging issues, this change affects all proxy activities. We should ensure this doesn't mask potential performance issues in other operations.
Consider splitting the proxy activities into groups with different timeouts based on their expected duration, rather than using a single timeout for all activities.
102-103
: Consider potential race conditions with syncStart timing
The syncStart
timestamp is created immediately before the sync operation. However, there could be a slight delay between timestamp creation and the actual start of synchronization, potentially missing updates that occur in this window.
Consider creating the timestamp at the start of the workflow and passing it through, or implementing a small buffer (e.g., subtract a few seconds from syncStart) to ensure no updates are missed.
✅ Verification successful
Let me check how syncStart
is used in syncOrganization
to better understand the timing implications.
Let me check how the syncStart
timestamp is used in the member sync operation since that's where timing would be critical.
Let me check the implementation of the sync API endpoint to understand how the syncFrom
parameter is used in filtering members.
Let me check the implementation of memberSyncService.syncOrganizationMembers
to understand how the syncFrom
parameter is used to filter members.
Based on the implementation, I can now provide a final response about the race condition concern:
Race condition with syncStart is properly handled
The implementation shows that the syncStart
timestamp is correctly used as a filter in the member sync process. The sync service checks if a member was already synced after the provided syncStart
timestamp and skips re-syncing in such cases (lines 377-386 in member.sync.service.ts). This prevents any race conditions as:
- The timestamp is passed through to the actual sync operation
- The sync service explicitly checks the last sync date against the provided timestamp
- Members synced after the provided timestamp are intentionally skipped
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Search for other uses of syncStart to understand timing implications
rg -A 5 "syncStart.*=.*new Date\(\)"
Length of output: 1250
Script:
#!/bin/bash
# Search for syncOrganization implementation to understand how syncStart is used
ast-grep --pattern 'function syncOrganization($_,$_) {
$$$
}'
# Also search for any direct references to understand the context
rg -A 10 "function syncOrganization"
Length of output: 4057
Script:
#!/bin/bash
# Search for triggerOrganizationMembersSync implementation or usage to understand how syncStart affects member syncing
rg -A 10 "triggerOrganizationMembersSync"
# Also check for any SearchSyncApiClient related files that might contain the implementation
fd SearchSyncApiClient
Length of output: 4496
Script:
#!/bin/bash
# Search for the sync API endpoint implementation
rg -A 10 "/sync/organization/members"
# Also look for any member sync related implementations
ast-grep --pattern 'class $_ {
$$$
sync($$$) {
$$$
}
$$$
}'
Length of output: 1837
Script:
#!/bin/bash
# Search for memberSyncService implementation
rg -A 15 "syncOrganizationMembers.*\("
# Also look for any member sync related service implementations
ast-grep --pattern 'class MemberSyncService {
$$$
}'
Length of output: 65334
const syncStart = new Date() | ||
await syncOrganization(primaryId, syncStart) | ||
await syncOrganization(secondaryId, syncStart) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Evaluate shared syncStart timestamp for sequential syncs
Both organizations use the same syncStart
timestamp, but the syncs are performed sequentially. This means the second sync (secondaryId
) might miss updates that occurred during the first sync (primaryId
).
Consider this alternative implementation:
- const syncStart = new Date()
- await syncOrganization(primaryId, syncStart)
- await syncOrganization(secondaryId, syncStart)
+ const primarySyncStart = new Date()
+ await syncOrganization(primaryId, primarySyncStart)
+ const secondarySyncStart = new Date()
+ await syncOrganization(secondaryId, secondarySyncStart)
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
const syncStart = new Date() | |
await syncOrganization(primaryId, syncStart) | |
await syncOrganization(secondaryId, syncStart) | |
const primarySyncStart = new Date() | |
await syncOrganization(primaryId, primarySyncStart) | |
const secondarySyncStart = new Date() | |
await syncOrganization(secondaryId, secondarySyncStart) |
if (client instanceof SearchSyncApiClient || client instanceof SearchSyncWorkerEmitter) { | ||
await this.logExecutionTime( | ||
() => client.triggerOrganizationMembersSync(tenantId, organizationId, false), | ||
() => client.triggerOrganizationMembersSync(tenantId, organizationId), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
💡 Codebase verification
The parameter removal needs to be reverted
Based on the codebase analysis, the triggerOrganizationMembersSync
method in the merging worker is passing a syncStart
parameter which is being removed in this change. This parameter is important for the PR's objective of avoiding re-syncing already synced members.
services/apps/entity_merging_worker/src/activities/organizations.ts
: UsessyncStart
parameter- The removal of this parameter will break the merging worker's functionality
🔗 Analysis chain
Verify the removal of the parameter.
The removal of a parameter from triggerOrganizationMembersSync
call needs verification to ensure it doesn't break existing functionality.
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Search for other implementations/usages of triggerOrganizationMembersSync
# to verify the parameter removal is consistent
# Check the interface/class definition
ast-grep --pattern 'interface SearchSyncWorkerEmitter {
$$$
triggerOrganizationMembersSync($$$): $$$
$$$
}'
# Check the implementation
ast-grep --pattern 'class $_ implements SearchSyncWorkerEmitter {
$$$
triggerOrganizationMembersSync($$$) {
$$$
}
$$$
}'
Length of output: 250
Script:
#!/bin/bash
# Let's try a different approach using ripgrep to find all occurrences
# of triggerOrganizationMembersSync in the codebase
# Search for method definition and usages
rg "triggerOrganizationMembersSync" -A 3 -B 3
# Also search for the interface name to ensure we don't miss any definitions
rg "SearchSyncWorkerEmitter" -A 5 -B 5
Length of output: 51910
Script:
#!/bin/bash
# Let's check the implementation of triggerOrganizationMembersSync in the API client
rg "triggerOrganizationMembersSync.*\{" -A 10 "services/libs/opensearch/src/apiClient.ts"
# Also check the implementation in the worker emitter
rg "triggerOrganizationMembersSync.*\{" -A 10 "services/libs/common_services/src/services/emitters/searchSyncWorker.emitter.ts"
# Check the usage in the merging worker where syncStart is passed
rg "triggerOrganizationMembersSync.*syncStart" -A 3 -B 3
Length of output: 964
Script:
#!/bin/bash
# Let's try a more specific pattern to find the method implementations
rg "public.*triggerOrganizationMembersSync.*\{" -A 10
# Also check for the interface definition
rg "interface.*\{" -A 15 -B 2 "services/libs/opensearch/src/apiClient.ts"
Length of output: 501
2358940
to
da75c54
Compare
So that if there are too many members, and the sync fails, we don't retry syncing all several hundred members again and again, never actually finishing all of them
We'll get to it whenever we calculate aggregates anyway. This query only takes a lot of unnecessary time and tends to timeout
da75c54
to
27f5575
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 2
🧹 Outside diff range and nitpick comments (2)
services/libs/opensearch/src/service/member.sync.service.ts (1)
383-392
: LGTM! Efficient skip logic implementationThe implementation correctly prevents unnecessary re-syncing of member aggregates by comparing the last sync date with the provided syncFrom date.
Consider extracting the date comparison to a reusable helper function for better maintainability:
-if (lastSyncDate && lastSyncDate.getTime() > opts.syncFrom.getTime()) { +const isMoreRecent = (date1: Date, date2: Date): boolean => date1.getTime() > date2.getTime(); +if (lastSyncDate && isMoreRecent(lastSyncDate, opts.syncFrom)) {services/libs/opensearch/src/repo/member.repo.ts (1)
Line range hint
97-219
: Optimize the Complex SQL Query ingetMemberData
MethodThe SQL query in the
getMemberData
method utilizes multiple Common Table Expressions (CTEs) and aggregates, which may impact performance, especially with large datasets.Consider the following optimizations:
- Indexing: Ensure that the database tables involved have appropriate indexes on the columns used in JOIN and WHERE clauses to improve query execution time.
- Query Simplification: Evaluate if all CTEs and joins are necessary for the required data. If certain data can be fetched separately or is not critical, consider removing or deferring those parts of the query.
- Data Caching: Frequently accessed data that doesn't change often can be cached to reduce database load.
- Pagination: If applicable, implement pagination to limit the amount of data processed and returned.
These optimizations can enhance the performance and scalability of the application.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
📒 Files selected for processing (2)
services/libs/opensearch/src/repo/member.repo.ts
(4 hunks)services/libs/opensearch/src/service/member.sync.service.ts
(4 hunks)
🔇 Additional comments (4)
services/libs/opensearch/src/service/member.sync.service.ts (2)
13-13
: LGTM!
The addition of findLastSyncDate
import aligns with the new functionality for tracking sync dates.
318-318
: Performance improvement: Increased batch size
Increasing the batch size from 200 to 500 will help process large organizations more efficiently, reducing the number of database round trips.
services/libs/opensearch/src/repo/member.repo.ts (2)
265-265
:
Fix Missing FROM
Clause in SQL Query
In the checkMembersExists
method, the SQL query is missing the FROM
clause, which will result in a syntax error during execution.
Apply this diff to include the missing FROM
clause:
return await this.db().any(
`
- select m.id as "memberId", m."manuallyCreated"
+ select m.id as "memberId", m."manuallyCreated"
+ from members m
where m."tenantId" = $(tenantId) and
m.id in ($(memberIds:csv)) and
exists(select 1 from "memberIdentities" mi where mi."memberId" = m.id)
`,
{
tenantId,
memberIds,
},
);
Likely invalid or redundant comment.
67-68
: Ensure All Calls to getOrganizationMembersForSync
Include the New syncFrom
Parameter
The method getOrganizationMembersForSync
now accepts an optional syncFrom
parameter. Please verify that all calls to this method throughout the codebase are updated to include this parameter where necessary to prevent unexpected behavior or errors.
Run the following script to find all usages of getOrganizationMembersForSync
and check if they handle the syncFrom
parameter:
✅ Verification successful
Let me check the implementation and usage context more thoroughly.
Let me check the full context of the sync service to understand how the syncFrom
parameter is handled.
The syncFrom
Parameter is Correctly Handled - No Changes Required
The syncFrom
parameter is correctly propagated in the only place where getOrganizationMembersForSync
is called. In member.sync.service.ts
, the method is called within syncOrganizationMembers
where opts.syncFrom
is properly passed through to getOrganizationMembersForSync
. The parameter is optional by design and the implementation correctly handles both cases where it's provided and where it's not.
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Description: Locate all calls to `getOrganizationMembersForSync` and display their contexts.
rg 'getOrganizationMembersForSync\(' -A 2
Length of output: 576
Script:
#!/bin/bash
# Get the implementation and usage with more context
rg 'getOrganizationMembersForSync' -B 5 -A 10
Length of output: 3168
Script:
#!/bin/bash
# Get the full context of member.sync.service.ts to understand how opts.syncFrom is initialized
cat services/libs/opensearch/src/service/member.sync.service.ts
Length of output: 20547
public async syncOrganizationMembers( | ||
organizationId: string, | ||
opts: { syncFrom: Date | null } = { syncFrom: null }, | ||
): Promise<void> { | ||
this.log.debug({ organizationId }, 'Syncing all organization members!') | ||
const batchSize = 500 | ||
let docCount = 0 | ||
let memberCount = 0 | ||
|
||
const now = new Date() | ||
|
||
const loadNextPage = async (lastId?: string): Promise<string[]> => { | ||
this.log.info('Loading next page of organization members!', { organizationId, lastId }) | ||
const memberIdData = await logExecutionTimeV2( | ||
() => this.memberRepo.getOrganizationMembersForSync(organizationId, batchSize, lastId), | ||
const memberIds = await logExecutionTimeV2( | ||
() => | ||
this.memberRepo.getOrganizationMembersForSync( | ||
organizationId, | ||
batchSize, | ||
lastId, | ||
opts.syncFrom, | ||
), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
💡 Codebase verification
Based on the search results, I can now generate the final response:
Update required for search_sync_worker's syncOrganizationMembers call
The syncFrom
parameter is correctly propagated in the search_sync_api implementation, but the search_sync_worker's call at services/apps/search_sync_worker/src/queue/index.ts
needs to be updated to include the new syncFrom
parameter:
services/apps/search_sync_worker/src/queue/index.ts
: Update.syncOrganizationMembers(data.organizationId)
to include the options parameter
Note: The syncOrganizationMembers
implementation in integration_sync_worker is a different method with different parameters, so it's not affected by this change.
🔗 Analysis chain
Verify the propagation of syncFrom parameter
The syncFrom parameter is correctly propagated to downstream calls. Let's verify there are no other callers that need to be updated.
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Search for direct calls to getOrganizationMembersForSync to ensure all callers pass the syncFrom parameter
rg "getOrganizationMembersForSync" -A 5 -B 5
# Search for calls to syncOrganizationMembers to verify they're updated to use the new parameter
rg "syncOrganizationMembers" -A 5 -B 5
Length of output: 11134
const rows = await this.db().any( | ||
` | ||
select distinct mo."memberId", m."manuallyCreated" | ||
from "memberOrganizations" mo | ||
inner join members m on mo."memberId" = m.id | ||
where mo."organizationId" = $(organizationId) and | ||
mo."deletedAt" is null and | ||
${lastId !== undefined ? 'mo."memberId" > $(lastId) and' : ''} | ||
m."deletedAt" is null and | ||
SELECT | ||
DISTINCT mo."memberId" | ||
FROM "memberOrganizations" mo | ||
INNER JOIN members m ON mo."memberId" = m.id | ||
${syncFrom !== undefined ? 'LEFT JOIN "memberSegmentsAgg" msa ON m.id = msa."memberId"' : ''} | ||
WHERE mo."organizationId" = $(organizationId) AND | ||
mo."deletedAt" is null AND | ||
${syncFrom !== undefined ? '(msa."createdAt" < $(syncFrom) OR msa."createdAt" IS NULL) AND' : ''} | ||
${lastId !== undefined ? 'mo."memberId" > $(lastId) AND' : ''} | ||
m."deletedAt" is null AND | ||
exists (select 1 from "memberIdentities" where "memberId" = mo."memberId") | ||
order by mo."memberId" | ||
limit ${perPage};`, | ||
ORDER BY mo."memberId" | ||
LIMIT ${perPage}; | ||
`, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
Avoid SQL Injection Risks with Safe Query Construction
The SQL query is conditionally constructed using template literals and string interpolation, which can introduce SQL injection risks if not handled carefully. Even though parameters are used, it's safer to use query builders or parameterized query methods to construct SQL queries dynamically.
Consider refactoring the query to build the conditional clauses securely. Here's an example using parameterized conditions:
const conditions = [
'mo."organizationId" = $(organizationId)',
'mo."deletedAt" IS NULL',
'm."deletedAt" IS NULL',
'EXISTS (SELECT 1 FROM "memberIdentities" WHERE "memberId" = mo."memberId")',
];
if (syncFrom !== undefined) {
conditions.push('(msa."createdAt" < $(syncFrom) OR msa."createdAt" IS NULL)');
}
if (lastId !== undefined) {
conditions.push('mo."memberId" > $(lastId)');
}
const query = `
SELECT DISTINCT mo."memberId"
FROM "memberOrganizations" mo
INNER JOIN members m ON mo."memberId" = m.id
${syncFrom !== undefined ? 'LEFT JOIN "memberSegmentsAgg" msa ON m.id = msa."memberId"' : ''}
WHERE ${conditions.join(' AND ')}
ORDER BY mo."memberId"
LIMIT ${perPage};
`;
const rows = await this.db().any(query, {
organizationId,
syncFrom,
lastId,
});
This approach helps prevent SQL injection and enhances the readability and maintainability of your code.
This PR fixes merging for orgs that have a lot of members.
The actual problem with them is this:
finishOrgMerging
temporal workflow callssearchSyncApi.syncOrgMembers(orgId)
search-sync-api
finds all org members, and sync them one by onefinishOrgMerging
temporal workflow has a limit of 15 minutes for activity executionHow it's fixed:
updatedAt
of rows inmemberSegmentsAgg
finishOrgMerging
temporal workflow from 15 minutes to 1 hourSummary by CodeRabbit
Release Notes
New Features
createdAt
timestamp column to track record creation in the member segments.syncFrom
parameter for improved synchronization control in organization member syncing.Bug Fixes
Documentation