Skip to content

Commit ba73889

Browse files
committed
Merge branch 'housekeeping' of github.com:powersync-ja/powersync-service into housekeeping
2 parents f58cdb7 + abea24f commit ba73889

24 files changed

+540
-218
lines changed

.changeset/olive-games-destroy.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
---
2+
'@powersync/service-sync-rules': minor
3+
'@powersync/service-image': minor
4+
---
5+
6+
Add the `fixed_json_extract` compatibility option. When enabled, JSON-extracting operators are updated to match SQLite more closely.

.changeset/rich-fans-care.md

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
---
2+
'@powersync/service-rsocket-router': minor
3+
'@powersync/service-core': minor
4+
'@powersync/service-image': minor
5+
'@powersync/service-types': patch
6+
---
7+
8+
Enable permessage-deflate for websockets.

packages/rsocket-router/src/router/ReactiveSocketRouter.ts

Lines changed: 33 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,13 +8,14 @@ import * as http from 'http';
88
import { Payload, RSocketServer } from 'rsocket-core';
99
import * as ws from 'ws';
1010
import { SocketRouterObserver } from './SocketRouterListener.js';
11+
import { WebsocketDuplexConnection } from './transport/WebsocketDuplexConnection.js';
1112
import { WebsocketServerTransport } from './transport/WebSocketServerTransport.js';
1213
import {
1314
CommonParams,
1415
IReactiveStream,
1516
IReactiveStreamInput,
16-
RS_ENDPOINT_TYPE,
1717
ReactiveSocketRouterOptions,
18+
RS_ENDPOINT_TYPE,
1819
SocketResponder
1920
} from './types.js';
2021

@@ -24,6 +25,7 @@ export interface ReactiveStreamRequest {
2425
dataMimeType: string;
2526
initialN: number;
2627
responder: SocketResponder;
28+
connection: WebsocketDuplexConnection;
2729
}
2830

