@@ -8,8 +8,10 @@ import { SubscriptionServer } from 'subscriptions-transport-ws';
88import { handleParseErrors , handleParseHeaders } from '../middlewares' ;
99import requiredParameter from '../requiredParameter' ;
1010import defaultLogger from '../logger' ;
11+ import { ParseLiveQueryServer } from '../LiveQuery/ParseLiveQueryServer' ;
1112import { ParseGraphQLSchema } from './ParseGraphQLSchema' ;
1213import ParseGraphQLController , { ParseGraphQLConfig } from '../Controllers/ParseGraphQLController' ;
14+ import { WSSAdapter } from '../Adapters/WebSocketServer/WSSAdapter' ;
1315
1416class ParseGraphQLServer {
1517 parseGraphQLController : ParseGraphQLController ;
@@ -29,6 +31,8 @@ class ParseGraphQLServer {
2931 log : this . log ,
3032 graphQLCustomTypeDefs : this . config . graphQLCustomTypeDefs ,
3133 appId : this . parseServer . config . appId ,
34+ liveQueryClassNames :
35+ this . parseServer . config . liveQuery && this . parseServer . config . liveQuery . classNames ,
3236 } ) ;
3337 }
3438
@@ -114,12 +118,132 @@ class ParseGraphQLServer {
114118 }
115119
116120 createSubscriptions ( server ) {
121+ const wssAdapter = new WSSAdapter ( ) ;
122+
123+ new ParseLiveQueryServer (
124+ undefined ,
125+ {
126+ ...this . parseServer . config . liveQueryServerOptions ,
127+ wssAdapter,
128+ } ,
129+ this . parseServer . config
130+ ) ;
131+
117132 SubscriptionServer . create (
118133 {
119134 execute,
120135 subscribe,
121- onOperation : async ( _message , params , webSocket ) =>
122- Object . assign ( { } , params , await this . _getGraphQLOptions ( webSocket . upgradeReq ) ) ,
136+ onConnect : async connectionParams => {
137+ const keyPairs = {
138+ applicationId : connectionParams [ 'X-Parse-Application-Id' ] ,
139+ sessionToken : connectionParams [ 'X-Parse-Session-Token' ] ,
140+ masterKey : connectionParams [ 'X-Parse-Master-Key' ] ,
141+ installationId : connectionParams [ 'X-Parse-Installation-Id' ] ,
142+ clientKey : connectionParams [ 'X-Parse-Client-Key' ] ,
143+ javascriptKey : connectionParams [ 'X-Parse-Javascript-Key' ] ,
144+ windowsKey : connectionParams [ 'X-Parse-Windows-Key' ] ,
145+ restAPIKey : connectionParams [ 'X-Parse-REST-API-Key' ] ,
146+ } ;
147+
148+ const listeners = [ ] ;
149+
150+ let connectResolve , connectReject ;
151+ let connectIsResolved = false ;
152+ const connectPromise = new Promise ( ( resolve , reject ) => {
153+ connectResolve = resolve ;
154+ connectReject = reject ;
155+ } ) ;
156+
157+ const liveQuery = {
158+ OPEN : 'OPEN' ,
159+ readyState : 'OPEN' ,
160+ on : ( ) => { } ,
161+ ping : ( ) => { } ,
162+ onmessage : ( ) => { } ,
163+ onclose : ( ) => { } ,
164+ send : message => {
165+ message = JSON . parse ( message ) ;
166+ if ( message . op === 'connected' ) {
167+ connectResolve ( ) ;
168+ connectIsResolved = true ;
169+ return ;
170+ } else if ( message . op === 'error' && ! connectIsResolved ) {
171+ connectReject ( {
172+ code : message . code ,
173+ message : message . error ,
174+ } ) ;
175+ return ;
176+ }
177+ const requestId = message && message . requestId ;
178+ if (
179+ requestId &&
180+ typeof requestId === 'number' &&
181+ requestId > 0 &&
182+ requestId <= listeners . length
183+ ) {
184+ const listener = listeners [ requestId - 1 ] ;
185+ if ( listener ) {
186+ listener ( message ) ;
187+ }
188+ }
189+ } ,
190+ subscribe : async ( query , sessionToken , listener ) => {
191+ await connectPromise ;
192+ listeners . push ( listener ) ;
193+ liveQuery . onmessage (
194+ JSON . stringify ( {
195+ op : 'subscribe' ,
196+ requestId : listeners . length ,
197+ query,
198+ sessionToken,
199+ } )
200+ ) ;
201+ } ,
202+ unsubscribe : async listener => {
203+ await connectPromise ;
204+ const index = listeners . indexOf ( listener ) ;
205+ if ( index > 0 ) {
206+ liveQuery . onmessage (
207+ JSON . stringify ( {
208+ op : 'unsubscribe' ,
209+ requestId : index + 1 ,
210+ } )
211+ ) ;
212+ listeners [ index ] = null ;
213+ }
214+ } ,
215+ } ;
216+
217+ wssAdapter . onConnection ( liveQuery ) ;
218+
219+ liveQuery . onmessage (
220+ JSON . stringify ( {
221+ op : 'connect' ,
222+ ...keyPairs ,
223+ } )
224+ ) ;
225+
226+ await connectPromise ;
227+
228+ return { liveQuery, keyPairs } ;
229+ } ,
230+ onDisconnect : ( _webSocket , context ) => {
231+ const { liveQuery } = context ;
232+
233+ if ( liveQuery ) {
234+ liveQuery . onclose ( ) ;
235+ }
236+ } ,
237+ onOperation : async ( _message , params ) => {
238+ return {
239+ ...params ,
240+ schema : await this . parseGraphQLSchema . load ( ) ,
241+ formatError : error => {
242+ // Allow to console.log here to debug
243+ return error ;
244+ } ,
245+ } ;
246+ } ,
123247 } ,
124248 {
125249 server,
0 commit comments