From 68c6a4c46135abdac26384efb5c83aa0bfbc0d21 Mon Sep 17 00:00:00 2001 From: dblythy Date: Wed, 23 Nov 2022 17:39:12 +1100 Subject: [PATCH 01/12] fix: update LiveQuery publisher to redis 4 --- spec/ParseLiveQueryRedis.spec.js | 33 ++++++++++++++++++++++++++++++++ 1 file changed, 33 insertions(+) create mode 100644 spec/ParseLiveQueryRedis.spec.js diff --git a/spec/ParseLiveQueryRedis.spec.js b/spec/ParseLiveQueryRedis.spec.js new file mode 100644 index 0000000000..c9c8110766 --- /dev/null +++ b/spec/ParseLiveQueryRedis.spec.js @@ -0,0 +1,33 @@ +if (process.env.PARSE_SERVER_TEST_CACHE === 'redis') { + describe('ParseLiveQuery redis', () => { + it('can connect', async () => { + await reconfigureServer({ + startLiveQueryServer: true, + liveQuery: { + classNames: ['TestObject'], + redisURL: 'redis://localhost:6379', + }, + liveQueryServerOptions: { + redisURL: 'redis://localhost:6379', + }, + }); + const subscription = await new Parse.Query('TestObject').subscribe(); + const [, object] = await Promise.all([ + new Promise(resolve => + subscription.on('create', () => { + resolve(); + }) + ), + new Parse.Object('TestObject').save(), + ]); + await Promise.all([ + new Promise(resolve => + subscription.on('delete', () => { + resolve(); + }) + ), + object.destroy(), + ]); + }); + }); +} From a7f65b3c8e01b22f1c03435d6a81a1ede5161f33 Mon Sep 17 00:00:00 2001 From: dblythy Date: Wed, 23 Nov 2022 17:51:53 +1100 Subject: [PATCH 02/12] fix --- src/Adapters/PubSub/RedisPubSub.js | 4 +- src/LiveQuery/ParseCloudCodePublisher.js | 7 ++- src/LiveQuery/ParseLiveQueryServer.js | 64 ++++++++++++++---------- 3 files changed, 45 insertions(+), 30 deletions(-) diff --git a/src/Adapters/PubSub/RedisPubSub.js b/src/Adapters/PubSub/RedisPubSub.js index 7a886d5236..cc2b3a9792 100644 --- a/src/Adapters/PubSub/RedisPubSub.js +++ b/src/Adapters/PubSub/RedisPubSub.js @@ -2,12 +2,12 @@ import { createClient } from 'redis'; function createPublisher({ redisURL, redisOptions = {} }): any { redisOptions.no_ready_check = true; - return createClient(redisURL, redisOptions); + return createClient({ url: redisURL, ...redisOptions }); } function createSubscriber({ redisURL, redisOptions = {} }): any { redisOptions.no_ready_check = true; - return createClient(redisURL, redisOptions); + return createClient({ url: redisURL, ...redisOptions }); } const RedisPubSub = { diff --git a/src/LiveQuery/ParseCloudCodePublisher.js b/src/LiveQuery/ParseCloudCodePublisher.js index 3ecd740e12..9eb738012c 100644 --- a/src/LiveQuery/ParseCloudCodePublisher.js +++ b/src/LiveQuery/ParseCloudCodePublisher.js @@ -8,7 +8,12 @@ class ParseCloudCodePublisher { // config object of the publisher, right now it only contains the redisURL, // but we may extend it later. constructor(config: any = {}) { - this.parsePublisher = ParsePubSub.createPublisher(config); + (async () => { + this.parsePublisher = ParsePubSub.createPublisher(config); + if (typeof this.parsePublisher.connect === 'function') { + await this.parsePublisher.connect(); + } + })(); } onCloudCodeAfterSave(request: any): void { diff --git a/src/LiveQuery/ParseLiveQueryServer.js b/src/LiveQuery/ParseLiveQueryServer.js index 02c6ad30a1..b0202abe48 100644 --- a/src/LiveQuery/ParseLiveQueryServer.js +++ b/src/LiveQuery/ParseLiveQueryServer.js @@ -75,34 +75,44 @@ class ParseLiveQueryServer { ); // Initialize subscriber - this.subscriber = ParsePubSub.createSubscriber(config); - this.subscriber.subscribe(Parse.applicationId + 'afterSave'); - this.subscriber.subscribe(Parse.applicationId + 'afterDelete'); - this.subscriber.subscribe(Parse.applicationId + 'clearCache'); - // Register message handler for subscriber. When publisher get messages, it will publish message - // to the subscribers and the handler will be called. - this.subscriber.on('message', (channel, messageStr) => { - logger.verbose('Subscribe message %j', messageStr); - let message; - try { - message = JSON.parse(messageStr); - } catch (e) { - logger.error('unable to parse message', messageStr, e); - return; - } - if (channel === Parse.applicationId + 'clearCache') { - this._clearCachedRoles(message.userId); - return; - } - this._inflateParseObject(message); - if (channel === Parse.applicationId + 'afterSave') { - this._onAfterSave(message); - } else if (channel === Parse.applicationId + 'afterDelete') { - this._onAfterDelete(message); - } else { - logger.error('Get message %s from unknown channel %j', message, channel); + (async () => { + this.subscriber = ParsePubSub.createSubscriber(config); + if (typeof this.subscriber.connect === 'function') { + await this.subscriber.connect(); } - }); + const getMessageJSON = messageStr => { + logger.verbose('Subscribe message %j', messageStr); + let message; + try { + message = JSON.parse(messageStr); + } catch (e) { + logger.error('unable to parse message', messageStr, e); + return; + } + return message; + }; + const runTrigger = (messageStr, triggerName) => { + console.log({ triggerName }); + const message = getMessageJSON(messageStr); + this._inflateParseObject(message); + if (message) { + const nameCapitalized = triggerName.charAt(0).toUpperCase() + triggerName.slice(1); + this[`_on${nameCapitalized}`](message); + } + }; + this.subscriber.subscribe(`${Parse.applicationId}afterSave`, message => + runTrigger(message, 'afterSave') + ); + this.subscriber.subscribe(`${Parse.applicationId}afterDelete`, message => + runTrigger(message, 'afterDelete') + ); + this.subscriber.subscribe(`${Parse.applicationId}clearCache`, messageStr => { + const message = getMessageJSON(messageStr); + if (message?.userId) { + this._clearCachedRoles(message.userId); + } + }); + })(); } // Message is the JSON object from publisher. Message.currentParseObject is the ParseObject JSON after changes. From 014317dafda3e6fc16bfdd9e47a06a288b417167 Mon Sep 17 00:00:00 2001 From: dblythy Date: Wed, 23 Nov 2022 18:18:47 +1100 Subject: [PATCH 03/12] Update ParseLiveQueryServer.js --- src/LiveQuery/ParseLiveQueryServer.js | 37 ++++++++++++--------------- 1 file changed, 16 insertions(+), 21 deletions(-) diff --git a/src/LiveQuery/ParseLiveQueryServer.js b/src/LiveQuery/ParseLiveQueryServer.js index b0202abe48..ec5868c12f 100644 --- a/src/LiveQuery/ParseLiveQueryServer.js +++ b/src/LiveQuery/ParseLiveQueryServer.js @@ -80,7 +80,7 @@ class ParseLiveQueryServer { if (typeof this.subscriber.connect === 'function') { await this.subscriber.connect(); } - const getMessageJSON = messageStr => { + const messageRecieved = (channel, messageStr) => { logger.verbose('Subscribe message %j', messageStr); let message; try { @@ -89,29 +89,24 @@ class ParseLiveQueryServer { logger.error('unable to parse message', messageStr, e); return; } - return message; - }; - const runTrigger = (messageStr, triggerName) => { - console.log({ triggerName }); - const message = getMessageJSON(messageStr); + if (channel === Parse.applicationId + 'clearCache') { + this._clearCachedRoles(message.userId); + return; + } this._inflateParseObject(message); - if (message) { - const nameCapitalized = triggerName.charAt(0).toUpperCase() + triggerName.slice(1); - this[`_on${nameCapitalized}`](message); + if (channel === Parse.applicationId + 'afterSave') { + this._onAfterSave(message); + } else if (channel === Parse.applicationId + 'afterDelete') { + this._onAfterDelete(message); + } else { + logger.error('Get message %s from unknown channel %j', message, channel); } }; - this.subscriber.subscribe(`${Parse.applicationId}afterSave`, message => - runTrigger(message, 'afterSave') - ); - this.subscriber.subscribe(`${Parse.applicationId}afterDelete`, message => - runTrigger(message, 'afterDelete') - ); - this.subscriber.subscribe(`${Parse.applicationId}clearCache`, messageStr => { - const message = getMessageJSON(messageStr); - if (message?.userId) { - this._clearCachedRoles(message.userId); - } - }); + this.subscriber.on('message', (channel, messageStr) => messageRecieved(channel, messageStr)); + for (const field of ['afterSave', 'afterDelete', 'clearCache']) { + const channel = `${Parse.applicationId}${field}`; + this.subscriber.subscribe(channel, messageStr => messageRecieved(channel, messageStr)); + } })(); } From 554581733fde843a7c6276d0de266804eeca4e2d Mon Sep 17 00:00:00 2001 From: dblythy Date: Wed, 23 Nov 2022 18:29:58 +1100 Subject: [PATCH 04/12] Update RedisPubSub.spec.js --- spec/RedisPubSub.spec.js | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/spec/RedisPubSub.spec.js b/spec/RedisPubSub.spec.js index c7def313cd..b68448ef51 100644 --- a/spec/RedisPubSub.spec.js +++ b/spec/RedisPubSub.spec.js @@ -15,7 +15,8 @@ describe('RedisPubSub', function () { }); const redis = require('redis'); - expect(redis.createClient).toHaveBeenCalledWith('redisAddress', { + expect(redis.createClient).toHaveBeenCalledWith({ + url: 'redisAddress', socket_keepalive: true, no_ready_check: true, }); @@ -28,7 +29,8 @@ describe('RedisPubSub', function () { }); const redis = require('redis'); - expect(redis.createClient).toHaveBeenCalledWith('redisAddress', { + expect(redis.createClient).toHaveBeenCalledWith({ + url: 'redisAddress', socket_keepalive: true, no_ready_check: true, }); From babb8dce51444f48b24f8e93283d436048b933de Mon Sep 17 00:00:00 2001 From: dblythy Date: Wed, 23 Nov 2022 19:16:56 +1100 Subject: [PATCH 05/12] tests --- spec/ParseLiveQueryRedis.spec.js | 4 ++++ src/LiveQuery/ParseCloudCodePublisher.js | 2 +- src/LiveQuery/ParseLiveQueryServer.js | 2 +- 3 files changed, 6 insertions(+), 2 deletions(-) diff --git a/spec/ParseLiveQueryRedis.spec.js b/spec/ParseLiveQueryRedis.spec.js index c9c8110766..3fe7f00e60 100644 --- a/spec/ParseLiveQueryRedis.spec.js +++ b/spec/ParseLiveQueryRedis.spec.js @@ -1,5 +1,9 @@ if (process.env.PARSE_SERVER_TEST_CACHE === 'redis') { describe('ParseLiveQuery redis', () => { + afterEach(async () => { + const client = await Parse.CoreManager.getLiveQueryController().getDefaultLiveQueryClient(); + client.close(); + }); it('can connect', async () => { await reconfigureServer({ startLiveQueryServer: true, diff --git a/src/LiveQuery/ParseCloudCodePublisher.js b/src/LiveQuery/ParseCloudCodePublisher.js index 9eb738012c..39f9718f69 100644 --- a/src/LiveQuery/ParseCloudCodePublisher.js +++ b/src/LiveQuery/ParseCloudCodePublisher.js @@ -11,7 +11,7 @@ class ParseCloudCodePublisher { (async () => { this.parsePublisher = ParsePubSub.createPublisher(config); if (typeof this.parsePublisher.connect === 'function') { - await this.parsePublisher.connect(); + await Promise.resolve(this.parsePublisher.connect()); } })(); } diff --git a/src/LiveQuery/ParseLiveQueryServer.js b/src/LiveQuery/ParseLiveQueryServer.js index ec5868c12f..b2b1feba02 100644 --- a/src/LiveQuery/ParseLiveQueryServer.js +++ b/src/LiveQuery/ParseLiveQueryServer.js @@ -78,7 +78,7 @@ class ParseLiveQueryServer { (async () => { this.subscriber = ParsePubSub.createSubscriber(config); if (typeof this.subscriber.connect === 'function') { - await this.subscriber.connect(); + await Promise.resolve(this.subscriber.connect()); } const messageRecieved = (channel, messageStr) => { logger.verbose('Subscribe message %j', messageStr); From 23836109e00dd140625e6c0cfae3e3de66200f7a Mon Sep 17 00:00:00 2001 From: dblythy Date: Thu, 24 Nov 2022 11:04:35 +1100 Subject: [PATCH 06/12] add connection methods --- spec/helper.js | 24 +++++---- src/Controllers/LiveQueryController.js | 4 ++ src/LiveQuery/ParseCloudCodePublisher.js | 14 +++-- src/LiveQuery/ParseLiveQueryServer.js | 68 ++++++++++++------------ src/ParseServer.js | 20 ++++--- 5 files changed, 75 insertions(+), 55 deletions(-) diff --git a/spec/helper.js b/spec/helper.js index 1afa0fc24c..40df5e627e 100644 --- a/spec/helper.js +++ b/spec/helper.js @@ -173,17 +173,19 @@ const reconfigureServer = (changedConfiguration = {}) => { port, }); cache.clear(); - parseServer = ParseServer.start(newConfiguration); - parseServer.expressApp.use('/1', err => { - console.error(err); - fail('should not call next'); - }); - server = parseServer.server; - server.on('connection', connection => { - const key = `${connection.remoteAddress}:${connection.remotePort}`; - openConnections[key] = connection; - connection.on('close', () => { - delete openConnections[key]; + ParseServer.start(newConfiguration).then(_parseServer => { + parseServer = _parseServer; + parseServer.expressApp.use('/1', err => { + console.error(err); + fail('should not call next'); + }); + server = parseServer.server; + server.on('connection', connection => { + const key = `${connection.remoteAddress}:${connection.remotePort}`; + openConnections[key] = connection; + connection.on('close', () => { + delete openConnections[key]; + }); }); }); } catch (error) { diff --git a/src/Controllers/LiveQueryController.js b/src/Controllers/LiveQueryController.js index 9a5b6d0ef1..b3ee7fcf65 100644 --- a/src/Controllers/LiveQueryController.js +++ b/src/Controllers/LiveQueryController.js @@ -21,6 +21,10 @@ export class LiveQueryController { this.liveQueryPublisher = new ParseCloudCodePublisher(config); } + connect() { + return this.liveQueryPublisher.connect(); + } + onAfterSave( className: string, currentObject: any, diff --git a/src/LiveQuery/ParseCloudCodePublisher.js b/src/LiveQuery/ParseCloudCodePublisher.js index 39f9718f69..0e0dce1417 100644 --- a/src/LiveQuery/ParseCloudCodePublisher.js +++ b/src/LiveQuery/ParseCloudCodePublisher.js @@ -8,12 +8,16 @@ class ParseCloudCodePublisher { // config object of the publisher, right now it only contains the redisURL, // but we may extend it later. constructor(config: any = {}) { - (async () => { - this.parsePublisher = ParsePubSub.createPublisher(config); - if (typeof this.parsePublisher.connect === 'function') { - await Promise.resolve(this.parsePublisher.connect()); + this.parsePublisher = ParsePubSub.createPublisher(config); + } + + async connect() { + if (typeof this.parsePublisher.connect === 'function') { + if (this.parsePublisher.isOpen) { + return; } - })(); + return Promise.resolve(this.parsePublisher.connect()); + } } onCloudCodeAfterSave(request: any): void { diff --git a/src/LiveQuery/ParseLiveQueryServer.js b/src/LiveQuery/ParseLiveQueryServer.js index b2b1feba02..509c8cfa73 100644 --- a/src/LiveQuery/ParseLiveQueryServer.js +++ b/src/LiveQuery/ParseLiveQueryServer.js @@ -73,41 +73,43 @@ class ParseLiveQueryServer { parseWebsocket => this._onConnect(parseWebsocket), config ); - - // Initialize subscriber - (async () => { - this.subscriber = ParsePubSub.createSubscriber(config); - if (typeof this.subscriber.connect === 'function') { - await Promise.resolve(this.subscriber.connect()); + this.subscriber = ParsePubSub.createSubscriber(config); + const messageRecieved = (channel, messageStr) => { + logger.verbose('Subscribe message %j', messageStr); + let message; + try { + message = JSON.parse(messageStr); + } catch (e) { + logger.error('unable to parse message', messageStr, e); + return; } - const messageRecieved = (channel, messageStr) => { - logger.verbose('Subscribe message %j', messageStr); - let message; - try { - message = JSON.parse(messageStr); - } catch (e) { - logger.error('unable to parse message', messageStr, e); - return; - } - if (channel === Parse.applicationId + 'clearCache') { - this._clearCachedRoles(message.userId); - return; - } - this._inflateParseObject(message); - if (channel === Parse.applicationId + 'afterSave') { - this._onAfterSave(message); - } else if (channel === Parse.applicationId + 'afterDelete') { - this._onAfterDelete(message); - } else { - logger.error('Get message %s from unknown channel %j', message, channel); - } - }; - this.subscriber.on('message', (channel, messageStr) => messageRecieved(channel, messageStr)); - for (const field of ['afterSave', 'afterDelete', 'clearCache']) { - const channel = `${Parse.applicationId}${field}`; - this.subscriber.subscribe(channel, messageStr => messageRecieved(channel, messageStr)); + if (channel === Parse.applicationId + 'clearCache') { + this._clearCachedRoles(message.userId); + return; + } + this._inflateParseObject(message); + if (channel === Parse.applicationId + 'afterSave') { + this._onAfterSave(message); + } else if (channel === Parse.applicationId + 'afterDelete') { + this._onAfterDelete(message); + } else { + logger.error('Get message %s from unknown channel %j', message, channel); + } + }; + this.subscriber.on('message', (channel, messageStr) => messageRecieved(channel, messageStr)); + for (const field of ['afterSave', 'afterDelete', 'clearCache']) { + const channel = `${Parse.applicationId}${field}`; + this.subscriber.subscribe(channel, messageStr => messageRecieved(channel, messageStr)); + } + } + + async connect() { + if (typeof this.subscriber.connect === 'function') { + if (this.subscriber.isOpen) { + return; } - })(); + return await Promise.resolve(this.subscriber.connect()); + } } // Message is the JSON object from publisher. Message.currentParseObject is the ParseObject JSON after changes. diff --git a/src/ParseServer.js b/src/ParseServer.js index 0b2c985d1e..da1296d5d2 100644 --- a/src/ParseServer.js +++ b/src/ParseServer.js @@ -77,7 +77,12 @@ class ParseServer { const allControllers = controllers.getControllers(options); - const { loggerController, databaseController, hooksController } = allControllers; + const { + loggerController, + databaseController, + hooksController, + liveQueryController, + } = allControllers; this.config = Config.put(Object.assign({}, options, allControllers)); logging.setLogger(loggerController); @@ -98,6 +103,7 @@ class ParseServer { ) { startupPromises.push(options.cacheAdapter.connect()); } + startupPromises.push(liveQueryController.connect()); await Promise.all(startupPromises); if (serverStartComplete) { serverStartComplete(); @@ -263,7 +269,7 @@ class ParseServer { * @param {Function} callback called when the server has started * @returns {ParseServer} the parse server instance */ - start(options: ParseServerOptions, callback: ?() => void) { + async start(options: ParseServerOptions, callback: ?() => void) { const app = express(); if (options.middleware) { let middleware; @@ -307,7 +313,7 @@ class ParseServer { this.server = server; if (options.startLiveQueryServer || options.liveQueryServerOptions) { - this.liveQueryServer = ParseServer.createLiveQueryServer( + this.liveQueryServer = await ParseServer.createLiveQueryServer( server, options.liveQueryServerOptions, options @@ -338,9 +344,9 @@ class ParseServer { * @param {Server} httpServer an optional http server to pass * @param {LiveQueryServerOptions} config options for the liveQueryServer * @param {ParseServerOptions} options options for the ParseServer - * @returns {ParseLiveQueryServer} the live query server instance + * @returns {Promise} the live query server instance */ - static createLiveQueryServer( + static async createLiveQueryServer( httpServer, config: LiveQueryServerOptions, options: ParseServerOptions @@ -350,7 +356,9 @@ class ParseServer { httpServer = require('http').createServer(app); httpServer.listen(config.port); } - return new ParseLiveQueryServer(httpServer, config, options); + const server = new ParseLiveQueryServer(httpServer, config, options); + await server.connect(); + return server; } static verifyServerUrl(callback) { From fbf9913639be94af19cb9cfbba8b596b52312032 Mon Sep 17 00:00:00 2001 From: dblythy Date: Thu, 24 Nov 2022 11:16:02 +1100 Subject: [PATCH 07/12] add async --- spec/ParseGraphQLServer.spec.js | 2 +- spec/ParseLiveQueryServer.spec.js | 10 +++++----- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/spec/ParseGraphQLServer.spec.js b/spec/ParseGraphQLServer.spec.js index 6bdd3b10f5..9ac60f8ae6 100644 --- a/spec/ParseGraphQLServer.spec.js +++ b/spec/ParseGraphQLServer.spec.js @@ -432,7 +432,7 @@ describe('ParseGraphQLServer', () => { const expressApp = express(); httpServer = http.createServer(expressApp); expressApp.use('/parse', parseServer.app); - parseLiveQueryServer = ParseServer.createLiveQueryServer(httpServer, { + parseLiveQueryServer = await ParseServer.createLiveQueryServer(httpServer, { port: 1338, }); parseGraphQLServer.applyGraphQL(expressApp); diff --git a/spec/ParseLiveQueryServer.spec.js b/spec/ParseLiveQueryServer.spec.js index 0d1a1e6387..dc7871be7c 100644 --- a/spec/ParseLiveQueryServer.spec.js +++ b/spec/ParseLiveQueryServer.spec.js @@ -94,24 +94,24 @@ describe('ParseLiveQueryServer', function () { expect(parseLiveQueryServer.subscriptions.size).toBe(0); }); - it('can be initialized from ParseServer', function () { + it('can be initialized from ParseServer', async () => { const httpServer = {}; - const parseLiveQueryServer = ParseServer.createLiveQueryServer(httpServer, {}); + const parseLiveQueryServer = await ParseServer.createLiveQueryServer(httpServer, {}); expect(parseLiveQueryServer.clientId).toBeUndefined(); expect(parseLiveQueryServer.clients.size).toBe(0); expect(parseLiveQueryServer.subscriptions.size).toBe(0); }); - it('can be initialized from ParseServer without httpServer', function (done) { - const parseLiveQueryServer = ParseServer.createLiveQueryServer(undefined, { + it('can be initialized from ParseServer without httpServer', async () => { + const parseLiveQueryServer = await ParseServer.createLiveQueryServer(undefined, { port: 22345, }); expect(parseLiveQueryServer.clientId).toBeUndefined(); expect(parseLiveQueryServer.clients.size).toBe(0); expect(parseLiveQueryServer.subscriptions.size).toBe(0); - parseLiveQueryServer.server.close(done); + await new Promise(resolve => parseLiveQueryServer.server.close(resolve)); }); describe_only_db('mongo')('initialization', () => { From a243440d8113f4dc51e64b4e2db59604013d8ff2 Mon Sep 17 00:00:00 2001 From: dblythy Date: Thu, 24 Nov 2022 11:28:15 +1100 Subject: [PATCH 08/12] Update ParseLiveQueryServer.spec.js --- spec/ParseLiveQueryServer.spec.js | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/spec/ParseLiveQueryServer.spec.js b/spec/ParseLiveQueryServer.spec.js index dc7871be7c..7ec1b30941 100644 --- a/spec/ParseLiveQueryServer.spec.js +++ b/spec/ParseLiveQueryServer.spec.js @@ -115,8 +115,8 @@ describe('ParseLiveQueryServer', function () { }); describe_only_db('mongo')('initialization', () => { - it('can be initialized through ParseServer without liveQueryServerOptions', function (done) { - const parseServer = ParseServer.start({ + it('can be initialized through ParseServer without liveQueryServerOptions', async function (done) { + const parseServer = await ParseServer.start({ appId: 'hello', masterKey: 'world', port: 22345, @@ -137,8 +137,8 @@ describe('ParseLiveQueryServer', function () { }); }); - it('can be initialized through ParseServer with liveQueryServerOptions', function (done) { - const parseServer = ParseServer.start({ + it('can be initialized through ParseServer with liveQueryServerOptions', async function (done) { + const parseServer = await ParseServer.start({ appId: 'hello', masterKey: 'world', port: 22346, From 454ef0e41c4be1783c7f3e5ccd044e7df830f9e5 Mon Sep 17 00:00:00 2001 From: dblythy Date: Thu, 24 Nov 2022 12:06:17 +1100 Subject: [PATCH 09/12] Update ParseLiveQueryRedis.spec.js --- spec/ParseLiveQueryRedis.spec.js | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/spec/ParseLiveQueryRedis.spec.js b/spec/ParseLiveQueryRedis.spec.js index 3fe7f00e60..0a089b4255 100644 --- a/spec/ParseLiveQueryRedis.spec.js +++ b/spec/ParseLiveQueryRedis.spec.js @@ -33,5 +33,24 @@ if (process.env.PARSE_SERVER_TEST_CACHE === 'redis') { object.destroy(), ]); }); + + it('can call connect twice', async () => { + const server = await reconfigureServer({ + startLiveQueryServer: true, + liveQuery: { + classNames: ['TestObject'], + redisURL: 'redis://localhost:6379', + }, + liveQueryServerOptions: { + redisURL: 'redis://localhost:6379', + }, + }); + expect(server.config.liveQueryController.liveQueryPublisher.parsePublisher.isOpen).toBeTrue(); + await server.config.liveQueryController.connect(); + expect(server.config.liveQueryController.liveQueryPublisher.parsePublisher.isOpen).toBeTrue(); + expect(server.liveQueryServer.subscriber.isOpen).toBe(true); + await server.liveQueryServer.connect(); + expect(server.liveQueryServer.subscriber.isOpen).toBe(true); + }); }); } From 7e3011251beafd0c519da6e57138eda4d566e6b4 Mon Sep 17 00:00:00 2001 From: dblythy Date: Thu, 24 Nov 2022 12:37:14 +1100 Subject: [PATCH 10/12] Update ParseLiveQueryServer.js --- src/LiveQuery/ParseLiveQueryServer.js | 21 ++++++++++++--------- 1 file changed, 12 insertions(+), 9 deletions(-) diff --git a/src/LiveQuery/ParseLiveQueryServer.js b/src/LiveQuery/ParseLiveQueryServer.js index 509c8cfa73..814bed8108 100644 --- a/src/LiveQuery/ParseLiveQueryServer.js +++ b/src/LiveQuery/ParseLiveQueryServer.js @@ -74,6 +74,18 @@ class ParseLiveQueryServer { config ); this.subscriber = ParsePubSub.createSubscriber(config); + if (!this.subscriber.connect) { + this.connect(); + } + } + + async connect() { + if (typeof this.subscriber.connect === 'function') { + if (this.subscriber.isOpen) { + return; + } + await Promise.resolve(this.subscriber.connect()); + } const messageRecieved = (channel, messageStr) => { logger.verbose('Subscribe message %j', messageStr); let message; @@ -103,15 +115,6 @@ class ParseLiveQueryServer { } } - async connect() { - if (typeof this.subscriber.connect === 'function') { - if (this.subscriber.isOpen) { - return; - } - return await Promise.resolve(this.subscriber.connect()); - } - } - // Message is the JSON object from publisher. Message.currentParseObject is the ParseObject JSON after changes. // Message.originalParseObject is the original ParseObject JSON. _inflateParseObject(message: any): void { From 9e3a4423eeacbb078653c07811f38f9add5cf71b Mon Sep 17 00:00:00 2001 From: dblythy Date: Thu, 24 Nov 2022 12:48:40 +1100 Subject: [PATCH 11/12] Update ParseLiveQueryServer.js --- src/LiveQuery/ParseLiveQueryServer.js | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/src/LiveQuery/ParseLiveQueryServer.js b/src/LiveQuery/ParseLiveQueryServer.js index 814bed8108..78cdf55e07 100644 --- a/src/LiveQuery/ParseLiveQueryServer.js +++ b/src/LiveQuery/ParseLiveQueryServer.js @@ -80,12 +80,17 @@ class ParseLiveQueryServer { } async connect() { + if (this.subscriber.isOpen) { + return; + } if (typeof this.subscriber.connect === 'function') { - if (this.subscriber.isOpen) { - return; - } await Promise.resolve(this.subscriber.connect()); + } else { + this.subscriber.isOpen = true; } + this._createSubscribers(); + } + _createSubscribers() { const messageRecieved = (channel, messageStr) => { logger.verbose('Subscribe message %j', messageStr); let message; From fbb7a11fff1ee3b713945175d1f344182f46901e Mon Sep 17 00:00:00 2001 From: dblythy Date: Sat, 26 Nov 2022 13:33:01 +1100 Subject: [PATCH 12/12] Update ParseLiveQueryRedis.spec.js --- spec/ParseLiveQueryRedis.spec.js | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/spec/ParseLiveQueryRedis.spec.js b/spec/ParseLiveQueryRedis.spec.js index 0a089b4255..3187d23cc8 100644 --- a/spec/ParseLiveQueryRedis.spec.js +++ b/spec/ParseLiveQueryRedis.spec.js @@ -16,13 +16,13 @@ if (process.env.PARSE_SERVER_TEST_CACHE === 'redis') { }, }); const subscription = await new Parse.Query('TestObject').subscribe(); - const [, object] = await Promise.all([ + const [object] = await Promise.all([ + new Parse.Object('TestObject').save(), new Promise(resolve => subscription.on('create', () => { resolve(); }) ), - new Parse.Object('TestObject').save(), ]); await Promise.all([ new Promise(resolve =>