Activities sync fixes #2685
Conversation
Walkthrough

The pull request introduces several modifications across multiple files, primarily focusing on enhancing error handling and logging mechanisms. Key changes include a retry-with-backoff utility, conflict handling on raw activity inserts, a custom error class for suppressed activities, and additional query logging.
Actionable comments posted: 2
🧹 Outside diff range and nitpick comments (8)
backend/src/utils/backoff.ts (2)
4-15: Consider enhancing error handling and logging.

The retry logic could be improved in the following ways:
- Add logging to track retry attempts and errors
- Preserve the last error for better debugging
- Add type checking for the caught error
Consider applying this improvement:
```diff
 export async function retryBackoff<T>(fn: () => Promise<T>, maxRetries: number = 3): Promise<T> {
   let retries = 0
+  let lastError: unknown

   while (retries < maxRetries) {
     try {
       return await fn()
     } catch (error) {
+      lastError = error
       retries++
+      console.warn(
+        `Attempt ${retries}/${maxRetries} failed:`,
+        error instanceof Error ? error.message : 'Unknown error'
+      )

       // Exponential backoff with base of 2 seconds
       // 1st retry: 2s, 2nd: 4s, 3rd: 8s, etc
       const backoffMs = 2000 * 2 ** (retries - 1)
       await timeout(backoffMs)
     }
   }

-  throw new Error('Max retries reached')
+  throw new Error(
+    `Max retries (${maxRetries}) reached. Last error: ${
+      lastError instanceof Error ? lastError.message : 'Unknown error'
+    }`
+  )
 }
```
12-13: Consider adding jitter to prevent thundering herd.

The exponential backoff implementation could benefit from adding jitter to prevent multiple retries from different processes synchronizing and causing load spikes.
Consider applying this improvement:
```diff
-      const backoffMs = 2000 * 2 ** (retries - 1)
+      const baseDelay = 2000 * 2 ** (retries - 1)
+      const jitter = Math.random() * 1000 // Random delay between 0-1000ms
+      const backoffMs = baseDelay + jitter
       await timeout(backoffMs)
```

services/libs/data-access-layer/src/old/apps/data_sink_worker/repo/requestedForErasureMemberIdentities.repo.ts (1)
32-34: LGTM! Consider adding debug logging for skipped identities.

The validation check is a good defensive programming practice that prevents unnecessary database queries and potential SQL errors.
Consider adding debug logging to track skipped identities:
```diff
     if (!identity.value) {
+      this.log.debug('Skipping erasure check for identity with empty value', { type: identity.type, platform: identity.platform })
       return null
     }
```
backend/src/bin/jobs/syncActivities.ts (2)
36-44: LGTM: Well-structured function signature refactoring

The refactoring to use object destructuring improves maintainability and makes the function parameters self-documenting. The addition of the logger parameter enables better error tracking.
Consider adding JSDoc documentation to describe the function's purpose and return type:
```diff
+/**
+ * Synchronizes a batch of activities by inserting new ones and updating existing ones.
+ * @returns {Promise<{inserted: number, updated: number}>} Count of inserted and updated activities
+ */
 async function syncActivitiesBatch({
```
110-120: Consider adding monitoring and configurable batch size

For a critical data synchronization job, consider these architectural improvements:
- Make the batch size configurable through environment variables
- Add metrics for monitoring sync performance and failure rates
- Consider implementing a dead-letter queue for failed activities (a rough sketch follows the configuration example below)
Example configuration:
```typescript
const BATCH_SIZE = process.env.SYNC_BATCH_SIZE ? parseInt(process.env.SYNC_BATCH_SIZE, 10) : 1000;
```

This would allow for runtime tuning of the batch size based on system performance and load.
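For the dead-letter idea mentioned above, a minimal sketch could collect failed activities during the batch and report (or re-queue) them at the end. This is only an illustration under assumptions; the `FailedActivity` type, `syncOne`, and `reportFailures` names are hypothetical and not part of the PR:

```typescript
// Rough sketch only: collect failures during the batch so they can be retried or
// reported later instead of being silently dropped.
interface FailedActivity {
  activityId: string
  error: string
}

const failedActivities: FailedActivity[] = []

async function syncOne(activityId: string, syncFn: () => Promise<void>): Promise<void> {
  try {
    await syncFn()
  } catch (error) {
    // Keep the failure instead of only logging it.
    failedActivities.push({
      activityId,
      error: error instanceof Error ? error.message : String(error),
    })
  }
}

// After the batch, failures could be pushed to a dedicated queue or surfaced as a metric
// (both destinations are assumptions here).
function reportFailures(log: { warn: (msg: string, meta?: unknown) => void }): void {
  if (failedActivities.length > 0) {
    log.warn('Activities failed to sync in this batch', { count: failedActivities.length })
  }
}
```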
services/libs/data-access-layer/src/activities/sql.ts (2)
555-555: Consider enhancing the log message with query parameters.

While logging the query is helpful, including the query parameters (with sensitive data masked) would make debugging easier by providing the complete context of the query execution.
```diff
-    logger.info('QuestDB activity query', query)
+    logger.info('QuestDB activity query', {
+      query,
+      params: {
+        ...params,
+        tenantId: '[MASKED]', // Mask sensitive data
+        segmentIds: '[MASKED]'
+      }
+    })
```
Line range hint 555-561: Consider adding error logging for failed queries.

The Promise.all execution of queries could benefit from error logging to help diagnose issues in production.
```diff
-  const [results, countResults] = await Promise.all([
-    qdbConn.any(query, params),
-    qdbConn.query(countQuery, params),
-  ])
+  try {
+    const [results, countResults] = await Promise.all([
+      qdbConn.any(query, params),
+      qdbConn.query(countQuery, params),
+    ])
+  } catch (error) {
+    logger.error('Failed to execute activity queries', {
+      error,
+      query,
+      countQuery,
+      params: {
+        ...params,
+        tenantId: '[MASKED]',
+        segmentIds: '[MASKED]'
+      }
+    })
+    throw error
+  }
```

services/apps/data_sink_worker/src/service/activity.service.ts (1)
451-462: Consider extracting platform-specific error handling logic

The platform-specific error handling logic creates tight coupling between error handling and platform types. Consider extracting this into a separate method or configuration to improve maintainability.
```diff
-    if (platform === PlatformType.JIRA) {
-      throw new SuppressedActivityError(
-        `Activity's member does not have an identity for the platform: ${platform}!`,
-      )
-    } else {
-      this.log.error(
-        "Activity's member does not have an identity for the platform. Suppressing it!",
-      )
-      throw new Error(
-        `Activity's member does not have an identity for the platform: ${platform}!`,
-      )
-    }
+    const errorMessage = `Activity's member does not have an identity for the platform: ${platform}!`;
+    if (this.shouldSuppressIdentityError(platform)) {
+      throw new SuppressedActivityError(errorMessage);
+    }
+    this.log.error("Activity's member does not have an identity for the platform. Suppressing it!");
+    throw new Error(errorMessage);

 // Add this private method
+  private shouldSuppressIdentityError(platform: PlatformType): boolean {
+    return platform === PlatformType.JIRA;
+  }
```
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
📒 Files selected for processing (7)
- backend/src/bin/jobs/syncActivities.ts (5 hunks)
- backend/src/utils/backoff.ts (1 hunks)
- services/apps/data_sink_worker/src/queue/index.ts (0 hunks)
- services/apps/data_sink_worker/src/service/activity.service.ts (3 hunks)
- services/libs/data-access-layer/src/activities/sql.ts (3 hunks)
- services/libs/data-access-layer/src/old/apps/data_sink_worker/repo/activity.repo.ts (1 hunks)
- services/libs/data-access-layer/src/old/apps/data_sink_worker/repo/requestedForErasureMemberIdentities.repo.ts (1 hunks)
💤 Files with no reviewable changes (1)
- services/apps/data_sink_worker/src/queue/index.ts
🔇 Additional comments (9)
backend/src/utils/backoff.ts (3)
1-1: LGTM!
The import statement is clean and properly imports the required utility.
3-3: LGTM! Well-designed function signature.
The function signature is type-safe and flexible with the generic type parameter. The default value of 3 for maxRetries is a reasonable choice.
3-18: Verify error handling strategy for activity sync.

Since this utility is used in `syncActivitiesBatch`, we should verify that retrying is appropriate for all error cases and won't cause data inconsistencies.
Let's analyze the error handling in the activity sync implementation:
✅ Verification successful
Let me analyze the error handling in the activity sync implementation. Let me search for the specific usage of this retry mechanism.
Let me check the implementation of syncActivitiesBatch to understand the error handling better.
Based on the analysis of the code, I can now provide a final response:
Retry mechanism is appropriately implemented for activity sync
The `retryBackoff` utility is used correctly in the activity sync context:
- It's specifically used for QuestDB query operations which can have transient failures
- The activity sync implementation (`syncActivitiesBatch`) has its own error handling that logs errors per activity without failing the entire batch
- Failed activities are logged but don't prevent processing of other activities, ensuring resilience
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
```shell
#!/bin/bash
# Description: Check error handling in activity sync implementation
# Look for error types and handling patterns in the activity sync code

# Search for error handling patterns in the activity sync file
rg -A 5 "catch.*Error" "backend/src/bin/jobs/syncActivities.ts"

# Look for specific error types that might need different retry strategies
ast-grep --pattern 'throw new Error($$$)'
```
Length of output: 60162
Script:
```shell
#!/bin/bash
# Search for usage of retryBackoff in activity sync related files
rg -A 5 "retryBackoff" "backend/src/bin/jobs/syncActivities.ts"

# Look for activity sync error handling patterns
rg -A 10 "syncActivitiesBatch" "backend/src/bin/jobs/syncActivities.ts"
```
Length of output: 1198
Script:
```shell
#!/bin/bash
# Look for the implementation of syncActivitiesBatch and its error handling
ast-grep --pattern 'async function syncActivitiesBatch($$$) {
  $$$
}'

# Also check for any error handling in the activity repository
rg -A 5 "catch" "backend/src/database/repositories/activityRepository.ts"
```
Length of output: 2385
services/libs/data-access-layer/src/old/apps/data_sink_worker/repo/activity.repo.ts (1)
141-141: Verify the impact of silently ignoring conflicts

While adding `on conflict do nothing` aligns with sync improvements, silently ignoring conflicts could mask real errors or lead to data inconsistencies if not properly handled by the calling code.
Consider these architectural improvements:
- Return a boolean indicating whether the insert was successful
- Add logging when conflicts occur
- Consider using a `RETURNING` clause to get the existing record's ID on conflict (see the sketch after the suggested implementation below)
Here's a script to verify the usage context:
Suggested implementation:
```diff
-  public async rawInsert(data: IDbActivityCreateData): Promise<void> {
+  public async rawInsert(data: IDbActivityCreateData): Promise<boolean> {
     const prepared = RepositoryBase.prepare(data, this.insertActivityColumnSet)
     const query = this.dbInstance.helpers.insert(prepared, this.insertActivityColumnSet)
-    await this.db().none(`${query} on conflict do nothing`)
+    const result = await this.db().result(`${query} on conflict do nothing`)
+    return result.rowCount > 0
   }
```
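For the `RETURNING` idea above, note that with `on conflict do nothing` PostgreSQL returns no row when a conflict occurs, so the existing record's id is not directly available; a minimal sketch that at least surfaces whether a conflict happened could look like the following. The method name, the `this.log` usage, and the placement next to `rawInsert` in the same repository class are assumptions, not part of the PR:

```typescript
// Hypothetical variant of rawInsert: RETURNING yields a row only when the insert
// actually happened, so a null result signals that a conflicting row already existed.
public async rawInsertReturningId(data: IDbActivityCreateData): Promise<string | null> {
  const prepared = RepositoryBase.prepare(data, this.insertActivityColumnSet)
  const query = this.dbInstance.helpers.insert(prepared, this.insertActivityColumnSet)

  const row = await this.db().oneOrNone(`${query} on conflict do nothing returning "id"`)
  if (!row) {
    // Assumption: the repository exposes a child logger as this.log.
    this.log.debug('Activity insert skipped due to conflict', { id: data.id })
  }
  return row ? row.id : null
}
```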
✅ Verification successful
Based on the results, I can now generate the final response as we have sufficient context showing how `rawInsert` is used in the codebase.
Silently ignoring conflicts is appropriate in this context
The usage in `syncActivities.ts` shows that:
- The method is already wrapped in a try-catch block with proper error logging
- The success is tracked via `result.inserted++`
- The silent conflict handling aligns with the sync operation's needs, where duplicate activities should be skipped without breaking the sync
The suggested architectural changes are unnecessary as:
- Error handling exists at the caller level
- Success tracking is already implemented
- Logging of errors is properly handled in the sync job
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
```shell
#!/bin/bash
# Description: Check how rawInsert is used in the codebase and verify error handling

# Find direct usage of rawInsert
rg -A 5 "rawInsert\("

# Check for error handling patterns around the usage
ast-grep --pattern 'try {
  $$$
  rawInsert($$$)
  $$$
} catch'
```
Length of output: 1315
backend/src/bin/jobs/syncActivities.ts (2)
14-14: LGTM: Clean import of retry utility
The import is well-placed and follows the project's import organization pattern.
131-135: LGTM: Clean function call with new parameter structure
The function call correctly implements the new parameter structure and maintains clean destructuring of return values.
services/libs/data-access-layer/src/activities/sql.ts (1)
7-7: LGTM: Good addition of structured logging.

The introduction of structured logging using `getServiceChildLogger` is a good practice for better observability and debugging capabilities.
Also applies to: 387-388
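A minimal sketch of this pattern, assuming `getServiceChildLogger` is exported from the `@crowd/logging` package as used elsewhere in the repository (the child logger name and query text below are illustrative only):

```typescript
import { getServiceChildLogger } from '@crowd/logging'

// One module-level child logger so all queries in this file log under the same component.
const logger = getServiceChildLogger('activities/sql')

// Example usage around a query.
logger.info('QuestDB activity query', { query: 'SELECT count(*) FROM activities' })
```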
services/apps/data_sink_worker/src/service/activity.service.ts (2)
45-50: LGTM: Clean implementation of custom error class

The `SuppressedActivityError` class follows TypeScript best practices for custom error handling.
1151-1153: LGTM: Proper error handling differentiation
Good practice to prevent logging of suppressed errors while still propagating them up the stack.
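The implementation itself isn't quoted in this excerpt; a typical TypeScript shape for such a class, together with the kind of `instanceof` check that keeps suppressed errors out of the error log while still re-throwing them, might look like the following. Everything except the `SuppressedActivityError` name is an assumption:

```typescript
// A custom error type that callers can recognize and deliberately not log as an error.
class SuppressedActivityError extends Error {
  constructor(message: string) {
    super(message)
    this.name = 'SuppressedActivityError'
    // Keeps instanceof checks reliable when the build targets older ES versions.
    Object.setPrototypeOf(this, SuppressedActivityError.prototype)
  }
}

// Hypothetical caller: suppressed errors skip the error log but still propagate.
async function processActivitySafely(
  process: () => Promise<void>,
  log: { error: (err: unknown, msg: string) => void },
): Promise<void> {
  try {
    await process()
  } catch (err) {
    if (!(err instanceof SuppressedActivityError)) {
      log.error(err, 'Error while processing an activity!')
    }
    throw err
  }
}
```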
```typescript
      retryBackoff(() =>
        qdbQx.select(
          `
          SELECT *
          FROM activities
          WHERE "updatedAt" > $(updatedAt)
          ORDER BY "updatedAt"
          LIMIT 1000;
          `,
          { updatedAt },
        ),
```
🛠️ Refactor suggestion
Add explicit error handling for retry failures
The integration of retryBackoff is good for handling transient failures, but we should handle the case when all retries are exhausted.
Consider adding explicit error handling:
```diff
   retryBackoff(() =>
     qdbQx.select(
       `
       SELECT *
       FROM activities
       WHERE "updatedAt" > $(updatedAt)
       ORDER BY "updatedAt"
       LIMIT 1000;
       `,
       { updatedAt },
-    ),
+    )).catch(error => {
+      logger.error(`Failed to fetch activities after all retries: ${error}`);
+      throw new Error(`Activity fetch failed: ${error.message}`);
+    }),
```
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```typescript
      retryBackoff(() =>
        qdbQx.select(
          `
          SELECT *
          FROM activities
          WHERE "updatedAt" > $(updatedAt)
          ORDER BY "updatedAt"
          LIMIT 1000;
          `,
          { updatedAt },
        )).catch(error => {
          logger.error(`Failed to fetch activities after all retries: ${error}`);
          throw new Error(`Activity fetch failed: ${error.message}`);
        }),
```
```typescript
      try {
        if (existingActivity) {
          await activityRepo.rawUpdate(activity.id, {
            ...activity,
            platform: activity.platform as PlatformType,
          })
          result.updated++
        } else {
          await activityRepo.rawInsert(activity)
          result.inserted++
        }
      } catch (error) {
        logger.error(`Error syncing activity ${activity.id}: ${error}`)
```
🛠️ Refactor suggestion
Enhance error handling strategy
While adding error handling is good, the current implementation has some potential issues:
- Errors are logged but swallowed, which could hide systematic problems
- No specific error type handling for different failure scenarios
- No retry mechanism for transient failures
Consider implementing more robust error handling:
```diff
       try {
         if (existingActivity) {
           await activityRepo.rawUpdate(activity.id, {
             ...activity,
             platform: activity.platform as PlatformType,
           })
           result.updated++
         } else {
           await activityRepo.rawInsert(activity)
           result.inserted++
         }
       } catch (error) {
-        logger.error(`Error syncing activity ${activity.id}: ${error}`)
+        // Determine error type for specific handling
+        if (error instanceof DatabaseError) {
+          logger.error(`Database error syncing activity ${activity.id}: ${error}`)
+          // Consider retrying on specific database errors
+          if (error.code === '40001') { // Serialization failure
+            throw error; // Allow retry at a higher level
+          }
+        } else {
+          logger.error(`Unexpected error syncing activity ${activity.id}: ${error}`)
+          // Track failed activities for reporting
+          result.failed = (result.failed || 0) + 1
+        }
       }
```
Committable suggestion skipped: line range outside the PR's diff.
Summary by CodeRabbit

- New Features
- Bug Fixes
- Documentation
- Chores