From c42ca8bd44436393e1eacc87a717f27bb6f794b1 Mon Sep 17 00:00:00 2001 From: RobNewton Date: Tue, 29 Aug 2023 12:50:21 -0400 Subject: [PATCH 01/35] Added queue.start() session tracking on jobs --- Models/Queue.js | 13 ++++++++----- config/Database.js | 3 ++- config/config.js | 2 +- 3 files changed, 11 insertions(+), 7 deletions(-) diff --git a/Models/Queue.js b/Models/Queue.js index 99e2ec7..ca6b067 100644 --- a/Models/Queue.js +++ b/Models/Queue.js @@ -145,13 +145,14 @@ export class Queue { const startTime = Date.now(); let lifespanRemaining = null; let concurrentJobs = []; + let session = uuid.v4(); if (lifespan !== 0) { lifespanRemaining = lifespan - (Date.now() - startTime); lifespanRemaining = (lifespanRemaining === 0) ? -1 : lifespanRemaining; // Handle exactly zero lifespan remaining edge case. - concurrentJobs = await this.getConcurrentJobs(lifespanRemaining); + concurrentJobs = await this.getConcurrentJobs(session, lifespanRemaining); } else { - concurrentJobs = await this.getConcurrentJobs(); + concurrentJobs = await this.getConcurrentJobs(session); } while (this.status === 'active' && concurrentJobs.length) { @@ -168,9 +169,9 @@ export class Queue { if (lifespan !== 0) { lifespanRemaining = lifespan - (Date.now() - startTime); lifespanRemaining = (lifespanRemaining === 0) ? -1 : lifespanRemaining; // Handle exactly zero lifespan remaining edge case. - concurrentJobs = await this.getConcurrentJobs(lifespanRemaining); + concurrentJobs = await this.getConcurrentJobs(session, lifespanRemaining); } else { - concurrentJobs = await this.getConcurrentJobs(); + concurrentJobs = await this.getConcurrentJobs(session); } } @@ -220,10 +221,11 @@ export class Queue { * If queue is running with a lifespan, only jobs with timeouts at least 500ms < than REMAINING lifespan * AND a set timeout (ie timeout > 0) will be returned. See Queue.start() for more info. * + * @param session {uuid} - The unique ID of the queue.start() instance. * @param queueLifespanRemaining {number} - The remaining lifespan of the current queue process (defaults to indefinite). * @return {promise} - Promise resolves to an array of job(s) to be processed next by the queue. */ - async getConcurrentJobs(queueLifespanRemaining = 0) { + async getConcurrentJobs(session, queueLifespanRemaining = 0) { let concurrentJobs = []; this.realm.write(() => { @@ -268,6 +270,7 @@ export class Queue { // Mark concurrent jobs as active jobsToMarkActive = jobsToMarkActive.map( job => { job.active = true; + job.session = session; }); // Reselect now-active concurrent jobs by id. diff --git a/config/Database.js b/config/Database.js index 7a3c68f..7839bc8 100644 --- a/config/Database.js +++ b/config/Database.js @@ -16,7 +16,8 @@ const JobSchema = { active: { type: 'bool', default: false}, // Whether or not job is currently being processed. timeout: 'int', // Job timeout in ms. 0 means no timeout. created: 'date', // Job creation timestamp. - failed: 'date?' // Job failure timestamp (null until failure). + failed: 'date?', // Job failure timestamp (null until failure). + session: 'uuid?', // Unique session id for queue.start() instance that pulled the job in. } }; diff --git a/config/config.js b/config/config.js index e2e30b7..ccac1fa 100644 --- a/config/config.js +++ b/config/config.js @@ -5,5 +5,5 @@ */ export const Config = { REALM_PATH: 'reactNativeQueue.realm', // Name of realm database. - REALM_SCHEMA_VERSION: 0 // Must be incremented if data model updates. + REALM_SCHEMA_VERSION: 1 // Must be incremented if data model updates. }; From 09525fb8dbdf99d8c2948297ddf38b5547f98cd3 Mon Sep 17 00:00:00 2001 From: RobNewton Date: Tue, 29 Aug 2023 13:09:23 -0400 Subject: [PATCH 02/35] Single failed job per queue.start() session --- Models/Queue.js | 23 +++++++++++++++++++++-- 1 file changed, 21 insertions(+), 2 deletions(-) diff --git a/Models/Queue.js b/Models/Queue.js index ca6b067..daf457e 100644 --- a/Models/Queue.js +++ b/Models/Queue.js @@ -253,8 +253,27 @@ export class Queue { const concurrency = this.worker.getConcurrency(nextJob.name); const allRelatedJobsQuery = (queueLifespanRemaining) - ? '(name == "'+ nextJob.name +'" AND active == FALSE AND failed == null AND timeout > 0 AND timeout < ' + timeoutUpperBound + ') OR (name == "'+ nextJob.name +'" AND active == FALSE AND failed == null AND timeout > 0 AND timeout < ' + timeoutUpperBound + ')' - : '(name == "'+ nextJob.name +'" AND active == FALSE AND failed == null) OR (name == "'+ nextJob.name +'" AND active == TRUE AND failed == null)'; + ? `(name == "${nextJob.name}" AND + active == FALSE AND + session != ${session} AND + failed == null AND + timeout > 0 AND + timeout < ${timeoutUpperBound}) + OR (name == "${nextJob.name}" AND + active == FALSE AND + session != ${session} AND + failed == null AND + timeout > 0 AND + timeout < ${timeoutUpperBound})` + + : `(name == "${nextJob.name}" AND + active == FALSE AND + session != ${session} AND + failed == null) + OR (name == "${nextJob.name}" AND + active == TRUE AND + session != ${session} AND + failed == null)`; const allRelatedJobs = this.realm.objects('Job') .filtered(allRelatedJobsQuery) From 408da462d28f4e6b175600b487177b225ab164cf Mon Sep 17 00:00:00 2001 From: RobNewton Date: Tue, 29 Aug 2023 13:26:32 -0400 Subject: [PATCH 03/35] Added session filtering to main query --- Models/Queue.js | 19 +++++++++++++++++-- 1 file changed, 17 insertions(+), 2 deletions(-) diff --git a/Models/Queue.js b/Models/Queue.js index daf457e..c2519be 100644 --- a/Models/Queue.js +++ b/Models/Queue.js @@ -237,8 +237,23 @@ export class Queue { const timeoutUpperBound = (queueLifespanRemaining - 500 > 0) ? queueLifespanRemaining - 499 : 0; // Only get jobs with timeout at least 500ms < queueLifespanRemaining. const initialQuery = (queueLifespanRemaining) - ? '(active == FALSE AND failed == null AND timeout > 0 AND timeout < ' + timeoutUpperBound + ') OR (active == FALSE AND failed == null AND timeout > 0 AND timeout < ' + timeoutUpperBound + ')' - : '(active == FALSE AND failed == null) OR (active == TRUE && failed == null)'; + ? `(active == FALSE AND + session != ${session} AND + failed == null AND + timeout > 0 AND + timeout < ${timeoutUpperBound}) + OR (active == FALSE AND + session != ${session} AND + failed == null AND + timeout > 0 AND + timeout < ${timeoutUpperBound})` + + : `(active == FALSE AND + session != ${session} AND + failed == null) + OR (active == TRUE AND + session != ${session} AND + failed == null)`; let jobs = Array.from(this.realm.objects('Job') .filtered(initialQuery) From c2db21d11d2baf072fc20f32d60bfdf868092d2f Mon Sep 17 00:00:00 2001 From: RobNewton Date: Tue, 29 Aug 2023 13:28:28 -0400 Subject: [PATCH 04/35] Increment minor version --- package.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/package.json b/package.json index 772a1b3..04c2dee 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "@sourcetoad/react-native-queue", - "version": "2.2.0", + "version": "2.3.0", "description": "A React Native Job Queue", "main": "index.js", "files": [ From 58b07274509343e9db11716fa7de40b51373117a Mon Sep 17 00:00:00 2001 From: RobNewton Date: Tue, 29 Aug 2023 13:50:44 -0400 Subject: [PATCH 05/35] Corrected model issues and query filter --- Models/Queue.js | 19 ++++++++++--------- config/Database.js | 2 +- 2 files changed, 11 insertions(+), 10 deletions(-) diff --git a/Models/Queue.js b/Models/Queue.js index c2519be..2c597c3 100644 --- a/Models/Queue.js +++ b/Models/Queue.js @@ -100,7 +100,8 @@ export class Queue { active: false, timeout: (options.timeout >= 0) ? options.timeout : 25000, created: new Date(), - failed: null + failed: null, + session: null }); }); @@ -238,21 +239,21 @@ export class Queue { const initialQuery = (queueLifespanRemaining) ? `(active == FALSE AND - session != ${session} AND + session == null AND failed == null AND timeout > 0 AND timeout < ${timeoutUpperBound}) OR (active == FALSE AND - session != ${session} AND + session == null AND failed == null AND timeout > 0 AND timeout < ${timeoutUpperBound})` : `(active == FALSE AND - session != ${session} AND + session == null AND failed == null) OR (active == TRUE AND - session != ${session} AND + session == null AND failed == null)`; let jobs = Array.from(this.realm.objects('Job') @@ -270,24 +271,24 @@ export class Queue { const allRelatedJobsQuery = (queueLifespanRemaining) ? `(name == "${nextJob.name}" AND active == FALSE AND - session != ${session} AND + session == null AND failed == null AND timeout > 0 AND timeout < ${timeoutUpperBound}) OR (name == "${nextJob.name}" AND active == FALSE AND - session != ${session} AND + session == null AND failed == null AND timeout > 0 AND timeout < ${timeoutUpperBound})` : `(name == "${nextJob.name}" AND active == FALSE AND - session != ${session} AND + session == null AND failed == null) OR (name == "${nextJob.name}" AND active == TRUE AND - session != ${session} AND + session == null AND failed == null)`; const allRelatedJobs = this.realm.objects('Job') diff --git a/config/Database.js b/config/Database.js index 7839bc8..bf27bad 100644 --- a/config/Database.js +++ b/config/Database.js @@ -17,7 +17,7 @@ const JobSchema = { timeout: 'int', // Job timeout in ms. 0 means no timeout. created: 'date', // Job creation timestamp. failed: 'date?', // Job failure timestamp (null until failure). - session: 'uuid?', // Unique session id for queue.start() instance that pulled the job in. + session: 'string?', // Session UUID for queue.start() instance that pulled the job in. } }; From 021781b9f0aecc48c265a2ad789dc15dac1f1506 Mon Sep 17 00:00:00 2001 From: RobNewton Date: Tue, 29 Aug 2023 14:04:14 -0400 Subject: [PATCH 06/35] Fixed query filter --- Models/Queue.js | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/Models/Queue.js b/Models/Queue.js index 2c597c3..996e750 100644 --- a/Models/Queue.js +++ b/Models/Queue.js @@ -239,21 +239,21 @@ export class Queue { const initialQuery = (queueLifespanRemaining) ? `(active == FALSE AND - session == null AND + session != "${session}" AND failed == null AND timeout > 0 AND timeout < ${timeoutUpperBound}) OR (active == FALSE AND - session == null AND + session != "${session}" AND failed == null AND timeout > 0 AND timeout < ${timeoutUpperBound})` : `(active == FALSE AND - session == null AND + session != "${session}" AND failed == null) OR (active == TRUE AND - session == null AND + session != "${session}" AND failed == null)`; let jobs = Array.from(this.realm.objects('Job') @@ -271,24 +271,24 @@ export class Queue { const allRelatedJobsQuery = (queueLifespanRemaining) ? `(name == "${nextJob.name}" AND active == FALSE AND - session == null AND + session != "${session}" AND failed == null AND timeout > 0 AND timeout < ${timeoutUpperBound}) OR (name == "${nextJob.name}" AND active == FALSE AND - session == null AND + session != "${session}" AND failed == null AND timeout > 0 AND timeout < ${timeoutUpperBound})` : `(name == "${nextJob.name}" AND active == FALSE AND - session == null AND + session != "${session}" AND failed == null) OR (name == "${nextJob.name}" AND active == TRUE AND - session == null AND + session != "${session}" AND failed == null)`; const allRelatedJobs = this.realm.objects('Job') From 5fabde9a79b7ffef120cc714d8ce17ac60001fd6 Mon Sep 17 00:00:00 2001 From: RobNewton Date: Tue, 29 Aug 2023 16:42:11 -0400 Subject: [PATCH 07/35] Added configurable retries and min duration between --- Models/Queue.js | 129 +++++++++++++++++++++++++++++++-------------- Models/Worker.js | 64 ++++++++++++++++++++++ config/Database.js | 1 + notes.js | 30 +++++++++++ 4 files changed, 183 insertions(+), 41 deletions(-) create mode 100644 notes.js diff --git a/Models/Queue.js b/Models/Queue.js index 996e750..8bf508c 100644 --- a/Models/Queue.js +++ b/Models/Queue.js @@ -100,6 +100,7 @@ export class Queue { active: false, timeout: (options.timeout >= 0) ? options.timeout : 25000, created: new Date(), + lastFailed: null, failed: null, session: null }); @@ -239,21 +240,17 @@ export class Queue { const initialQuery = (queueLifespanRemaining) ? `(active == FALSE AND - session != "${session}" AND failed == null AND timeout > 0 AND timeout < ${timeoutUpperBound}) OR (active == FALSE AND - session != "${session}" AND failed == null AND timeout > 0 AND timeout < ${timeoutUpperBound})` : `(active == FALSE AND - session != "${session}" AND failed == null) OR (active == TRUE AND - session != "${session}" AND failed == null)`; let jobs = Array.from(this.realm.objects('Job') @@ -268,29 +265,63 @@ export class Queue { if (nextJob) { const concurrency = this.worker.getConcurrency(nextJob.name); + const attemptBehavior = this.worker.getAttemptBehavior(nextJob.name); + + const minMsBetweenAttempts = this.worker.getMinimumMillisBetweenAttempts(nextJob.name); + let earliestLastFailedTimestamp = new Date(); + earliestLastFailedTimestamp.setMilliseconds(earliestLastFailedTimestamp.getMilliseconds() - minMsBetweenAttempts); + + // If the worker is configured to attempt failed moves immediately, + // then we don't want to filter out jobs that were already attempted + // by this session. + const attemptFilterPart = () => { + switch (attemptBehavior) { + case 'immediate': + return ''; + case 'oncePerStart': + return `session != "${session}" AND`; + default: + return ''; + } + }; + const allRelatedJobsQuery = (queueLifespanRemaining) ? `(name == "${nextJob.name}" AND active == FALSE AND - session != "${session}" AND + ${attemptFilterPart()} failed == null AND + (lastFailed == null + OR + lastFailed < ${earliestLastFailedTimestamp}) AND timeout > 0 AND timeout < ${timeoutUpperBound}) OR (name == "${nextJob.name}" AND active == FALSE AND - session != "${session}" AND + ${attemptFilterPart()} failed == null AND + (lastFailed == null + OR + lastFailed < ${earliestLastFailedTimestamp}) AND timeout > 0 AND timeout < ${timeoutUpperBound})` : `(name == "${nextJob.name}" AND active == FALSE AND - session != "${session}" AND + ${attemptFilterPart()} + (lastFailed == null + OR + lastFailed < ${earliestLastFailedTimestamp}) AND failed == null) OR (name == "${nextJob.name}" AND active == TRUE AND - session != "${session}" AND + ${attemptFilterPart()} + (lastFailed == null + OR + lastFailed < ${earliestLastFailedTimestamp}) AND failed == null)`; + console.log('allRelatedJobsQuery', allRelatedJobsQuery); // eslint-disable-line no-console + const allRelatedJobs = this.realm.objects('Job') .filtered(allRelatedJobsQuery) .sorted([['priority', true], ['created', false]]); @@ -364,39 +395,55 @@ export class Queue { let jobData = JSON.parse(job.data); const errorMessage = error?.message || ''; - this.realm.write(() => { - // Increment failed attempts number - if (!jobData.failedAttempts) { - jobData.failedAttempts = 1; - } else { - jobData.failedAttempts++; - } - - // Log error - if (!jobData.errors) { - jobData.errors = [ errorMessage ]; - } else { - jobData.errors.push(errorMessage); - } - - job.data = JSON.stringify(jobData); - - // Reset active status - job.active = false; - - // Mark job as failed if too many attempts - if (jobData.failedAttempts >= jobData.attempts) { - job.failed = new Date(); - } - }); - - // Execute job onFailure lifecycle callback. - this.worker.executeJobLifecycleCallback('onFailure', jobName, jobId, jobPayload); - - // If job has failed all attempts execute job onFailed and onComplete lifecycle callbacks. - if (jobData.failedAttempts >= jobData.attempts) { - this.worker.executeJobLifecycleCallback('onFailed', jobName, jobId, jobPayload); - this.worker.executeJobLifecycleCallback('onComplete', jobName, jobId, jobPayload); + // Call the optional error profiler from the worker.options to learn what we should + // do with this error. If the profiler returns true, we should attempt the job. + const failureBehavior = this.worker.getFailureBehavior(jobName); + + switch (failureBehavior) { + case 'standard': + this.realm.write(() => { + // Increment failed attempts number + if (!jobData.failedAttempts) { + jobData.failedAttempts = 1; + } else { + jobData.failedAttempts++; + } + + // Log error + if (!jobData.errors) { + jobData.errors = [errorMessage]; + } else { + jobData.errors.push(errorMessage); + } + + job.data = JSON.stringify(jobData); + + // Reset active status + job.active = false; + + // Use the same date object for both failure times if last failure + const now = new Date(); + + // Record when this attempt failed + job.lastFailed = now; + + // Mark job as failed if too many attempts + if (jobData.failedAttempts >= jobData.attempts) { + job.failed = now; + } + }); + + // Execute job onFailure lifecycle callback. + this.worker.executeJobLifecycleCallback('onFailure', jobName, jobId, jobPayload); + + // If job has failed all attempts execute job onFailed and onComplete lifecycle callbacks. + if (jobData.failedAttempts >= jobData.attempts) { + this.worker.executeJobLifecycleCallback('onFailed', jobName, jobId, jobPayload); + this.worker.executeJobLifecycleCallback('onComplete', jobName, jobId, jobPayload); + } + break; + default: + break; } } } diff --git a/Models/Worker.js b/Models/Worker.js index 1c02f54..927006c 100644 --- a/Models/Worker.js +++ b/Models/Worker.js @@ -39,6 +39,9 @@ export default class Worker { // Attach options to worker worker.options = { concurrency: options.concurrency || 1, + attemptBehavior: options.attemptBehavior || 'immediate', // immediate | oncePerStart + failureBehavior: options.failureBehavior || 'standard', // standard | custom + minimumMillisBetweenAttempts: options.minimumMillisBetweenAttempts || 0, onStart: options.onStart || null, onSuccess: options.onSuccess || null, onFailure: options.onFailure || null, @@ -78,6 +81,67 @@ export default class Worker { return Worker.workers[jobName].options.concurrency; } + /** + * Get the attempt behavior setting for a worker. + * + * Defaults to standard attempt behavior. + * + * @param jobName {string} - Name associated with jobs assigned to this worker. + * @throws Throws error if no worker is currently assigned to passed in job name. + * @return {object} + */ + getAttemptBehavior(jobName) { + // If no worker assigned to job name, throw error. + if (!Worker.workers[jobName]) { + throw new Error('Job ' + jobName + ' does not have a worker assigned to it.'); + } + + return Worker.workers[jobName].options.attemptBehavior || null; + } + + /** + * Get the minimum duration (ms) between attempts setting for a worker. + * + * Defaults to 0. + * + * @param jobName {string} - Name associated with jobs assigned to this worker. + * @throws Throws error if no worker is currently assigned to passed in job name. + * @return {number} + */ + getMinimumMillisBetweenAttempts(jobName) { + // If no worker assigned to job name, throw error. + if (!Worker.workers[jobName]) { + throw new Error('Job ' + jobName + ' does not have a worker assigned to it.'); + } + + let minimum = 0; + try { + minimum = parseInt(Worker.workers[jobName].options.minimumMillisBetweenAttempts); + } catch (error) { + console.error(error); // eslint-disable-line no-console + } + + return minimum; + } + + /** + * Get the failure behavior setting for a worker. + * + * Defaults to standard failure behavior. + * + * @param jobName {string} - Name associated with jobs assigned to this worker. + * @throws Throws error if no worker is currently assigned to passed in job name. + * @return {string} + */ + getFailureBehavior(jobName) { + // If no worker assigned to job name, throw error. + if (!Worker.workers[jobName]) { + throw new Error('Job ' + jobName + ' does not have a worker assigned to it.'); + } + + return Worker.workers[jobName].options.failureBehavior || null; + } + /** * * Execute the worker function assigned to the passed in job name. diff --git a/config/Database.js b/config/Database.js index bf27bad..c1b6702 100644 --- a/config/Database.js +++ b/config/Database.js @@ -17,6 +17,7 @@ const JobSchema = { timeout: 'int', // Job timeout in ms. 0 means no timeout. created: 'date', // Job creation timestamp. failed: 'date?', // Job failure timestamp (null until failure). + lastFailed: 'date?', // Last job failure timestamp (set after each failed attempt). session: 'string?', // Session UUID for queue.start() instance that pulled the job in. } }; diff --git a/notes.js b/notes.js new file mode 100644 index 0000000..2eacc79 --- /dev/null +++ b/notes.js @@ -0,0 +1,30 @@ +/* + +When we grab a job off of the realm jobs db, it gets marked as active. When +we set it to active, also set a session id to a unique UUID from that instance +of calling queue.start(). + +In getConcurrentJobs(), we should only grab jobs that do are active = false and +sessionId = null. This will prevent us from grabbing jobs that are already +active in another queue instance. + + +Send status change to server: +send breadcrumb trail of all status changes to server. + + +*/ +try { + if (await job.dependenciesMet(job)) { + await this.worker.executeJob(job); + } else { + this.realm.write(() => { + this.realm.write((job.active = false)); + }); + return; + } +} catch (error) { + this.realm.write(() => { + this.realm.write((job.active = false)); + }); +} From 5001b4229272be684e52df7342041ae8b21318d5 Mon Sep 17 00:00:00 2001 From: RobNewton Date: Tue, 29 Aug 2023 17:04:10 -0400 Subject: [PATCH 08/35] filter issues wip --- Models/Queue.js | 18 +++++------------- 1 file changed, 5 insertions(+), 13 deletions(-) diff --git a/Models/Queue.js b/Models/Queue.js index 8bf508c..149c96c 100644 --- a/Models/Queue.js +++ b/Models/Queue.js @@ -289,41 +289,33 @@ export class Queue { ? `(name == "${nextJob.name}" AND active == FALSE AND ${attemptFilterPart()} + (lastFailed == null OR lastFailed <= $0) AND failed == null AND - (lastFailed == null - OR - lastFailed < ${earliestLastFailedTimestamp}) AND timeout > 0 AND timeout < ${timeoutUpperBound}) OR (name == "${nextJob.name}" AND active == FALSE AND ${attemptFilterPart()} + (lastFailed == null OR lastFailed <= $0) AND failed == null AND - (lastFailed == null - OR - lastFailed < ${earliestLastFailedTimestamp}) AND timeout > 0 AND timeout < ${timeoutUpperBound})` : `(name == "${nextJob.name}" AND active == FALSE AND ${attemptFilterPart()} - (lastFailed == null - OR - lastFailed < ${earliestLastFailedTimestamp}) AND + (lastFailed == null OR lastFailed <= $0) AND failed == null) OR (name == "${nextJob.name}" AND active == TRUE AND ${attemptFilterPart()} - (lastFailed == null - OR - lastFailed < ${earliestLastFailedTimestamp}) AND + (lastFailed == null OR lastFailed <= $0) AND failed == null)`; console.log('allRelatedJobsQuery', allRelatedJobsQuery); // eslint-disable-line no-console const allRelatedJobs = this.realm.objects('Job') - .filtered(allRelatedJobsQuery) + .filtered(allRelatedJobsQuery, earliestLastFailedTimestamp) .sorted([['priority', true], ['created', false]]); let jobsToMarkActive = allRelatedJobs.slice(0, concurrency); From 1f2305f97da0f9ea06c8a2e24a2277287900d08b Mon Sep 17 00:00:00 2001 From: RobNewton Date: Thu, 31 Aug 2023 11:43:36 -0400 Subject: [PATCH 09/35] Fixed bug with excluded jobs --- Models/Queue.js | 19 ++++++++++--------- 1 file changed, 10 insertions(+), 9 deletions(-) diff --git a/Models/Queue.js b/Models/Queue.js index 149c96c..230b600 100644 --- a/Models/Queue.js +++ b/Models/Queue.js @@ -312,8 +312,6 @@ export class Queue { (lastFailed == null OR lastFailed <= $0) AND failed == null)`; - console.log('allRelatedJobsQuery', allRelatedJobsQuery); // eslint-disable-line no-console - const allRelatedJobs = this.realm.objects('Job') .filtered(allRelatedJobsQuery, earliestLastFailedTimestamp) .sorted([['priority', true], ['created', false]]); @@ -323,7 +321,7 @@ export class Queue { // Grab concurrent job ids to reselect jobs as marking these jobs as active will remove // them from initial selection when write transaction exits. // See: https://stackoverflow.com/questions/47359368/does-realm-support-select-for-update-style-read-locking/47363356#comment81772710_47363356 - const concurrentJobIds = jobsToMarkActive.map( job => job.id); + const concurrentJobIds = jobsToMarkActive.map(job => job.id); // Mark concurrent jobs as active jobsToMarkActive = jobsToMarkActive.map( job => { @@ -332,12 +330,15 @@ export class Queue { }); // Reselect now-active concurrent jobs by id. - const reselectQuery = concurrentJobIds.map( jobId => 'id == "' + jobId + '"').join(' OR '); - const reselectedJobs = Array.from(this.realm.objects('Job') - .filtered(reselectQuery) - .sorted([['priority', true], ['created', false]])); - - concurrentJobs = reselectedJobs.slice(0, concurrency); + if (concurrentJobIds.length > 0) { + const reselectQuery = concurrentJobIds.map(jobId => 'id == "' + jobId + '"').join(' OR '); + console.log(`[RNQ] Reselect query: ${reselectQuery}`); + const reselectedJobs = Array.from(this.realm.objects('Job') + .filtered(reselectQuery) + .sorted([['priority', true], ['created', false]])); + + concurrentJobs = reselectedJobs.slice(0, concurrency); + } } }); From 8ee8a4f9be6f5353d6f487b4fa3e38b51d428e36 Mon Sep 17 00:00:00 2001 From: RobNewton Date: Fri, 1 Sep 2023 18:01:38 -0400 Subject: [PATCH 10/35] Fixed with worker specific filters --- Models/Queue.js | 81 ++++++++++++++++++++++++------------------------ Models/Worker.js | 39 +++++++++++++---------- 2 files changed, 63 insertions(+), 57 deletions(-) diff --git a/Models/Queue.js b/Models/Queue.js index 230b600..9f6880a 100644 --- a/Models/Queue.js +++ b/Models/Queue.js @@ -230,6 +230,9 @@ export class Queue { async getConcurrentJobs(session, queueLifespanRemaining = 0) { let concurrentJobs = []; + const workersArr = this.worker.getWorkersAsArray(); + if (workersArr.length === 0) return concurrentJobs; + this.realm.write(() => { // Get next job from queue. let nextJob = null; @@ -238,19 +241,45 @@ export class Queue { // If queueLife const timeoutUpperBound = (queueLifespanRemaining - 500 > 0) ? queueLifespanRemaining - 499 : 0; // Only get jobs with timeout at least 500ms < queueLifespanRemaining. + // Get worker specific minimum time between attempts. + let workerFilters; + if (workersArr.length > 0) { + workerFilters = workersArr.map(worker => { + const { name, minimumMillisBetweenAttempts = 0 } = worker; + let earliestLastFailed = new Date(); + earliestLastFailed.setMilliseconds(earliestLastFailed.getMilliseconds() - minimumMillisBetweenAttempts); + const realmFilterableDate = earliestLastFailed.toISOString().replace('T', '@').split('.')[0] + ':00'; + const workerFiler = ` + ( + name == "${name}" AND + ( + lastFailed == null OR + lastFailed <= ${realmFilterableDate} + ) + )`; + return workerFiler; + }); + } + const initialQuery = (queueLifespanRemaining) - ? `(active == FALSE AND + ? ` + (active == FALSE AND failed == null AND + (${workerFilters?.join(' OR ')}) AND timeout > 0 AND timeout < ${timeoutUpperBound}) OR (active == FALSE AND failed == null AND + (${workerFilters?.join(' OR ')}) AND timeout > 0 AND timeout < ${timeoutUpperBound})` - : `(active == FALSE AND + : ` + (active == FALSE AND + (${workerFilters?.join(' OR ')}) AND failed == null) OR (active == TRUE AND + (${workerFilters?.join(' OR ')}) AND failed == null)`; let jobs = Array.from(this.realm.objects('Job') @@ -265,55 +294,31 @@ export class Queue { if (nextJob) { const concurrency = this.worker.getConcurrency(nextJob.name); - const attemptBehavior = this.worker.getAttemptBehavior(nextJob.name); - - const minMsBetweenAttempts = this.worker.getMinimumMillisBetweenAttempts(nextJob.name); - let earliestLastFailedTimestamp = new Date(); - earliestLastFailedTimestamp.setMilliseconds(earliestLastFailedTimestamp.getMilliseconds() - minMsBetweenAttempts); - - // If the worker is configured to attempt failed moves immediately, - // then we don't want to filter out jobs that were already attempted - // by this session. - const attemptFilterPart = () => { - switch (attemptBehavior) { - case 'immediate': - return ''; - case 'oncePerStart': - return `session != "${session}" AND`; - default: - return ''; - } - }; - const allRelatedJobsQuery = (queueLifespanRemaining) ? `(name == "${nextJob.name}" AND active == FALSE AND - ${attemptFilterPart()} - (lastFailed == null OR lastFailed <= $0) AND + (${workerFilters?.join(' OR ')}) AND failed == null AND timeout > 0 AND timeout < ${timeoutUpperBound}) OR (name == "${nextJob.name}" AND active == FALSE AND - ${attemptFilterPart()} - (lastFailed == null OR lastFailed <= $0) AND + (${workerFilters?.join(' OR ')}) AND failed == null AND timeout > 0 AND timeout < ${timeoutUpperBound})` : `(name == "${nextJob.name}" AND active == FALSE AND - ${attemptFilterPart()} - (lastFailed == null OR lastFailed <= $0) AND + (${workerFilters?.join(' OR ')}) AND failed == null) OR (name == "${nextJob.name}" AND active == TRUE AND - ${attemptFilterPart()} - (lastFailed == null OR lastFailed <= $0) AND + (${workerFilters?.join(' OR ')}) AND failed == null)`; const allRelatedJobs = this.realm.objects('Job') - .filtered(allRelatedJobsQuery, earliestLastFailedTimestamp) + .filtered(allRelatedJobsQuery) .sorted([['priority', true], ['created', false]]); let jobsToMarkActive = allRelatedJobs.slice(0, concurrency); @@ -330,15 +335,11 @@ export class Queue { }); // Reselect now-active concurrent jobs by id. - if (concurrentJobIds.length > 0) { - const reselectQuery = concurrentJobIds.map(jobId => 'id == "' + jobId + '"').join(' OR '); - console.log(`[RNQ] Reselect query: ${reselectQuery}`); - const reselectedJobs = Array.from(this.realm.objects('Job') - .filtered(reselectQuery) - .sorted([['priority', true], ['created', false]])); - - concurrentJobs = reselectedJobs.slice(0, concurrency); - } + const reselectQuery = concurrentJobIds.map(jobId => 'id == "' + jobId + '"').join(' OR '); + const reselectedJobs = Array.from(this.realm.objects('Job') + .filtered(reselectQuery) + .sorted([['priority', true], ['created', false]])); + concurrentJobs = reselectedJobs.slice(0, concurrency); } }); diff --git a/Models/Worker.js b/Models/Worker.js index 927006c..8d473e0 100644 --- a/Models/Worker.js +++ b/Models/Worker.js @@ -63,40 +63,45 @@ export default class Worker { } /** + * Get an array of all registered workers. + * Each worker object in the array is the worker options object + * with the name property added. * - * Get the concurrency setting for a worker. - * - * Worker concurrency defaults to 1. + * @returns {Array} - Array of worker options with name property added. + */ + getWorkersAsArray() { + return Object.keys(Worker.workers).map(jobName => { + return { ...this.getWorkerOptions(jobName), name: jobName }; + }); + } + + /** + * Get the worker options for a worker by job name. * * @param jobName {string} - Name associated with jobs assigned to this worker. - * @throws Throws error if no worker is currently assigned to passed in job name. - * @return {number} + * @returns {Object} worker options object */ - getConcurrency(jobName) { - // If no worker assigned to job name, throw error. - if (!Worker.workers[jobName]) { - throw new Error('Job ' + jobName + ' does not have a worker assigned to it.'); - } - - return Worker.workers[jobName].options.concurrency; + getWorkerOptions(jobName) { + return Worker.workers[jobName].options; } /** - * Get the attempt behavior setting for a worker. * - * Defaults to standard attempt behavior. + * Get the concurrency setting for a worker. + * + * Worker concurrency defaults to 1. * * @param jobName {string} - Name associated with jobs assigned to this worker. * @throws Throws error if no worker is currently assigned to passed in job name. - * @return {object} + * @return {number} */ - getAttemptBehavior(jobName) { + getConcurrency(jobName) { // If no worker assigned to job name, throw error. if (!Worker.workers[jobName]) { throw new Error('Job ' + jobName + ' does not have a worker assigned to it.'); } - return Worker.workers[jobName].options.attemptBehavior || null; + return Worker.workers[jobName].options.concurrency; } /** From 193a3942fc057cd150c935b69f370c05c00c18df Mon Sep 17 00:00:00 2001 From: RobNewton Date: Fri, 1 Sep 2023 22:14:59 -0400 Subject: [PATCH 11/35] isRunnable + onSkipped + minimumMillisBetweenAttempts --- Models/Queue.js | 64 ++++++++++++++++++++++++++++++++++++++++++------ Models/Worker.js | 38 ++++++++++++++++++++-------- 2 files changed, 85 insertions(+), 17 deletions(-) diff --git a/Models/Queue.js b/Models/Queue.js index 9f6880a..c7c8edc 100644 --- a/Models/Queue.js +++ b/Models/Queue.js @@ -231,6 +231,34 @@ export class Queue { let concurrentJobs = []; const workersArr = this.worker.getWorkersAsArray(); + const workersArrToJSON = () => { + return workersArr.map(worker => { + let ret = {}; + const { + concurrency, + isJobRunnable, + failureBehavior, + minimumMillisBetweenAttempts, + onStart, + onSuccess, + onFailure, + onFailed, + onComplete } = worker; + if (concurrency) ret.concurrency = typeof concurrency == 'number' ? concurrency : 'undefined'; + if (isJobRunnable) ret.isJobRunnable = typeof isJobRunnable == 'function' ? 'isJobRunnable()' : 'undefined'; + if (failureBehavior) ret.failureBehavior = typeof failureBehavior == 'string' ? failureBehavior : 'undefined'; + if (minimumMillisBetweenAttempts) ret.minimumMillisBetweenAttempts = typeof minimumMillisBetweenAttempts == 'number' ? minimumMillisBetweenAttempts : 'undefined'; + if (onStart) ret.onStart = typeof onStart == 'function' ? 'onStart()' : 'undefined'; + if (onSuccess) ret.onSuccess = typeof onSuccess == 'function' ? 'onSuccess()' : 'undefined'; + if (onFailure) ret.onFailure = typeof onFailure == 'function' ? 'onFailure()' : 'undefined'; + if (onFailed) ret.onFailed = typeof onFailed == 'function' ? 'onFailed()' : 'undefined'; + if (onComplete) ret.onComplete = typeof onComplete == 'function' ? 'onComplete()' : 'undefined'; + return ret; + }); + }; + + + //console.log(`[RNQ] Found ${workersArr.length} workers to process...\n${JSON.stringify(workersArrToJSON(), null, 2)}`); if (workersArr.length === 0) return concurrentJobs; this.realm.write(() => { @@ -286,6 +314,8 @@ export class Queue { .filtered(initialQuery) .sorted([['priority', true], ['created', false]])); + console.log(`[RNQ] Initially found ${jobs?.length} jobs to process.`); + if (jobs.length) { nextJob = jobs[0]; } @@ -321,7 +351,23 @@ export class Queue { .filtered(allRelatedJobsQuery) .sorted([['priority', true], ['created', false]]); - let jobsToMarkActive = allRelatedJobs.slice(0, concurrency); + // Filter out any jobs that are not runnable. + let runnableJobs = []; + for (let index = 0; index < allRelatedJobs.length; index++) { + const job = allRelatedJobs[index]; + const { runnable, reason } = this.worker.execIsJobRunnable(job.name, job); + if (runnable) { + runnableJobs.push(job); + } else { + // Fire onSkipped job lifecycle callback + const jobPayload = JSON.parse(job.payload); + this.worker.executeJobLifecycleCallback('onSkipped', job.name, job.id, {...jobPayload, skippedReason: reason}); + } + } + + console.log(`[RNQ] Only ${runnableJobs?.length} of those jobs are runnable.`); + + let jobsToMarkActive = runnableJobs.slice(0, concurrency); // Grab concurrent job ids to reselect jobs as marking these jobs as active will remove // them from initial selection when write transaction exits. @@ -335,16 +381,20 @@ export class Queue { }); // Reselect now-active concurrent jobs by id. - const reselectQuery = concurrentJobIds.map(jobId => 'id == "' + jobId + '"').join(' OR '); - const reselectedJobs = Array.from(this.realm.objects('Job') - .filtered(reselectQuery) - .sorted([['priority', true], ['created', false]])); - concurrentJobs = reselectedJobs.slice(0, concurrency); + if (concurrentJobIds.length > 0) { + const reselectQuery = concurrentJobIds.map(jobId => 'id == "' + jobId + '"').join(' OR '); + const reselectedJobs = Array.from(this.realm.objects('Job') + .filtered(reselectQuery) + .sorted([['priority', true], ['created', false]])); + concurrentJobs = reselectedJobs.slice(0, concurrency); + console.log(`[RNQ] Found ${concurrentJobs?.length} concurrent jobs to process.`); + } else { + console.log(`[RNQ] No jobs to process.`); + } } }); return concurrentJobs; - } /** diff --git a/Models/Worker.js b/Models/Worker.js index 8d473e0..78c1f13 100644 --- a/Models/Worker.js +++ b/Models/Worker.js @@ -36,14 +36,17 @@ export default class Worker { throw new Error('Job name and associated worker function must be supplied.'); } + const defaultIsJobRunnable = async (id, payload) => true; + // Attach options to worker worker.options = { concurrency: options.concurrency || 1, - attemptBehavior: options.attemptBehavior || 'immediate', // immediate | oncePerStart + isJobRunnable: options.isJobRunnable || defaultIsJobRunnable, failureBehavior: options.failureBehavior || 'standard', // standard | custom minimumMillisBetweenAttempts: options.minimumMillisBetweenAttempts || 0, onStart: options.onStart || null, onSuccess: options.onSuccess || null, + onSkipped: options.onSkipped || null, onFailure: options.onFailure || null, onFailed: options.onFailed || null, onComplete: options.onComplete || null @@ -104,6 +107,28 @@ export default class Worker { return Worker.workers[jobName].options.concurrency; } + /** + * + * Call the options.isJobRunnable function if it exists for a worker. + * + * @param jobName {string} - Name associated with jobs assigned to this worker. + * @throws Throws error if no worker is currently assigned to passed in job name. + * @return {Object} + */ + execIsJobRunnable(jobName, job) { + // If no worker assigned to job name, throw error. + if (!Worker.workers[jobName]) { + throw new Error('Job ' + jobName + ' does not have a worker assigned to it.'); + } + + const isJobRunnable = Worker.workers[jobName].options.isJobRunnable; + if (isJobRunnable && typeof isJobRunnable === 'function') { + return isJobRunnable(job.id, JSON.parse(job.payload)); + }; + + return { runnable: true }; + } + /** * Get the minimum duration (ms) between attempts setting for a worker. * @@ -119,14 +144,7 @@ export default class Worker { throw new Error('Job ' + jobName + ' does not have a worker assigned to it.'); } - let minimum = 0; - try { - minimum = parseInt(Worker.workers[jobName].options.minimumMillisBetweenAttempts); - } catch (error) { - console.error(error); // eslint-disable-line no-console - } - - return minimum; + return parseInt(Worker.workers[jobName].options.minimumMillisBetweenAttempts); } /** @@ -193,7 +211,7 @@ export default class Worker { */ async executeJobLifecycleCallback(callbackName, jobName, jobId, jobPayload) { // Validate callback name - const validCallbacks = ['onStart', 'onSuccess', 'onFailure', 'onFailed', 'onComplete']; + const validCallbacks = ['onStart', 'onSuccess', 'onSkipped', 'onFailure', 'onFailed', 'onComplete']; if (!validCallbacks.includes(callbackName)) { throw new Error('Invalid job lifecycle callback name.'); } From 3b4ec2be6e88e41bc9d92696f7ac1cae3d141b62 Mon Sep 17 00:00:00 2001 From: RobNewton Date: Fri, 1 Sep 2023 22:23:02 -0400 Subject: [PATCH 12/35] isJobRunnable takes full job now --- Models/Worker.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Models/Worker.js b/Models/Worker.js index 78c1f13..6a648a8 100644 --- a/Models/Worker.js +++ b/Models/Worker.js @@ -123,7 +123,7 @@ export default class Worker { const isJobRunnable = Worker.workers[jobName].options.isJobRunnable; if (isJobRunnable && typeof isJobRunnable === 'function') { - return isJobRunnable(job.id, JSON.parse(job.payload)); + return isJobRunnable(job); }; return { runnable: true }; From dfa6d5ed6c7e31790378102475abfd5f478e840d Mon Sep 17 00:00:00 2001 From: RobNewton Date: Fri, 1 Sep 2023 22:37:40 -0400 Subject: [PATCH 13/35] Updated readme --- README.md | 65 +++++++++++++++++++++++++++++++++++++++---------------- 1 file changed, 46 insertions(+), 19 deletions(-) diff --git a/README.md b/README.md index 8fd8a5c..5a34eb8 100644 --- a/README.md +++ b/README.md @@ -27,8 +27,8 @@ A React Native at-least-once priority job queue / task queue backed by persisten ## Features * **Simple API:** Set up job workers and begin creating your jobs in minutes with just two basic API calls - * queue.addWorker(name, workerFunction, options = {}) - * queue.createJob(name, payload = {}, options = {}, startQueue = true) + * queue.addWorker(name, workerFunction, options = {}) + * queue.createJob(name, payload = {}, options = {}, startQueue = true) * **Powerful options:** Easily modify default functionality. Set job timeouts, number of retry attempts, priority, job lifecycle callbacks, and worker concurrency with an options object. Start queue processing with a lifespan to easily meet OS background task time limits. * **Persistent Jobs:** Jobs are persisted with Realm. Because jobs persist, you can easily continue to process jobs across app restarts or in OS background tasks until completed or failed (or app is uninstalled). * **Powerful Integrations:** React Native Queue was designed to play well with others. The queue quickly integrates with a variety of OS background task and Worker packages so processing your jobs in a background service or dedicated thread have never been easier. @@ -36,7 +36,7 @@ A React Native at-least-once priority job queue / task queue backed by persisten ## React Native Compatibility At the core this package leverages [Realm](https://github.com/realm/realm-js/blob/main/COMPATIBILITY.md) which maintains its own compatibility. This produces -an interesting problem as we depend on a package which enforces React Native compatibility, but peer to react native. +an interesting problem as we depend on a package which enforces React Native compatibility, but peer to react native. This means it's very crucial to respect to select the proper version and respect the peering. @@ -82,7 +82,7 @@ $ yarn add @sourcetoad/react-native-queue React Native Queue is a standard job/task queue built specifically for react native applications. If you have a long-running task, or a large number of tasks, consider turning that task into a job(s) and throwing it/them onto the queue to be processed in the background instead of blocking your UI until task(s) complete. Creating and processing jobs consists of: - + 1. Importing and initializing React Native Queue 2. Registering worker functions (the functions that execute your jobs). 3. Creating jobs. @@ -91,7 +91,7 @@ Creating and processing jobs consists of: ```js import queueFactory from '@sourcetoad/react-native-queue'; -// Of course this line needs to be in the context of an async function, +// Of course this line needs to be in the context of an async function, // otherwise use queueFactory.then((queue) => { console.log('add workers and jobs here'); }); const queue = await queueFactory(); @@ -99,7 +99,7 @@ const queue = await queueFactory(); queue.addWorker('example-job', async (id, payload) => { console.log('EXECUTING "example-job" with id: ' + id); console.log(payload, 'payload'); - + await new Promise((resolve) => { setTimeout(() => { console.log('"example-job" has completed!'); @@ -112,7 +112,7 @@ queue.addWorker('example-job', async (id, payload) => { // Example job passes a payload of data to 'example-job' worker. // Default settings are used (note the empty options object). -// Because false is passed, the queue won't automatically start when this job is created, so usually queue.start() +// Because false is passed, the queue won't automatically start when this job is created, so usually queue.start() // would have to be manually called. However in the final createJob() below we don't pass false so it will start the queue. // NOTE: We pass false for example purposes. In most scenarios starting queue on createJob() is perfectly fine. queue.createJob('example-job', { @@ -157,43 +157,70 @@ queue.addWorker() accepts an options object in order to tweak standard functiona ```js queue.addWorker('job-name-here', async (id, payload) => { console.log(id); }, { - + // Set max number of jobs for this worker to process concurrently. // Defaults to 1. concurrency: 5, - + + // Sets the behavior of failures on this worker. Possible values are: standard | custom + // standard: If a job fails more than the maximum number of attempts, it will be marked as failed. + // custom: If a job fails more than the maximum number of attempts, it will be retried if the job + // Defaults to standard. + failureBehavior: 'standard', + + // Set min number of milliseconds to wait before for this worker will perform another attempt. + // Defaults to 0. + minimumMillisBetweenAttempts: 1 * 1000, + + // A function that determines if a job is runnable. If the job is not runnable, it will be skipped. + // This function should return an object with a "runnable" boolean property and a "reason" string property. + // If the job is not runnable, the reason will be passed to the onSkipped callback payload as skippedReason. + // Defaults to a function that always returns true. + isJobRunnable: (job) => { + // In this example we will only allow jobs to run if they are at least 15 seconds old. + let reason; + const diffSec = Math.floor((new Date().getTime() - new Date(job?.created).getTime()) / 1000); + if (diffSec < 15) reason = `Must be at least 15 seconds old. Currently ${diffSec} seconds old.`; + return { runnable: diffSec >= 15, reason }; + }, + // JOB LIFECYCLE CALLBACKS - + // onStart job callback handler is fired when a job begins processing. // - // IMPORTANT: Job lifecycle callbacks are executed asynchronously and do not block job processing + // IMPORTANT: Job lifecycle callbacks are executed asynchronously and do not block job processing // (even if the callback returns a promise it will not be "awaited" on). // As such, do not place any logic in onStart that your actual job worker function will depend on, // this type of logic should of course go inside the job worker function itself. + onSkipped: async (id, payload) => { + const { skippedReason } = payload; + console.log('Job "job-name-here" with id ' + id + ' has been skipped because ' + skippedReason); + }, + onStart: async (id, payload) => { console.log('Job "job-name-here" with id ' + id + ' has started processing.'); }, - + // onSuccess job callback handler is fired after a job successfully completes processing. onSuccess: async (id, payload) => { console.log('Job "job-name-here" with id ' + id + ' was successful.'); }, - + // onFailure job callback handler is fired after each time a job fails (onFailed also fires if job has reached max number of attempts). onFailure: async (id, payload) => { console.log('Job "job-name-here" with id ' + id + ' had an attempt end in failure.'); }, - + // onFailed job callback handler is fired if job fails enough times to reach max number of attempts. onFailed: async (id, payload) => { console.log('Job "job-name-here" with id ' + id + ' has failed.'); }, - + // onComplete job callback handler fires after job has completed processing successfully or failed entirely. onComplete: async (id, payload) => { console.log('Job "job-name-here" with id ' + id + ' has completed processing.'); } -}); +}); ``` @@ -207,13 +234,13 @@ queue.createJob('job-name-here', {foo: 'bar'}, { // Any int will work, priority 1000 will be processed before priority 10, though this is probably overkill. // Defaults to 0. priority: 10, // High priority - + // Timeout in ms before job is considered failed. // Use this setting to kill off hanging jobs that are clogging up // your queue, or ensure your jobs finish in a timely manner if you want // to execute jobs in OS background tasks. // - // IMPORTANT: Jobs are required to have a timeout > 0 set in order to be processed + // IMPORTANT: Jobs are required to have a timeout > 0 set in order to be processed // by a queue that has been started with a lifespan. As such, if you want to process // jobs in an OS background task, you MUST give the jobs a timeout setting. // @@ -221,7 +248,7 @@ queue.createJob('job-name-here', {foo: 'bar'}, { // // Defaults to 25000. timeout: 30000, // Timeout in 30 seconds - + // Number of times to attempt a failing job before marking job as failed and moving on. // Defaults to 1. attempts: 4, // If this job fails to process 4 times in a row, it will be marked as failed. From dfa4b5101095259ff7aa7f50d8ea97c2658382dd Mon Sep 17 00:00:00 2001 From: RobNewton Date: Fri, 1 Sep 2023 23:26:43 -0400 Subject: [PATCH 14/35] Removed notes --- notes.js | 30 ------------------------------ 1 file changed, 30 deletions(-) delete mode 100644 notes.js diff --git a/notes.js b/notes.js deleted file mode 100644 index 2eacc79..0000000 --- a/notes.js +++ /dev/null @@ -1,30 +0,0 @@ -/* - -When we grab a job off of the realm jobs db, it gets marked as active. When -we set it to active, also set a session id to a unique UUID from that instance -of calling queue.start(). - -In getConcurrentJobs(), we should only grab jobs that do are active = false and -sessionId = null. This will prevent us from grabbing jobs that are already -active in another queue instance. - - -Send status change to server: -send breadcrumb trail of all status changes to server. - - -*/ -try { - if (await job.dependenciesMet(job)) { - await this.worker.executeJob(job); - } else { - this.realm.write(() => { - this.realm.write((job.active = false)); - }); - return; - } -} catch (error) { - this.realm.write(() => { - this.realm.write((job.active = false)); - }); -} From ffbeae3b1bb0c26d1b6fd166f3282a87485bab2b Mon Sep 17 00:00:00 2001 From: RobNewton Date: Fri, 1 Sep 2023 23:32:21 -0400 Subject: [PATCH 15/35] Removed the unused session field --- Models/Queue.js | 56 ++++++---------------------------------------- config/Database.js | 1 - 2 files changed, 7 insertions(+), 50 deletions(-) diff --git a/Models/Queue.js b/Models/Queue.js index c7c8edc..3443faf 100644 --- a/Models/Queue.js +++ b/Models/Queue.js @@ -101,8 +101,7 @@ export class Queue { timeout: (options.timeout >= 0) ? options.timeout : 25000, created: new Date(), lastFailed: null, - failed: null, - session: null + failed: null }); }); @@ -147,14 +146,13 @@ export class Queue { const startTime = Date.now(); let lifespanRemaining = null; let concurrentJobs = []; - let session = uuid.v4(); if (lifespan !== 0) { lifespanRemaining = lifespan - (Date.now() - startTime); lifespanRemaining = (lifespanRemaining === 0) ? -1 : lifespanRemaining; // Handle exactly zero lifespan remaining edge case. - concurrentJobs = await this.getConcurrentJobs(session, lifespanRemaining); + concurrentJobs = await this.getConcurrentJobs(lifespanRemaining); } else { - concurrentJobs = await this.getConcurrentJobs(session); + concurrentJobs = await this.getConcurrentJobs(); } while (this.status === 'active' && concurrentJobs.length) { @@ -171,9 +169,9 @@ export class Queue { if (lifespan !== 0) { lifespanRemaining = lifespan - (Date.now() - startTime); lifespanRemaining = (lifespanRemaining === 0) ? -1 : lifespanRemaining; // Handle exactly zero lifespan remaining edge case. - concurrentJobs = await this.getConcurrentJobs(session, lifespanRemaining); + concurrentJobs = await this.getConcurrentJobs(lifespanRemaining); } else { - concurrentJobs = await this.getConcurrentJobs(session); + concurrentJobs = await this.getConcurrentJobs(); } } @@ -220,46 +218,14 @@ export class Queue { * worker function that has concurrency X > 1, then X related (jobs with same name) * jobs will be returned. * - * If queue is running with a lifespan, only jobs with timeouts at least 500ms < than REMAINING lifespan - * AND a set timeout (ie timeout > 0) will be returned. See Queue.start() for more info. - * - * @param session {uuid} - The unique ID of the queue.start() instance. * @param queueLifespanRemaining {number} - The remaining lifespan of the current queue process (defaults to indefinite). * @return {promise} - Promise resolves to an array of job(s) to be processed next by the queue. */ - async getConcurrentJobs(session, queueLifespanRemaining = 0) { + async getConcurrentJobs(queueLifespanRemaining = 0) { let concurrentJobs = []; const workersArr = this.worker.getWorkersAsArray(); - const workersArrToJSON = () => { - return workersArr.map(worker => { - let ret = {}; - const { - concurrency, - isJobRunnable, - failureBehavior, - minimumMillisBetweenAttempts, - onStart, - onSuccess, - onFailure, - onFailed, - onComplete } = worker; - if (concurrency) ret.concurrency = typeof concurrency == 'number' ? concurrency : 'undefined'; - if (isJobRunnable) ret.isJobRunnable = typeof isJobRunnable == 'function' ? 'isJobRunnable()' : 'undefined'; - if (failureBehavior) ret.failureBehavior = typeof failureBehavior == 'string' ? failureBehavior : 'undefined'; - if (minimumMillisBetweenAttempts) ret.minimumMillisBetweenAttempts = typeof minimumMillisBetweenAttempts == 'number' ? minimumMillisBetweenAttempts : 'undefined'; - if (onStart) ret.onStart = typeof onStart == 'function' ? 'onStart()' : 'undefined'; - if (onSuccess) ret.onSuccess = typeof onSuccess == 'function' ? 'onSuccess()' : 'undefined'; - if (onFailure) ret.onFailure = typeof onFailure == 'function' ? 'onFailure()' : 'undefined'; - if (onFailed) ret.onFailed = typeof onFailed == 'function' ? 'onFailed()' : 'undefined'; - if (onComplete) ret.onComplete = typeof onComplete == 'function' ? 'onComplete()' : 'undefined'; - return ret; - }); - }; - - - //console.log(`[RNQ] Found ${workersArr.length} workers to process...\n${JSON.stringify(workersArrToJSON(), null, 2)}`); - if (workersArr.length === 0) return concurrentJobs; + if (workersArr.length === 0) return []; this.realm.write(() => { // Get next job from queue. @@ -314,8 +280,6 @@ export class Queue { .filtered(initialQuery) .sorted([['priority', true], ['created', false]])); - console.log(`[RNQ] Initially found ${jobs?.length} jobs to process.`); - if (jobs.length) { nextJob = jobs[0]; } @@ -365,8 +329,6 @@ export class Queue { } } - console.log(`[RNQ] Only ${runnableJobs?.length} of those jobs are runnable.`); - let jobsToMarkActive = runnableJobs.slice(0, concurrency); // Grab concurrent job ids to reselect jobs as marking these jobs as active will remove @@ -377,7 +339,6 @@ export class Queue { // Mark concurrent jobs as active jobsToMarkActive = jobsToMarkActive.map( job => { job.active = true; - job.session = session; }); // Reselect now-active concurrent jobs by id. @@ -387,9 +348,6 @@ export class Queue { .filtered(reselectQuery) .sorted([['priority', true], ['created', false]])); concurrentJobs = reselectedJobs.slice(0, concurrency); - console.log(`[RNQ] Found ${concurrentJobs?.length} concurrent jobs to process.`); - } else { - console.log(`[RNQ] No jobs to process.`); } } }); diff --git a/config/Database.js b/config/Database.js index c1b6702..8840141 100644 --- a/config/Database.js +++ b/config/Database.js @@ -18,7 +18,6 @@ const JobSchema = { created: 'date', // Job creation timestamp. failed: 'date?', // Job failure timestamp (null until failure). lastFailed: 'date?', // Last job failure timestamp (set after each failed attempt). - session: 'string?', // Session UUID for queue.start() instance that pulled the job in. } }; From 628bc1f796fb354dd7384f8d456af27c9bb8c44a Mon Sep 17 00:00:00 2001 From: RobNewton Date: Fri, 1 Sep 2023 23:36:17 -0400 Subject: [PATCH 16/35] Fixed default isJobRunnable sig --- Models/Worker.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Models/Worker.js b/Models/Worker.js index 6a648a8..dff5cc3 100644 --- a/Models/Worker.js +++ b/Models/Worker.js @@ -36,7 +36,7 @@ export default class Worker { throw new Error('Job name and associated worker function must be supplied.'); } - const defaultIsJobRunnable = async (id, payload) => true; + const defaultIsJobRunnable = (job) => true; // Attach options to worker worker.options = { From 5f222023f8708f760168a30acfc527dc9320d4b7 Mon Sep 17 00:00:00 2001 From: RobNewton Date: Fri, 1 Sep 2023 23:44:44 -0400 Subject: [PATCH 17/35] Fixed default runnable function --- Models/Worker.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Models/Worker.js b/Models/Worker.js index dff5cc3..834a27f 100644 --- a/Models/Worker.js +++ b/Models/Worker.js @@ -36,7 +36,7 @@ export default class Worker { throw new Error('Job name and associated worker function must be supplied.'); } - const defaultIsJobRunnable = (job) => true; + const defaultIsJobRunnable = (job) => ({ runnable: true }); // Attach options to worker worker.options = { From ccd7104b01f0c0a35116d64883f61631eec51f95 Mon Sep 17 00:00:00 2001 From: RobNewton Date: Mon, 4 Sep 2023 09:38:56 -0400 Subject: [PATCH 18/35] Added get job function --- Models/Queue.js | 20 ++++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/Models/Queue.js b/Models/Queue.js index 3443faf..73adadd 100644 --- a/Models/Queue.js +++ b/Models/Queue.js @@ -190,6 +190,26 @@ export class Queue { this.status = 'inactive'; } + /** + * + * Get a job by id from the queue. + * + * @param sync {boolean} - This should be true if you want to guarantee job data is fresh. Otherwise you could receive job data that is not up to date if a write transaction is occuring concurrently. + * @return {promise} - Promise that resolves to a collection of all the jobs in the queue. + */ + async getJob(id, sync = false) { + if (sync) { + let job = null; + this.realm.write(() => { + job = this.realm.object('Job', id); + }); + + return job; + } else { + return await this.realm.objects('Job', id); + } + } + /** * * Get a collection of all the jobs in the queue. From 6b8cb70eac9596b8d8297dbd49223207a1b10618 Mon Sep 17 00:00:00 2001 From: RobNewton Date: Mon, 4 Sep 2023 10:05:04 -0400 Subject: [PATCH 19/35] Fixed getJob --- Models/Queue.js | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/Models/Queue.js b/Models/Queue.js index 73adadd..d467d59 100644 --- a/Models/Queue.js +++ b/Models/Queue.js @@ -201,12 +201,12 @@ export class Queue { if (sync) { let job = null; this.realm.write(() => { - job = this.realm.object('Job', id); + job = this.realm.objectForPrimaryKey('Job', id); }); return job; } else { - return await this.realm.objects('Job', id); + return await this.realm.objectForPrimaryKey('Job', id); } } From aa290fc55ae0f44955258ea7f706c744bc25a514 Mon Sep 17 00:00:00 2001 From: RobNewton Date: Mon, 4 Sep 2023 18:56:50 -0400 Subject: [PATCH 20/35] Status change observer --- Models/Queue.js | 43 ++++++++++++++++++++++++++++++++++++++++--- 1 file changed, 40 insertions(+), 3 deletions(-) diff --git a/Models/Queue.js b/Models/Queue.js index d467d59..f6710bb 100644 --- a/Models/Queue.js +++ b/Models/Queue.js @@ -21,6 +21,7 @@ export class Queue { this.realm = null; this.worker = new Worker(); this.status = 'inactive'; + this.statusChangeObserver = null; } /** @@ -67,6 +68,42 @@ export class Queue { this.worker.removeWorker(jobName); } + /** + * Listen for changes in the queue status (starting and stopping). This method + * returns a unsubscribe function to stop listening to events. Always ensure you + * unsubscribe from the listener when no longer needed to prevent updates to + * components no longer in use. + * + * #### Example + * + * ```js + * const unsubscribe = queue.onQueueStateChanged((user) => { + * if (user) { + * // Signed in + * } else { + * // Signed out + * } + * }); + * + * // Unsubscribe from further state changes + * unsubscribe(); + * ``` + * + * @param listener A listener function which triggers when auth state changed (for example signing out). + */ + onQueueStateChanged(listener) { + this.statusChangeObserver = listener; + return () => {this.statusChangeObserver = null}; + } + + changeStatus(status) { + this.status = status; + if (this.statusChangeObserver) { + this.statusChangeObserver(status); + } + } + + /** * * Creates a new job and adds it to queue. @@ -140,7 +177,7 @@ export class Queue { return false; } - this.status = 'active'; + this.changeStatus('active'); // Get jobs to process const startTime = Date.now(); @@ -175,7 +212,7 @@ export class Queue { } } - this.status = 'inactive'; + this.changeStatus('inactive'); } /** @@ -187,7 +224,7 @@ export class Queue { * */ stop() { - this.status = 'inactive'; + this.changeStatus('inactive'); } /** From 01c7dea7ed88d218db2052d6d00937e157c156bb Mon Sep 17 00:00:00 2001 From: RobNewton Date: Mon, 4 Sep 2023 18:59:05 -0400 Subject: [PATCH 21/35] Corrected comments --- Models/Queue.js | 10 +++------- 1 file changed, 3 insertions(+), 7 deletions(-) diff --git a/Models/Queue.js b/Models/Queue.js index f6710bb..59f8d06 100644 --- a/Models/Queue.js +++ b/Models/Queue.js @@ -77,19 +77,15 @@ export class Queue { * #### Example * * ```js - * const unsubscribe = queue.onQueueStateChanged((user) => { - * if (user) { - * // Signed in - * } else { - * // Signed out - * } + * const unsubscribe = queue.onQueueStateChanged((state) => { + * console.log(`Queue state changed to ${state}`); * }); * * // Unsubscribe from further state changes * unsubscribe(); * ``` * - * @param listener A listener function which triggers when auth state changed (for example signing out). + * @param listener A listener function which triggers when queue status changed (for example starting). */ onQueueStateChanged(listener) { this.statusChangeObserver = listener; From 5ec4e45ab85cbce95c285963c875018a13bb0572 Mon Sep 17 00:00:00 2001 From: RobNewton Date: Tue, 5 Sep 2023 09:42:18 -0400 Subject: [PATCH 22/35] Added some comments --- Models/Queue.js | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/Models/Queue.js b/Models/Queue.js index 59f8d06..bffd413 100644 --- a/Models/Queue.js +++ b/Models/Queue.js @@ -92,6 +92,13 @@ export class Queue { return () => {this.statusChangeObserver = null}; } + /** + * A simple wrapper for setting the status of the queue and notifying any + * listeners of the change. + * + * @private + * @param {string} status + */ changeStatus(status) { this.status = status; if (this.statusChangeObserver) { @@ -99,7 +106,6 @@ export class Queue { } } - /** * * Creates a new job and adds it to queue. From 6d3108feee6991d0e35fa9cd6ec1808c1b2ecfbc Mon Sep 17 00:00:00 2001 From: RobNewton Date: Thu, 7 Sep 2023 11:36:31 -0400 Subject: [PATCH 23/35] onQueueJobChanged event handler --- Models/Queue.js | 32 ++++++++++++++++++++++++++++++++ 1 file changed, 32 insertions(+) diff --git a/Models/Queue.js b/Models/Queue.js index bffd413..08b9abb 100644 --- a/Models/Queue.js +++ b/Models/Queue.js @@ -92,6 +92,38 @@ export class Queue { return () => {this.statusChangeObserver = null}; } + /** + * Listen for changes in the jobs collection such as jobs changing status. This method + * returns a unsubscribe function to stop listening to events. Always ensure you + * unsubscribe from the listener when no longer needed to prevent updates to + * components no longer in use. + * + * #### Example + * + * ```js + * const unsubscribe = queue.onQueueJobChanged(() => { + * console.log(`A job changed!`); + * }); + * + * // Unsubscribe from further state changes + * unsubscribe(); + * ``` + * + * @param listener A listener function which triggers when jobs collection changed. + */ + onQueueJobChanged(listener) { + // Add the listener callback to the realm + try { + this.realm.addListener("change", listener); + } catch (error) { + console.error( + `An exception was thrown within the react native queue change listener: ${error}` + ); + } + + return () => { this.realm.removeListener("change", listener); }; + } + /** * A simple wrapper for setting the status of the queue and notifying any * listeners of the change. From 74b4fc784104a5720ff34ed98cd0ee8854a9011e Mon Sep 17 00:00:00 2001 From: RobNewton Date: Thu, 7 Sep 2023 15:35:46 -0400 Subject: [PATCH 24/35] Added skip tracking --- Models/Queue.js | 20 +++++++++++++++++++- 1 file changed, 19 insertions(+), 1 deletion(-) diff --git a/Models/Queue.js b/Models/Queue.js index 08b9abb..5d73400 100644 --- a/Models/Queue.js +++ b/Models/Queue.js @@ -414,8 +414,26 @@ export class Queue { if (runnable) { runnableJobs.push(job); } else { - // Fire onSkipped job lifecycle callback const jobPayload = JSON.parse(job.payload); + let jobData = JSON.parse(job.data); + + // Increment failed attempts number + if (!jobData.skippedAttempts) { + jobData.skippedAttempts = 1; + } else { + jobData.skippedAttempts++; + } + + // Log skipped reasons + if (!jobData.skippedReasons) { + jobData.skippedReasons = [reason]; + } else { + jobData.skippedReasons.push(reason); + } + + job.data = JSON.stringify(jobData); + + // Fire onSkipped job lifecycle callback this.worker.executeJobLifecycleCallback('onSkipped', job.name, job.id, {...jobPayload, skippedReason: reason}); } } From 6e71ac5b43b1763f4598e641d6eb54c187a681e3 Mon Sep 17 00:00:00 2001 From: RobNewton Date: Fri, 8 Sep 2023 18:20:00 -0400 Subject: [PATCH 25/35] Updated package name prefix --- README.md | 10 +++++----- package.json | 2 +- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/README.md b/README.md index 5a34eb8..71d2c63 100644 --- a/README.md +++ b/README.md @@ -68,13 +68,13 @@ Need advanced task functionality like dedicated worker threads or OS services? E ## Installation ```bash -$ npm install --save @sourcetoad/react-native-queue +$ npm install --save hopdrive/react-native-queue ``` Or ```bash -$ yarn add @sourcetoad/react-native-queue +$ yarn add hopdrive/react-native-queue ``` ## Basic Usage @@ -89,7 +89,7 @@ Creating and processing jobs consists of: 4. Starting the queue (note this happens automatically on job creation, but sometimes the queue must be explicitly started such as in a OS background task or on app restart). Queue can be started with a lifespan in order to limit queue processing time. ```js -import queueFactory from '@sourcetoad/react-native-queue'; +import queueFactory from 'hopdrive/react-native-queue'; // Of course this line needs to be in the context of an async function, // otherwise use queueFactory.then((queue) => { console.log('add workers and jobs here'); }); @@ -281,7 +281,7 @@ import { Button } from 'react-native'; -import queueFactory from '@sourcetoad/react-native-queue'; +import queueFactory from 'hopdrive/react-native-queue'; export default class App extends Component<{}> { @@ -452,7 +452,7 @@ import { } from 'react-native'; import BackgroundTask from 'react-native-background-task' -import queueFactory from '@sourcetoad/react-native-queue'; +import queueFactory from 'hopdrive/react-native-queue'; BackgroundTask.define(async () => { diff --git a/package.json b/package.json index 04c2dee..8fa5538 100644 --- a/package.json +++ b/package.json @@ -1,5 +1,5 @@ { - "name": "@sourcetoad/react-native-queue", + "name": "hopdrive/react-native-queue", "version": "2.3.0", "description": "A React Native Job Queue", "main": "index.js", From f79974c6cf63aa235c54fae0848cd5401cac7bf8 Mon Sep 17 00:00:00 2001 From: RobNewton Date: Fri, 8 Sep 2023 18:23:24 -0400 Subject: [PATCH 26/35] Try that again --- README.md | 14 +++++++------- package.json | 8 ++++---- 2 files changed, 11 insertions(+), 11 deletions(-) diff --git a/README.md b/README.md index 71d2c63..c49e24a 100644 --- a/README.md +++ b/README.md @@ -5,8 +5,8 @@ _Forked from [billmalarky/react-native-queue](https://github.com/billmalarky/rea #### Simple. Powerful. Persistent. -[![Node.js CI](https://github.com/sourcetoad/react-native-queue/actions/workflows/build.yml/badge.svg)](https://github.com/sourcetoad/react-native-queue/actions/workflows/build.yml) -[![License](https://img.shields.io/badge/license-MIT-blue.svg)](https://github.com/sourcetoad/react-native-queue/blob/master/LICENSE) +[![Node.js CI](https://github.com/hopdrive/react-native-queue/actions/workflows/build.yml/badge.svg)](https://github.com/hopdrive/react-native-queue/actions/workflows/build.yml) +[![License](https://img.shields.io/badge/license-MIT-blue.svg)](https://github.com/hopdrive/react-native-queue/blob/master/LICENSE) A React Native at-least-once priority job queue / task queue backed by persistent Realm storage. Jobs will persist until completed, even if user closes and re-opens app. React Native Queue is easily integrated into OS background processes (services) so you can ensure the queue will continue to process until all jobs are completed even if app isn't in focus. It also plays well with Workers so your jobs can be thrown on the queue, then processed in dedicated worker threads for greatly improved processing performance. @@ -68,13 +68,13 @@ Need advanced task functionality like dedicated worker threads or OS services? E ## Installation ```bash -$ npm install --save hopdrive/react-native-queue +$ npm install --save @hopdrive/react-native-queue ``` Or ```bash -$ yarn add hopdrive/react-native-queue +$ yarn add @hopdrive/react-native-queue ``` ## Basic Usage @@ -89,7 +89,7 @@ Creating and processing jobs consists of: 4. Starting the queue (note this happens automatically on job creation, but sometimes the queue must be explicitly started such as in a OS background task or on app restart). Queue can be started with a lifespan in order to limit queue processing time. ```js -import queueFactory from 'hopdrive/react-native-queue'; +import queueFactory from '@hopdrive/react-native-queue'; // Of course this line needs to be in the context of an async function, // otherwise use queueFactory.then((queue) => { console.log('add workers and jobs here'); }); @@ -281,7 +281,7 @@ import { Button } from 'react-native'; -import queueFactory from 'hopdrive/react-native-queue'; +import queueFactory from '@hopdrive/react-native-queue'; export default class App extends Component<{}> { @@ -452,7 +452,7 @@ import { } from 'react-native'; import BackgroundTask from 'react-native-background-task' -import queueFactory from 'hopdrive/react-native-queue'; +import queueFactory from '@hopdrive/react-native-queue'; BackgroundTask.define(async () => { diff --git a/package.json b/package.json index 8fa5538..f9a1838 100644 --- a/package.json +++ b/package.json @@ -1,5 +1,5 @@ { - "name": "hopdrive/react-native-queue", + "name": "@hopdrive/react-native-queue", "version": "2.3.0", "description": "A React Native Job Queue", "main": "index.js", @@ -18,7 +18,7 @@ }, "repository": { "type": "git", - "url": "https://github.com/sourcetoad/react-native-queue/react-native-queue.git" + "url": "https://github.com/hopdrive/react-native-queue/react-native-queue.git" }, "keywords": [ "react", @@ -30,9 +30,9 @@ "author": "Reid Mayo", "license": "MIT", "bugs": { - "url": "https://github.com/sourcetoad/react-native-queue/issues" + "url": "https://github.com/hopdrive/react-native-queue/issues" }, - "homepage": "https://github.com/sourcetoad/react-native-queue#readme", + "homepage": "https://github.com/hopdrive/react-native-queue#readme", "dependencies": { "promise-reflect": "^1.1.0", "react-native-uuid": "^2.0.1", From 8ae025a46da0c701797485401b5bf6f57550becf Mon Sep 17 00:00:00 2001 From: RobNewton Date: Fri, 8 Sep 2023 19:14:25 -0400 Subject: [PATCH 27/35] Added publish registry of github @hopdrive --- package.json | 3 +++ 1 file changed, 3 insertions(+) diff --git a/package.json b/package.json index f9a1838..7025ac1 100644 --- a/package.json +++ b/package.json @@ -3,6 +3,9 @@ "version": "2.3.0", "description": "A React Native Job Queue", "main": "index.js", + "publishConfig": { + "registry": "https://npm.pkg.github.com/@hopdrive" + }, "files": [ "config", "Models", From ea77439a93fdee6b1569f0da7cefe53d64f3885f Mon Sep 17 00:00:00 2001 From: RobNewton Date: Thu, 14 Sep 2023 12:48:24 -0400 Subject: [PATCH 28/35] Added get workers as array --- Models/Queue.js | 7 +++++++ package.json | 2 +- 2 files changed, 8 insertions(+), 1 deletion(-) diff --git a/Models/Queue.js b/Models/Queue.js index 5d73400..40ac4e2 100644 --- a/Models/Queue.js +++ b/Models/Queue.js @@ -68,6 +68,13 @@ export class Queue { this.worker.removeWorker(jobName); } + /** + * Get all of the registered workers. + */ + getWorkersAsArray() { + return this.worker.getWorkersAsArray(); + } + /** * Listen for changes in the queue status (starting and stopping). This method * returns a unsubscribe function to stop listening to events. Always ensure you diff --git a/package.json b/package.json index 7025ac1..e118a36 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "@hopdrive/react-native-queue", - "version": "2.3.0", + "version": "2.3.1", "description": "A React Native Job Queue", "main": "index.js", "publishConfig": { From a534c9e6a767030be44d7e10f97e68af8fff758b Mon Sep 17 00:00:00 2001 From: dartushd Date: Tue, 24 Oct 2023 14:47:37 -0400 Subject: [PATCH 29/35] add delete job function --- Models/Queue.js | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/Models/Queue.js b/Models/Queue.js index 99e2ec7..66510df 100644 --- a/Models/Queue.js +++ b/Models/Queue.js @@ -363,6 +363,25 @@ export class Queue { } } + /** + * Delete a job from the queue. + * + * @param jobId {string} - Unique id associated with job. + * + */ + + deleteJob(jobId) { + this.realm.write(() => { + let job = this.realm.objects('Job').filtered('id == "' + jobId + '"'); + + if (job.length) { + this.realm.delete(job); + } else { + throw new Error('Job ' + jobId + ' does not exist.'); + } + }); + } + /** * * Delete jobs in the queue. From bda0fb24413d1473262c743e845ec83d0d14f67c Mon Sep 17 00:00:00 2001 From: dartushd Date: Fri, 10 Nov 2023 19:25:38 -0500 Subject: [PATCH 30/35] Add delete all failed job helper --- Models/Queue.js | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/Models/Queue.js b/Models/Queue.js index 66510df..eb6e437 100644 --- a/Models/Queue.js +++ b/Models/Queue.js @@ -382,6 +382,25 @@ export class Queue { }); } + /** + * + * Delete all failed jobs from the queue. + * + * + */ + + deleteAllFailedJobs() { + this.realm.write(() => { + let jobs = Array.from(this.realm.objects('Job') + .filtered('failed != null')); + + if (jobs.length) { + this.realm.delete(jobs); + } + }); + } + + /** * * Delete jobs in the queue. From a096a4d67ef4aba4d7597d761287924891a0de46 Mon Sep 17 00:00:00 2001 From: dartushd Date: Tue, 12 Dec 2023 11:05:03 -0500 Subject: [PATCH 31/35] Update package version --- package.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/package.json b/package.json index 772a1b3..04c2dee 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "@sourcetoad/react-native-queue", - "version": "2.2.0", + "version": "2.3.0", "description": "A React Native Job Queue", "main": "index.js", "files": [ From b2bab104f94df4e7b7762bb733cd708e7ad2f27f Mon Sep 17 00:00:00 2001 From: dartushd Date: Tue, 12 Dec 2023 11:08:38 -0500 Subject: [PATCH 32/35] Update version to 2.4.0 --- package.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/package.json b/package.json index 04c2dee..ec50429 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "@sourcetoad/react-native-queue", - "version": "2.3.0", + "version": "2.4.0", "description": "A React Native Job Queue", "main": "index.js", "files": [ From 98d5fd1d84b88fae98838c02fca8c295e55e6957 Mon Sep 17 00:00:00 2001 From: dartushd Date: Tue, 12 Dec 2023 11:24:07 -0500 Subject: [PATCH 33/35] Update version 2.4.0 --- package.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/package.json b/package.json index 772a1b3..ec50429 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "@sourcetoad/react-native-queue", - "version": "2.2.0", + "version": "2.4.0", "description": "A React Native Job Queue", "main": "index.js", "files": [ From 1615851609eaecea3b8d2cedf5e83f6d475fad31 Mon Sep 17 00:00:00 2001 From: dartushd Date: Tue, 12 Dec 2023 11:28:19 -0500 Subject: [PATCH 34/35] weird name change --- package.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/package.json b/package.json index a31e215..a1aaa85 100644 --- a/package.json +++ b/package.json @@ -1,5 +1,5 @@ { - "name": "@sourcetoad/react-native-queue", + "name": "@hopdrive/react-native-queue", "version": "2.4.0", "description": "A React Native Job Queue", "main": "index.js", From c50a23e4757a02592b6f9c2f8e6aad3b293d02f9 Mon Sep 17 00:00:00 2001 From: RobNewton Date: Sat, 30 Dec 2023 19:07:46 -0500 Subject: [PATCH 35/35] Upgraded package to access realm path --- Models/Queue.js | 20 +++++++++++--------- package.json | 2 +- 2 files changed, 12 insertions(+), 10 deletions(-) diff --git a/Models/Queue.js b/Models/Queue.js index 873a135..b714723 100644 --- a/Models/Queue.js +++ b/Models/Queue.js @@ -28,10 +28,12 @@ export class Queue { * * Initializes the queue by connecting to Realm database. * + * Specify an optional options object to override the default realmPath. + * */ - async init() { + async init(options = {}) { if (this.realm === null) { - this.realm = await Database.getRealmInstance(); + this.realm = await Database.getRealmInstance(options); } } @@ -568,9 +570,9 @@ export class Queue { /** * Delete a job from the queue. - * + * * @param jobId {string} - Unique id associated with job. - * + * */ deleteJob(jobId) { @@ -586,10 +588,10 @@ export class Queue { } /** - * + * * Delete all failed jobs from the queue. - * - * + * + * */ deleteAllFailedJobs() { @@ -637,9 +639,9 @@ export class Queue { * * @return {Queue} - A queue instance. */ -export default async function queueFactory() { +export default async function queueFactory(options = {}) { const queue = new Queue(); - await queue.init(); + await queue.init(options); return queue; } diff --git a/package.json b/package.json index a1aaa85..801669e 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "@hopdrive/react-native-queue", - "version": "2.4.0", + "version": "2.4.1", "description": "A React Native Job Queue", "main": "index.js", "publishConfig": {