Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 33 additions & 17 deletions backend/src/bin/jobs/syncActivities.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import { PlatformType } from '@crowd/types'
import { DB_CONFIG } from '@/conf'

import { CrowdJob } from '../../types/jobTypes'
import { retryBackoff } from '../../utils/backoff'

async function decideUpdatedAt(pgQx: QueryExecutor, maxUpdatedAt?: string): Promise<string> {
if (!maxUpdatedAt) {
Expand All @@ -32,10 +33,15 @@ function createWhereClause(updatedAt: string): string {
return formatQuery('"updatedAt" > $(updatedAt)', { updatedAt })
}

async function syncActivitiesBatch(
activityRepo: ActivityRepository,
activities: IDbActivityCreateData[],
) {
async function syncActivitiesBatch({
logger,
activityRepo,
activities,
}: {
logger: Logger
activityRepo: ActivityRepository
activities: IDbActivityCreateData[]
}) {
const result = {
inserted: 0,
updated: 0,
Expand All @@ -44,15 +50,19 @@ async function syncActivitiesBatch(
for (const activity of activities) {
const existingActivity = await activityRepo.existsWithId(activity.id)

if (existingActivity) {
await activityRepo.rawUpdate(activity.id, {
...activity,
platform: activity.platform as PlatformType,
})
result.updated++
} else {
await activityRepo.rawInsert(activity)
result.inserted++
try {
if (existingActivity) {
await activityRepo.rawUpdate(activity.id, {
...activity,
platform: activity.platform as PlatformType,
})
result.updated++
} else {
await activityRepo.rawInsert(activity)
result.inserted++
}
} catch (error) {
logger.error(`Error syncing activity ${activity.id}: ${error}`)
Comment on lines +53 to +65
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Enhance error handling strategy

While adding error handling is good, the current implementation has some potential issues:

  1. Errors are logged but swallowed, which could hide systematic problems
  2. No specific error type handling for different failure scenarios
  3. No retry mechanism for transient failures

Consider implementing more robust error handling:

 try {
   if (existingActivity) {
     await activityRepo.rawUpdate(activity.id, {
       ...activity,
       platform: activity.platform as PlatformType,
     })
     result.updated++
   } else {
     await activityRepo.rawInsert(activity)
     result.inserted++
   }
 } catch (error) {
-  logger.error(`Error syncing activity ${activity.id}: ${error}`)
+  // Determine error type for specific handling
+  if (error instanceof DatabaseError) {
+    logger.error(`Database error syncing activity ${activity.id}: ${error}`)
+    // Consider retrying on specific database errors
+    if (error.code === '40001') { // Serialization failure
+      throw error; // Allow retry at a higher level
+    }
+  } else {
+    logger.error(`Unexpected error syncing activity ${activity.id}: ${error}`)
+    // Track failed activities for reporting
+    result.failed = (result.failed || 0) + 1
+  }
 }

Committable suggestion skipped: line range outside the PR's diff.

}
}

Expand Down Expand Up @@ -97,15 +107,17 @@ export async function syncActivities(logger: Logger, maxUpdatedAt?: string) {
const result = await logExecutionTimeV2(
// eslint-disable-next-line @typescript-eslint/no-loop-func
() =>
qdbQx.select(
`
retryBackoff(() =>
qdbQx.select(
`
SELECT *
FROM activities
WHERE "updatedAt" > $(updatedAt)
ORDER BY "updatedAt"
LIMIT 1000;
`,
{ updatedAt },
{ updatedAt },
),
Comment on lines +110 to +120
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Add explicit error handling for retry failures

The integration of retryBackoff is good for handling transient failures, but we should handle the case when all retries are exhausted.

Consider adding explicit error handling:

 retryBackoff(() =>
   qdbQx.select(
     `
     SELECT *
     FROM activities
     WHERE "updatedAt" > $(updatedAt)
     ORDER BY "updatedAt"
     LIMIT 1000;
   `,
     { updatedAt },
-  ),
+  )).catch(error => {
+    logger.error(`Failed to fetch activities after all retries: ${error}`);
+    throw new Error(`Activity fetch failed: ${error.message}`);
+  }),
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
retryBackoff(() =>
qdbQx.select(
`
SELECT *
FROM activities
WHERE "updatedAt" > $(updatedAt)
ORDER BY "updatedAt"
LIMIT 1000;
`,
{ updatedAt },
{ updatedAt },
),
retryBackoff(() =>
qdbQx.select(
`
SELECT *
FROM activities
WHERE "updatedAt" > $(updatedAt)
ORDER BY "updatedAt"
LIMIT 1000;
`,
{ updatedAt },
)).catch(error => {
logger.error(`Failed to fetch activities after all retries: ${error}`);
throw new Error(`Activity fetch failed: ${error.message}`);
}),

),
logger,
`getting activities with updatedAt > ${updatedAt}`,
Expand All @@ -116,7 +128,11 @@ export async function syncActivities(logger: Logger, maxUpdatedAt?: string) {
}

const t = timer(logger)
const { inserted, updated } = await syncActivitiesBatch(activityRepo, result)
const { inserted, updated } = await syncActivitiesBatch({
logger,
activityRepo,
activities: result,
})
t.end(`Inserting ${inserted} and updating ${updated} activities`)

counter += inserted + updated
Expand Down
18 changes: 18 additions & 0 deletions backend/src/utils/backoff.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
import { timeout } from '@crowd/common'

/**
 * Runs `fn`, retrying on failure with exponential backoff.
 *
 * Delays double on each retry starting at 2 seconds (2s, 4s, 8s, ...).
 * No delay is added after the final failed attempt — the error is thrown
 * immediately. The last underlying error is preserved on the thrown
 * error's `cause` property so callers can diagnose what actually failed.
 *
 * @param fn the async operation to attempt
 * @param maxRetries total number of attempts (default 3)
 * @returns the resolved value of the first successful attempt
 * @throws Error('Max retries reached') with `cause` set to the last failure
 */
export async function retryBackoff<T>(fn: () => Promise<T>, maxRetries: number = 3): Promise<T> {
  let lastError: unknown

  for (let attempt = 1; attempt <= maxRetries; attempt++) {
    try {
      return await fn()
    } catch (error) {
      lastError = error
      // Only sleep when another attempt will actually follow; sleeping
      // after the last failure would just delay the throw below.
      if (attempt < maxRetries) {
        // Exponential backoff with base of 2 seconds: 2s, 4s, 8s, ...
        const backoffMs = 2000 * 2 ** (attempt - 1)
        await timeout(backoffMs)
      }
    }
  }

  // Keep the original message for callers matching on it, but attach the
  // last underlying error so it is not silently swallowed.
  const err = new Error('Max retries reached')
  ;(err as Error & { cause?: unknown }).cause = lastError
  throw err
}
25 changes: 0 additions & 25 deletions services/apps/data_sink_worker/src/queue/index.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
import { performance } from 'perf_hooks'

import { DataSinkWorkerEmitter, SearchSyncWorkerEmitter } from '@crowd/common_services'
import { DbConnection, DbStore } from '@crowd/data-access-layer/src/database'
import { Logger } from '@crowd/logging'
Expand All @@ -17,8 +15,6 @@ import {
import DataSinkService from '../service/dataSink.service'

export class WorkerQueueReceiver extends PrioritizedQueueReciever {
private readonly timingMap = new Map<string, { count: number; time: number }>()

constructor(
level: QueuePriorityLevel,
client: IQueue,
Expand All @@ -43,8 +39,6 @@ export class WorkerQueueReceiver extends PrioritizedQueueReciever {
}

override async processMessage(message: IQueueMessage): Promise<void> {
const startTime = performance.now()

try {
this.log.trace({ messageType: message.type }, 'Processing message!')

Expand Down Expand Up @@ -83,25 +77,6 @@ export class WorkerQueueReceiver extends PrioritizedQueueReciever {
} catch (err) {
this.log.error(err, 'Error while processing message!')
throw err
} finally {
const endTime = performance.now()
const duration = endTime - startTime
this.log.info({ msgType: message.type }, `Message processed in ${duration.toFixed(2)}ms!`)

if (this.timingMap.has(message.type)) {
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
const data = this.timingMap.get(message.type)!
this.timingMap.set(message.type, { count: data.count + 1, time: data.time + duration })
} else {
this.timingMap.set(message.type, { count: 1, time: duration })
}

const data = this.timingMap.get(message.type)

this.log.info(
{ msgType: message.type },
`Average processing time: ${data.time / data.count}ms!`,
)
}
}
}
27 changes: 22 additions & 5 deletions services/apps/data_sink_worker/src/service/activity.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,13 @@ import { IActivityCreateData, IActivityUpdateData, ISentimentActivityInput } fro
import MemberService from './member.service'
import MemberAffiliationService from './memberAffiliation.service'

/**
 * Error type for activities that are deliberately dropped ("suppressed")
 * rather than treated as real processing failures. Downstream code
 * distinguishes it with `instanceof SuppressedActivityError` to skip
 * error logging while still rethrowing.
 */
export class SuppressedActivityError extends Error {
  constructor(message: string) {
    super(message)
    // Restore the prototype chain so `instanceof` keeps working even when
    // the compiler downlevels class extension of built-ins (targets < ES2015).
    Object.setPrototypeOf(this, new.target.prototype)
    // Explicit name so logs and serialized errors identify the type.
    this.name = 'SuppressedActivityError'
  }
}

export default class ActivityService extends LoggerBase {
constructor(
private readonly pgStore: DbStore,
Expand Down Expand Up @@ -441,10 +448,18 @@ export default class ActivityService extends LoggerBase {
(i) => i.platform === platform && i.type === MemberIdentityType.USERNAME,
)
if (!identity) {
this.log.error("Activity's member does not have an identity for the platform.")
throw new Error(
`Activity's member does not have an identity for the platform: ${platform}!`,
)
if (platform === PlatformType.JIRA) {
throw new SuppressedActivityError(
`Activity's member does not have an identity for the platform: ${platform}!`,
)
} else {
this.log.error(
"Activity's member does not have an identity for the platform. Suppressing it!",
)
throw new Error(
`Activity's member does not have an identity for the platform: ${platform}!`,
)
}
}

username = identity.value
Expand Down Expand Up @@ -1133,7 +1148,9 @@ export default class ActivityService extends LoggerBase {
await this.redisClient.sAdd('organizationIdsForAggComputation', organizationId)
}
} catch (err) {
this.log.error(err, 'Error while processing an activity!')
if (!(err instanceof SuppressedActivityError)) {
this.log.error(err, 'Error while processing an activity!')
}
throw err
}
}
Expand Down
5 changes: 4 additions & 1 deletion services/libs/data-access-layer/src/activities/sql.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import merge from 'lodash.merge'
import { RawQueryParser, getEnv } from '@crowd/common'
import { DbConnOrTx } from '@crowd/database'
import { ActivityDisplayService } from '@crowd/integrations'
import { getServiceChildLogger } from '@crowd/logging'
import {
ActivityDisplayVariant,
IActivityBySentimentMoodResult,
Expand Down Expand Up @@ -383,6 +384,8 @@ export const ALL_COLUMNS_TO_SELECT: ActivityColumn[] = DEFAULT_COLUMNS_TO_SELECT
'gitIsMerge',
])

const logger = getServiceChildLogger('activities')

export async function queryActivities(
qdbConn: DbConnOrTx,
arg: IQueryActivitiesParameters,
Expand Down Expand Up @@ -549,7 +552,7 @@ export async function queryActivities(

query += ';'

console.log('QuestDB activity query', query)
logger.info('QuestDB activity query', query)

const [results, countResults] = await Promise.all([
qdbConn.any(query, params),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,6 @@ export default class ActivityRepository extends RepositoryBase<ActivityRepositor
public async rawInsert(data: IDbActivityCreateData): Promise<void> {
const prepared = RepositoryBase.prepare(data, this.insertActivityColumnSet)
const query = this.dbInstance.helpers.insert(prepared, this.insertActivityColumnSet)
await this.db().none(query)
await this.db().none(`${query} on conflict do nothing`)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@ export default class RequestedForErasureMemberIdentitiesRepository extends Repos
private async wasIdentityRequestedForErasure(
identity: IMemberIdentity,
): Promise<{ id: string } | null> {
if (!identity.value) {
return null
}

// eslint-disable-next-line @typescript-eslint/no-explicit-any
const params: any = {
type: identity.type,
Expand Down
Loading