Skip to content

Commit f53ca10

Browse files
Refactor user permission syncing to happen on the Account level
1 parent 26ec7af commit f53ca10

File tree

7 files changed

+213
-143
lines changed

7 files changed

+213
-143
lines changed
Lines changed: 102 additions & 105 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import * as Sentry from "@sentry/node";
2-
import { PrismaClient, User, UserPermissionSyncJobStatus } from "@sourcebot/db";
2+
import { PrismaClient, AccountPermissionSyncJobStatus, Account } from "@sourcebot/db";
33
import { createLogger } from "@sourcebot/logger";
44
import { Job, Queue, Worker } from "bullmq";
55
import { Redis } from "ioredis";
@@ -14,27 +14,26 @@ const LOG_TAG = 'user-permission-syncer';
1414
const logger = createLogger(LOG_TAG);
1515
const createJobLogger = (jobId: string) => createLogger(`${LOG_TAG}:job:${jobId}`);
1616

17-
const QUEUE_NAME = 'userPermissionSyncQueue';
17+
const QUEUE_NAME = 'accountPermissionSyncQueue';
1818

19-
type UserPermissionSyncJob = {
19+
type AccountPermissionSyncJob = {
2020
jobId: string;
2121
}
2222

23-
24-
export class UserPermissionSyncer {
25-
private queue: Queue<UserPermissionSyncJob>;
26-
private worker: Worker<UserPermissionSyncJob>;
23+
export class AccountPermissionSyncer {
24+
private queue: Queue<AccountPermissionSyncJob>;
25+
private worker: Worker<AccountPermissionSyncJob>;
2726
private interval?: NodeJS.Timeout;
2827

2928
constructor(
3029
private db: PrismaClient,
3130
private settings: Settings,
3231
redis: Redis,
3332
) {
34-
this.queue = new Queue<UserPermissionSyncJob>(QUEUE_NAME, {
33+
this.queue = new Queue<AccountPermissionSyncJob>(QUEUE_NAME, {
3534
connection: redis,
3635
});
37-
this.worker = new Worker<UserPermissionSyncJob>(QUEUE_NAME, this.runJob.bind(this), {
36+
this.worker = new Worker<AccountPermissionSyncJob>(QUEUE_NAME, this.runJob.bind(this), {
3837
connection: redis,
3938
concurrency: 1,
4039
});
@@ -52,16 +51,12 @@ export class UserPermissionSyncer {
5251
this.interval = setInterval(async () => {
5352
const thresholdDate = new Date(Date.now() - this.settings.experiment_userDrivenPermissionSyncIntervalMs);
5453

55-
const users = await this.db.user.findMany({
54+
const accounts = await this.db.account.findMany({
5655
where: {
5756
AND: [
5857
{
59-
accounts: {
60-
some: {
61-
provider: {
62-
in: PERMISSION_SYNC_SUPPORTED_CODE_HOST_TYPES
63-
}
64-
}
58+
provider: {
59+
in: PERMISSION_SYNC_SUPPORTED_CODE_HOST_TYPES
6560
}
6661
},
6762
{
@@ -79,15 +74,15 @@ export class UserPermissionSyncer {
7974
{
8075
status: {
8176
in: [
82-
UserPermissionSyncJobStatus.PENDING,
83-
UserPermissionSyncJobStatus.IN_PROGRESS,
77+
AccountPermissionSyncJobStatus.PENDING,
78+
AccountPermissionSyncJobStatus.IN_PROGRESS,
8479
],
8580
}
8681
},
8782
// Don't schedule if there are recent failed jobs (within the threshold date). Note `gt` is used here since this is a inverse condition.
8883
{
8984
AND: [
90-
{ status: UserPermissionSyncJobStatus.FAILED },
85+
{ status: AccountPermissionSyncJobStatus.FAILED },
9186
{ completedAt: { gt: thresholdDate } },
9287
]
9388
}
@@ -100,7 +95,7 @@ export class UserPermissionSyncer {
10095
}
10196
});
10297

103-
await this.schedulePermissionSync(users);
98+
await this.schedulePermissionSync(accounts);
10499
}, 1000 * 5);
105100
}
106101

@@ -112,18 +107,18 @@ export class UserPermissionSyncer {
112107
await this.queue.close();
113108
}
114109

115-
private async schedulePermissionSync(users: User[]) {
110+
private async schedulePermissionSync(accounts: Account[]) {
116111
// @note: we don't perform this in a transaction because
117112
// we want to avoid the situation where a job is created and run
118113
// prior to the transaction being committed.
119-
const jobs = await this.db.userPermissionSyncJob.createManyAndReturn({
120-
data: users.map(user => ({
121-
userId: user.id,
114+
const jobs = await this.db.accountPermissionSyncJob.createManyAndReturn({
115+
data: accounts.map(account => ({
116+
accountId: account.id,
122117
})),
123118
});
124119

125120
await this.queue.addBulk(jobs.map((job) => ({
126-
name: 'userPermissionSyncJob',
121+
name: 'accountPermissionSyncJob',
127122
data: {
128123
jobId: job.id,
129124
},
@@ -134,145 +129,143 @@ export class UserPermissionSyncer {
134129
})))
135130
}
136131

137-
private async runJob(job: Job<UserPermissionSyncJob>) {
132+
private async runJob(job: Job<AccountPermissionSyncJob>) {
138133
const id = job.data.jobId;
139134
const logger = createJobLogger(id);
140135

141-
const { user } = await this.db.userPermissionSyncJob.update({
136+
const { account } = await this.db.accountPermissionSyncJob.update({
142137
where: {
143138
id,
144139
},
145140
data: {
146-
status: UserPermissionSyncJobStatus.IN_PROGRESS,
141+
status: AccountPermissionSyncJobStatus.IN_PROGRESS,
147142
},
148143
select: {
149-
user: {
144+
account: {
150145
include: {
151-
accounts: true,
146+
user: true,
152147
}
153148
}
154149
}
155150
});
156151

157-
if (!user) {
158-
throw new Error(`User ${id} not found`);
159-
}
160-
161-
logger.info(`Syncing permissions for user ${user.email}...`);
152+
logger.info(`Syncing permissions for ${account.provider} account (id: ${account.id}) for user ${account.user.email}...`);
162153

163154
// Get a list of all repos that the user has access to from all connected accounts.
164155
const repoIds = await (async () => {
165156
const aggregatedRepoIds: Set<number> = new Set();
166157

167-
for (const account of user.accounts) {
168-
if (account.provider === 'github') {
169-
if (!account.access_token) {
170-
throw new Error(`User '${user.email}' does not have an GitHub OAuth access token associated with their GitHub account.`);
171-
}
158+
if (account.provider === 'github') {
159+
if (!account.access_token) {
160+
throw new Error(`User '${account.user.email}' does not have an GitHub OAuth access token associated with their GitHub account.`);
161+
}
172162

173-
const { octokit } = await createOctokitFromToken({
174-
token: account.access_token,
175-
url: env.AUTH_EE_GITHUB_BASE_URL,
176-
});
177-
// @note: we only care about the private repos since we don't need to build a mapping
178-
// for public repos.
179-
// @see: packages/web/src/prisma.ts
180-
const githubRepos = await getReposForAuthenticatedUser(/* visibility = */ 'private', octokit);
181-
const gitHubRepoIds = githubRepos.map(repo => repo.id.toString());
182-
183-
const repos = await this.db.repo.findMany({
184-
where: {
185-
external_codeHostType: 'github',
186-
external_id: {
187-
in: gitHubRepoIds,
188-
}
163+
const { octokit } = await createOctokitFromToken({
164+
token: account.access_token,
165+
url: env.AUTH_EE_GITHUB_BASE_URL,
166+
});
167+
// @note: we only care about the private repos since we don't need to build a mapping
168+
// for public repos.
169+
// @see: packages/web/src/prisma.ts
170+
const githubRepos = await getReposForAuthenticatedUser(/* visibility = */ 'private', octokit);
171+
const gitHubRepoIds = githubRepos.map(repo => repo.id.toString());
172+
173+
const repos = await this.db.repo.findMany({
174+
where: {
175+
external_codeHostType: 'github',
176+
external_id: {
177+
in: gitHubRepoIds,
189178
}
190-
});
191-
192-
repos.forEach(repo => aggregatedRepoIds.add(repo.id));
193-
} else if (account.provider === 'gitlab') {
194-
if (!account.access_token) {
195-
throw new Error(`User '${user.email}' does not have a GitLab OAuth access token associated with their GitLab account.`);
196179
}
180+
});
197181

198-
const api = await createGitLabFromOAuthToken({
199-
oauthToken: account.access_token,
200-
url: env.AUTH_EE_GITLAB_BASE_URL,
201-
});
202-
203-
// @note: we only care about the private and internal repos since we don't need to build a mapping
204-
// for public repos.
205-
// @see: packages/web/src/prisma.ts
206-
const privateGitLabProjects = await getProjectsForAuthenticatedUser('private', api);
207-
const internalGitLabProjects = await getProjectsForAuthenticatedUser('internal', api);
208-
209-
const gitLabProjectIds = [
210-
...privateGitLabProjects,
211-
...internalGitLabProjects,
212-
].map(project => project.id.toString());
213-
214-
const repos = await this.db.repo.findMany({
215-
where: {
216-
external_codeHostType: 'gitlab',
217-
external_id: {
218-
in: gitLabProjectIds,
219-
}
182+
repos.forEach(repo => aggregatedRepoIds.add(repo.id));
183+
} else if (account.provider === 'gitlab') {
184+
if (!account.access_token) {
185+
throw new Error(`User '${account.user.email}' does not have a GitLab OAuth access token associated with their GitLab account.`);
186+
}
187+
188+
const api = await createGitLabFromOAuthToken({
189+
oauthToken: account.access_token,
190+
url: env.AUTH_EE_GITLAB_BASE_URL,
191+
});
192+
193+
// @note: we only care about the private and internal repos since we don't need to build a mapping
194+
// for public repos.
195+
// @see: packages/web/src/prisma.ts
196+
const privateGitLabProjects = await getProjectsForAuthenticatedUser('private', api);
197+
const internalGitLabProjects = await getProjectsForAuthenticatedUser('internal', api);
198+
199+
const gitLabProjectIds = [
200+
...privateGitLabProjects,
201+
...internalGitLabProjects,
202+
].map(project => project.id.toString());
203+
204+
const repos = await this.db.repo.findMany({
205+
where: {
206+
external_codeHostType: 'gitlab',
207+
external_id: {
208+
in: gitLabProjectIds,
220209
}
221-
});
210+
}
211+
});
222212

223-
repos.forEach(repo => aggregatedRepoIds.add(repo.id));
224-
}
213+
repos.forEach(repo => aggregatedRepoIds.add(repo.id));
225214
}
226215

227216
return Array.from(aggregatedRepoIds);
228217
})();
229218

230219
await this.db.$transaction([
231-
this.db.user.update({
220+
this.db.account.update({
232221
where: {
233-
id: user.id,
222+
id: account.id,
234223
},
235224
data: {
236225
accessibleRepos: {
237226
deleteMany: {},
238227
}
239228
}
240229
}),
241-
this.db.userToRepoPermission.createMany({
230+
this.db.accountToRepoPermission.createMany({
242231
data: repoIds.map(repoId => ({
243-
userId: user.id,
232+
accountId: account.id,
244233
repoId,
245234
})),
246235
skipDuplicates: true,
247236
})
248237
]);
249238
}
250239

251-
private async onJobCompleted(job: Job<UserPermissionSyncJob>) {
240+
private async onJobCompleted(job: Job<AccountPermissionSyncJob>) {
252241
const logger = createJobLogger(job.data.jobId);
253242

254-
const { user } = await this.db.userPermissionSyncJob.update({
243+
const { account } = await this.db.accountPermissionSyncJob.update({
255244
where: {
256245
id: job.data.jobId,
257246
},
258247
data: {
259-
status: UserPermissionSyncJobStatus.COMPLETED,
260-
user: {
248+
status: AccountPermissionSyncJobStatus.COMPLETED,
249+
account: {
261250
update: {
262251
permissionSyncedAt: new Date(),
263-
}
252+
},
264253
},
265254
completedAt: new Date(),
266255
},
267256
select: {
268-
user: true
257+
account: {
258+
include: {
259+
user: true,
260+
}
261+
}
269262
}
270263
});
271264

272-
logger.info(`Permissions synced for user ${user.email}`);
265+
logger.info(`Permissions synced for ${account.provider} account (id: ${account.id}) for user ${account.user.email}`);
273266
}
274267

275-
private async onJobFailed(job: Job<UserPermissionSyncJob> | undefined, err: Error) {
268+
private async onJobFailed(job: Job<AccountPermissionSyncJob> | undefined, err: Error) {
276269
const logger = createJobLogger(job?.data.jobId ?? 'unknown');
277270

278271
Sentry.captureException(err, {
@@ -282,26 +275,30 @@ export class UserPermissionSyncer {
282275
}
283276
});
284277

285-
const errorMessage = (email: string) => `User permission sync job failed for user ${email}: ${err.message}`;
278+
const errorMessage = (accountId: string, email: string) => `Account permission sync job failed for account (id: ${accountId}) for user ${email}: ${err.message}`;
286279

287280
if (job) {
288-
const { user } = await this.db.userPermissionSyncJob.update({
281+
const { account } = await this.db.accountPermissionSyncJob.update({
289282
where: {
290283
id: job.data.jobId,
291284
},
292285
data: {
293-
status: UserPermissionSyncJobStatus.FAILED,
286+
status: AccountPermissionSyncJobStatus.FAILED,
294287
completedAt: new Date(),
295288
errorMessage: err.message,
296289
},
297290
select: {
298-
user: true,
291+
account: {
292+
include: {
293+
user: true,
294+
}
295+
}
299296
}
300297
});
301298

302-
logger.error(errorMessage(user.email ?? user.id));
299+
logger.error(errorMessage(account.id, account.user.email ?? 'unknown user (email not found)'));
303300
} else {
304-
logger.error(errorMessage('unknown job (id not found)'));
301+
logger.error(errorMessage('unknown account (id not found)', 'unknown user (id not found)'));
305302
}
306303
}
307304
}

0 commit comments

Comments
 (0)