2931
export interface SocketBaseContext {
@@ -51,7 +53,30 @@ export class ReactiveSocketRouter<C extends SocketBaseContext> {
5153
* This follows a similar pattern to the Journey Micro
5254
* web sockets router.
5355
*/
54-
const wss = new ws.WebSocketServer({ noServer: true });
56+
const wss = new ws.WebSocketServer({
57+
noServer: true,
58+
perMessageDeflate: {
59+
zlibDeflateOptions: {
60+
chunkSize: 128 * 1024, // default is 16kb - increased for better efficiency
61+
memLevel: 7, // default is 8
62+
level: 3
63+
},
64+
zlibInflateOptions: {
65+
// for decompressing messages from the client
66+
chunkSize: 32 * 1024
67+
},
68+
// don't keep client context between messages
69+
clientNoContextTakeover: true,
70+
// keep context between messages from the server
71+
serverNoContextTakeover: false,
72+
// bigger window uses more memory and potentially more cpu. 10-15 is a good range.
73+
serverMaxWindowBits: 12,
74+
// Limit concurrent compression threads
75+
concurrencyLimit: 8,
76+
// Size (in bytes) below which messages should not be compressed _if context takeover is disabled_.
77+
threshold: 1024
78+
}
79+
});
5580
server.on('upgrade', (request, socket, head) => {
5681
wss.handleUpgrade(request, socket as any, head, (ws) => {
5782
wss.emit('connection', ws, request);
@@ -66,7 +91,9 @@ export class ReactiveSocketRouter<C extends SocketBaseContext> {
6691
const rSocketServer = new RSocketServer({
6792
transport,
6893
acceptor: {
69-
accept: async (payload) => {
94+
accept: async (payload, rsocket) => {
95+
const connection = (rsocket as any).connection as WebsocketDuplexConnection;
96+
7097
const { max_concurrent_connections } = this.options ?? {};
7198
logger.info(`Currently have ${wss.clients.size} active WebSocket connection(s)`);
7299
// wss.clients.size includes this connection, so we check for greater than
@@ -104,7 +131,7 @@ export class ReactiveSocketRouter<C extends SocketBaseContext> {
104131
// TODO: Consider limiting the number of active streams per connection to prevent abuse
105132
handleReactiveStream(
106133
context,
107-
{ payload, initialN, responder, dataMimeType, metadataMimeType },
134+
{ payload, initialN, responder, dataMimeType, metadataMimeType, connection },
108135
observer,
109136
abortController,
110137
params
@@ -191,6 +218,7 @@ export async function handleReactiveStream<Context extends SocketBaseContext>(
191218
context,
192219
observer,
193220
signal: abortController.signal,
221+
connection: request.connection,
194222
responder
195223
});
196224
if (!isAuthorized.authorized) {
@@ -207,6 +235,7 @@ export async function handleReactiveStream<Context extends SocketBaseContext>(
207235
observer,
208236
signal: abortController.signal,
209237
responder,
238+
connection: request.connection,
210239
initialN
211240
});
212241
} catch (ex) {

packages/rsocket-router/src/router/transport/WebsocketDuplexConnection.ts

Lines changed: 36 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
*/
1717

1818
import { logger } from '@powersync/lib-services-framework';
19+
import { Socket } from 'net';
1920
import {
2021
Closeable,
2122
Deferred,
@@ -33,6 +34,7 @@ import WebSocket from 'ws';
3334

3435
export class WebsocketDuplexConnection extends Deferred implements DuplexConnection, Outbound {
3536
readonly multiplexerDemultiplexer: Multiplexer & Demultiplexer & FrameHandler;
37+
readonly tracker: WebSocketTracker;
3638

3739
constructor(
3840
private websocketDuplex: Duplex,
@@ -50,6 +52,7 @@ export class WebsocketDuplexConnection extends Deferred implements DuplexConnect
5052
websocketDuplex.on('data', this.handleMessage);
5153

5254
this.multiplexerDemultiplexer = multiplexerDemultiplexerFactory(frame, this);
55+
this.tracker = new WebSocketTracker(this.rawSocket);
5356
}
5457

5558
get availability(): number {
@@ -97,7 +100,9 @@ export class WebsocketDuplexConnection extends Deferred implements DuplexConnect
97100
};
98101

99102
private handleError = (e: WebSocket.ErrorEvent): void => {
100-
logger.error(`Error in WebSocket duplex connection: ${e}`);
103+
// Example:
104+
// Error: The socket was closed while data was being compressed
105+
logger.warn(`Error in WebSocket duplex connection: ${e}`);
101106
if (!this.done) {
102107
this.close(e.error);
103108
}
@@ -149,3 +154,33 @@ export class WebsocketDuplexConnection extends Deferred implements DuplexConnect
149154
});
150155
}
151156
}
157+
158+
/**
159+
* Tracks encoding and bytes written on a websocket connection, catering for compressed data.
160+
*/
161+
export class WebSocketTracker {
162+
private lastBytesWritten: number;
163+
private socket: Socket;
164+
readonly encoding: 'permessage-deflate' | undefined;
165+
166+
constructor(ws: WebSocket) {
167+
this.socket = (ws as any)._socket;
168+
this.lastBytesWritten = this.socket.bytesWritten;
169+
170+
// Crude check, but this is the only extension that would actually be used
171+
if (ws.extensions.includes('permessage-deflate')) {
172+
this.encoding = 'permessage-deflate';
173+
} else {
174+
this.encoding = undefined;
175+
}
176+
}
177+
178+
/**
179+
* Consumes and returns the number of bytes sent.
180+
*/
181+
getBytesWritten(): number {
182+
const written = this.socket.bytesWritten - this.lastBytesWritten;
183+
this.lastBytesWritten = this.socket.bytesWritten;
184+
return written;
185+
}
186+
}

packages/rsocket-router/src/router/types.ts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import { router } from '@powersync/lib-services-framework';
44
import { OnExtensionSubscriber, OnNextSubscriber, OnTerminalSubscriber } from 'rsocket-core';
55

66
import { SocketRouterObserver } from './SocketRouterListener.js';
7+
import { WebsocketDuplexConnection } from './transport/WebsocketDuplexConnection.js';
78

89
export enum RS_ENDPOINT_TYPE {
910
// Other methods are supported by RSocket, but are not yet mapped here
@@ -26,6 +27,10 @@ export type CommonStreamPayload = {
2627
observer: SocketRouterObserver;
2728
responder: SocketResponder;
2829
signal: AbortSignal;
30+
/**
31+
* The underlying websocket connection. Should not be used directly apart from tracking metadata.
32+
*/
33+
connection: WebsocketDuplexConnection;
2934
};
3035

3136
export type ReactiveStreamPayload<O> = CommonStreamPayload & {

packages/service-core/src/routes/endpoints/socket-route.ts

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ import { APIMetric } from '@powersync/service-types';
1111
export const syncStreamReactive: SocketRouteGenerator = (router) =>
1212
router.reactiveStream<util.StreamingSyncRequest, any>(SyncRoutes.STREAM, {
1313
validator: schema.createTsCodecValidator(util.StreamingSyncRequest, { allowAdditional: true }),
14-
handler: async ({ context, params, responder, observer, initialN, signal: upstreamSignal }) => {
14+
handler: async ({ context, params, responder, observer, initialN, signal: upstreamSignal, connection }) => {
1515
const { service_context, logger } = context;
1616
const { routerEngine, metricsEngine, syncContext } = service_context;
1717

@@ -84,6 +84,10 @@ export const syncStreamReactive: SocketRouteGenerator = (router) =>
8484

8585
metricsEngine.getUpDownCounter(APIMetric.CONCURRENT_CONNECTIONS).add(1);
8686
const tracker = new sync.RequestTracker(metricsEngine);
87+
if (connection.tracker.encoding) {
88+
// Must be set before we start the stream
89+
tracker.setCompressed(connection.tracker.encoding);
90+
}
8791
try {
8892
for await (const data of sync.streamResponse({
8993
syncContext: syncContext,
@@ -153,6 +157,17 @@ export const syncStreamReactive: SocketRouteGenerator = (router) =>
153157
responder.onComplete();
154158
removeStopHandler();
155159
disposer();
160+
if (connection.tracker.encoding) {
161+
// Technically, this may not be unique to this specific stream, since there could be multiple
162+
// rsocket streams on the same websocket connection. We don't have a way to track compressed bytes
163+
// on individual streams, and we generally expect 1 stream per connection, so this is a reasonable
164+
// approximation.
165+
// If there are multiple streams, bytes written would be split arbitrarily across them, but the
166+
// total should be correct.
167+
// For non-compressed cases, this is tracked by the stream itself.
168+
const socketBytes = connection.tracker.getBytesWritten();
169+
tracker.addCompressedDataSent(socketBytes);
170+
}
156171
logger.info(`Sync stream complete`, {
157172
...tracker.getLogMeta(),
158173
stream_ms: Date.now() - streamStart,

packages/sync-rules/src/SqlBucketDescriptor.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ export class SqlBucketDescriptor implements BucketSource {
6565
if (this.bucketParameters == null) {
6666
throw new Error('Bucket parameters must be defined');
6767
}
68-
const dataRows = SqlDataQuery.fromSql(this.name, this.bucketParameters, sql, options);
68+
const dataRows = SqlDataQuery.fromSql(this.name, this.bucketParameters, sql, options, this.compatibility);
6969

7070
this.dataQueries.push(dataRows);
7171

packages/sync-rules/src/SqlDataQuery.ts

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,20 +5,26 @@ import { SqlRuleError } from './errors.js';
55
import { ExpressionType } from './ExpressionType.js';
66
import { SourceTableInterface } from './SourceTableInterface.js';
77
import { SqlTools } from './sql_filters.js';
8-
import { castAsText } from './sql_functions.js';
98
import { checkUnsupportedFeatures, isClauseError } from './sql_support.js';
109
import { SyncRulesOptions } from './SqlSyncRules.js';
1110
import { TablePattern } from './TablePattern.js';
1211
import { TableQuerySchema } from './TableQuerySchema.js';
1312
import { BucketIdTransformer, EvaluationResult, ParameterMatchClause, QuerySchema, SqliteRow } from './types.js';
1413
import { getBucketId, isSelectStatement } from './utils.js';
14+
import { CompatibilityContext } from './compatibility.js';
1515

1616
export interface SqlDataQueryOptions extends BaseSqlDataQueryOptions {
1717
filter: ParameterMatchClause;
1818
}
1919

2020
export class SqlDataQuery extends BaseSqlDataQuery {
21-
static fromSql(descriptorName: string, bucketParameters: string[], sql: string, options: SyncRulesOptions) {
21+
static fromSql(
22+
descriptorName: string,
23+
bucketParameters: string[],
24+
sql: string,
25+
options: SyncRulesOptions,
26+
compatibility: CompatibilityContext
27+
) {
2228
const parsed = parse(sql, { locationTracking: true });
2329
const schema = options.schema;
2430

@@ -67,6 +73,7 @@ export class SqlDataQuery extends BaseSqlDataQuery {
6773
table: alias,
6874
parameterTables: ['bucket'],
6975
valueTables: [alias],
76+
compatibilityContext: compatibility,
7077
sql,
7178
schema: querySchema
7279
});

packages/sync-rules/src/SqlParameterQuery.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -123,6 +123,7 @@ export class SqlParameterQuery {
123123
sql,
124124
supportsExpandingParameters: true,
125125
supportsParameterExpressions: true,
126+
compatibilityContext: options.compatibility,
126127
schema: querySchema
127128
});
128129
tools.checkSpecificNameCase(tableRef);

packages/sync-rules/src/StaticSqlParameterQuery.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ export class StaticSqlParameterQuery {
4545
table: undefined,
4646
parameterTables: ['token_parameters', 'user_parameters'],
4747
supportsParameterExpressions: true,
48+
compatibilityContext: options.compatibility,
4849
sql
4950
});
5051
const where = q.where;

0 commit comments

Comments
 (0)