1- import { PrismaClient } from "@sourcebot/db" ;
1+ import * as Sentry from "@sentry/node" ;
2+ import { PrismaClient , Repo , RepoPermissionSyncStatus } from "@sourcebot/db" ;
23import { createLogger } from "@sourcebot/logger" ;
34import { BitbucketConnectionConfig } from "@sourcebot/schemas/v3/bitbucket.type" ;
45import { GiteaConnectionConfig } from "@sourcebot/schemas/v3/gitea.type" ;
56import { GithubConnectionConfig } from "@sourcebot/schemas/v3/github.type" ;
67import { GitlabConnectionConfig } from "@sourcebot/schemas/v3/gitlab.type" ;
78import { Job , Queue , Worker } from 'bullmq' ;
89import { Redis } from 'ioredis' ;
10+ import { env } from "./env.js" ;
911import { createOctokitFromConfig , getUserIdsWithReadAccessToRepo } from "./github.js" ;
1012import { RepoWithConnections } from "./types.js" ;
1113
@@ -17,6 +19,8 @@ const QUEUE_NAME = 'repoPermissionSyncQueue';
1719
1820const logger = createLogger ( 'permission-syncer' ) ;
1921
22+ const SUPPORTED_CODE_HOST_TYPES = [ 'github' ] ;
23+
2024export class RepoPermissionSyncer {
2125 private queue : Queue < RepoPermissionSyncJob > ;
2226 private worker : Worker < RepoPermissionSyncJob > ;
@@ -30,48 +34,94 @@ export class RepoPermissionSyncer {
3034 } ) ;
3135 this . worker = new Worker < RepoPermissionSyncJob > ( QUEUE_NAME , this . runJob . bind ( this ) , {
3236 connection : redis ,
37+ concurrency : 1 ,
3338 } ) ;
3439 this . worker . on ( 'completed' , this . onJobCompleted . bind ( this ) ) ;
3540 this . worker . on ( 'failed' , this . onJobFailed . bind ( this ) ) ;
3641 }
3742
38- public async scheduleJob ( repoId : number ) {
39- await this . queue . add ( QUEUE_NAME , {
40- repoId,
41- } ) ;
42- }
43-
4443 public startScheduler ( ) {
4544 logger . debug ( 'Starting scheduler' ) ;
4645
47- // @todo : we should only sync permissions for a repository if it has been at least ~24 hours since the last sync.
4846 return setInterval ( async ( ) => {
47+ // @todo : make this configurable
48+ const thresholdDate = new Date ( Date . now ( ) - 1000 * 60 * 60 * 24 ) ;
4949 const repos = await this . db . repo . findMany ( {
50+ // Repos need their permissions to be synced against the code host when...
5051 where : {
51- external_codeHostType : {
52- in : [ 'github' ] ,
53- }
52+ // They belong to a code host that supports permissions syncing
53+ AND : [
54+ {
55+ external_codeHostType : {
56+ in : SUPPORTED_CODE_HOST_TYPES ,
57+ }
58+ } ,
59+ // and, they either require a sync (SYNC_NEEDED) or have been in a completed state (SYNCED or FAILED)
60+ // for > some duration (default 24 hours)
61+ {
62+ OR : [
63+ {
64+ permissionSyncStatus : RepoPermissionSyncStatus . SYNC_NEEDED
65+ } ,
66+ {
67+ AND : [
68+ {
69+ OR : [
70+ { permissionSyncStatus : RepoPermissionSyncStatus . SYNCED } ,
71+ { permissionSyncStatus : RepoPermissionSyncStatus . FAILED } ,
72+ ]
73+ } ,
74+ {
75+ OR : [
76+ { permissionSyncJobLastCompletedAt : null } ,
77+ { permissionSyncJobLastCompletedAt : { lt : thresholdDate } }
78+ ]
79+ }
80+ ]
81+ }
82+ ]
83+ } ,
84+ ]
5485 }
5586 } ) ;
5687
57- for ( const repo of repos ) {
58- await this . scheduleJob ( repo . id ) ;
59- }
60-
61- // @todo : make this configurable
62- } , 1000 * 60 ) ;
88+ await this . schedulePermissionSync ( repos ) ;
89+ } , 1000 * 30 ) ;
6390 }
6491
6592 public dispose ( ) {
6693 this . worker . close ( ) ;
6794 this . queue . close ( ) ;
6895 }
6996
97+ private async schedulePermissionSync ( repos : Repo [ ] ) {
98+ await this . db . $transaction ( async ( tx ) => {
99+ await tx . repo . updateMany ( {
100+ where : { id : { in : repos . map ( repo => repo . id ) } } ,
101+ data : { permissionSyncStatus : RepoPermissionSyncStatus . IN_SYNC_QUEUE } ,
102+ } ) ;
103+
104+ await this . queue . addBulk ( repos . map ( repo => ( {
105+ name : 'repoPermissionSyncJob' ,
106+ data : {
107+ repoId : repo . id ,
108+ } ,
109+ opts : {
110+ removeOnComplete : env . REDIS_REMOVE_ON_COMPLETE ,
111+ removeOnFail : env . REDIS_REMOVE_ON_FAIL ,
112+ }
113+ } ) ) )
114+ } ) ;
115+ }
116+
70117 private async runJob ( job : Job < RepoPermissionSyncJob > ) {
71118 const id = job . data . repoId ;
72- const repo = await this . db . repo . findUnique ( {
119+ const repo = await this . db . repo . update ( {
73120 where : {
74- id,
121+ id
122+ } ,
123+ data : {
124+ permissionSyncStatus : RepoPermissionSyncStatus . SYNCING ,
75125 } ,
76126 include : {
77127 connections : {
@@ -86,6 +136,8 @@ export class RepoPermissionSyncer {
86136 throw new Error ( `Repo ${ id } not found` ) ;
87137 }
88138
139+ logger . info ( `Syncing permissions for repo ${ repo . displayName } ...` ) ;
140+
89141 const connection = getFirstConnectionWithToken ( repo ) ;
90142 if ( ! connection ) {
91143 throw new Error ( `No connection with token found for repo ${ id } ` ) ;
@@ -119,8 +171,6 @@ export class RepoPermissionSyncer {
119171 return [ ] ;
120172 } ) ( ) ;
121173
122- logger . info ( `User IDs with read access to repo ${ id } : ${ userIds } ` ) ;
123-
124174 await this . db . repo . update ( {
125175 where : {
126176 id : repo . id ,
@@ -141,11 +191,43 @@ export class RepoPermissionSyncer {
141191 }
142192
143193 private async onJobCompleted ( job : Job < RepoPermissionSyncJob > ) {
144- logger . info ( `Repo permission sync job completed for repo ${ job . data . repoId } ` ) ;
194+ const repo = await this . db . repo . update ( {
195+ where : {
196+ id : job . data . repoId ,
197+ } ,
198+ data : {
199+ permissionSyncStatus : RepoPermissionSyncStatus . SYNCED ,
200+ permissionSyncJobLastCompletedAt : new Date ( ) ,
201+ } ,
202+ } ) ;
203+
204+ logger . info ( `Permissions synced for repo ${ repo . displayName ?? repo . name } ` ) ;
145205 }
146206
147207 private async onJobFailed ( job : Job < RepoPermissionSyncJob > | undefined , err : Error ) {
148- logger . error ( `Repo permission sync job failed for repo ${ job ?. data . repoId } : ${ err } ` ) ;
208+ Sentry . captureException ( err , {
209+ tags : {
210+ repoId : job ?. data . repoId ,
211+ queue : QUEUE_NAME ,
212+ }
213+ } ) ;
214+
215+ const errorMessage = ( repoName : string ) => `Repo permission sync job failed for repo ${ repoName } : ${ err } ` ;
216+
217+ if ( job ) {
218+ const repo = await this . db . repo . update ( {
219+ where : {
220+ id : job ?. data . repoId ,
221+ } ,
222+ data : {
223+ permissionSyncStatus : RepoPermissionSyncStatus . FAILED ,
224+ permissionSyncJobLastCompletedAt : new Date ( ) ,
225+ } ,
226+ } ) ;
227+ logger . error ( errorMessage ( repo . displayName ?? repo . name ) ) ;
228+ } else {
229+ logger . error ( errorMessage ( 'unknown repo (id not found)' ) ) ;
230+ }
149231 }
150232}
151233
0 commit comments