Skip to content

Commit 1d4f71b

Browse files
committed
feat(loadBalancer)!: add support for load balancers
1 parent 1615be0 commit 1d4f71b

29 files changed

+690
-20
lines changed

src/cmap/connect.ts

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -149,6 +149,7 @@ export interface HandshakeDocument extends Document {
149149
client: ClientMetadata;
150150
compression: string[];
151151
saslSupportedMechs?: string;
152+
loadBalanced?: boolean;
152153
}
153154

154155
function prepareHandshakeDocument(authContext: AuthContext, callback: Callback<HandshakeDocument>) {
@@ -158,7 +159,8 @@ function prepareHandshakeDocument(authContext: AuthContext, callback: Callback<H
158159
const handshakeDoc: HandshakeDocument = {
159160
ismaster: true,
160161
client: options.metadata || makeClientMetadata(options),
161-
compression: compressors
162+
compression: compressors,
163+
loadBalanced: options.loadBalanced
162164
};
163165

164166
const credentials = authContext.credentials;

src/cmap/connection.ts

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ import {
3030
OpQueryOptions,
3131
Msg
3232
} from './commands';
33-
import { BSONSerializeOptions, Document, Long, pluckBSONSerializeOptions } from '../bson';
33+
import { BSONSerializeOptions, Document, Long, pluckBSONSerializeOptions, ObjectId } from '../bson';
3434
import type { AutoEncrypter } from '../deps';
3535
import type { MongoCredentials } from './auth/mongo_credentials';
3636
import type { Stream } from './connect';
@@ -233,6 +233,10 @@ export class Connection extends EventEmitter {
233233
return this[kIsMaster];
234234
}
235235

236+
get serverId(): ObjectId {
237+
return ismaster().serverId;
238+
}
239+
236240
// the `connect` method stores the result of the handshake ismaster on the connection
237241
set ismaster(response: Document) {
238242
this[kDescription].receiveResponse(response);
@@ -617,8 +621,10 @@ export class CryptoConnection extends Connection {
617621
}
618622
}
619623

620-
function hasSessionSupport(conn: Connection) {
621-
return conn.description.logicalSessionTimeoutMinutes != null;
624+
/** @public */
625+
export function hasSessionSupport(conn: Connection): boolean {
626+
const description = conn.description;
627+
return description.logicalSessionTimeoutMinutes != null || !!description.loadBalanced;
622628
}
623629

624630
function supportsOpMsg(conn: Connection) {

src/cmap/connection_pool.ts

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ import {
1818
ConnectionCheckedInEvent,
1919
ConnectionPoolClearedEvent
2020
} from './events';
21-
import type { Document } from '../bson';
21+
import type { Document, ObjectId } from '../bson';
2222

2323
const kLogger = Symbol('logger');
2424
const kConnections = Symbol('connections');
@@ -271,6 +271,17 @@ export class ConnectionPool extends EventEmitter {
271271
this.emit('connectionPoolCleared', new ConnectionPoolClearedEvent(this));
272272
}
273273

274+
/**
275+
* Close all connections in the pool for the provided serverId.
276+
*/
277+
closeConnnections(serverId: ObjectId): void {
278+
const indexes = this[kConnections].toArray().forEach((connection) => {
279+
if (connection.serverId === serverId) {
280+
destroyConnection(this, connection, 'serverError')
281+
}
282+
});
283+
}
284+
274285
/** Close the pool */
275286
close(callback: Callback<void>): void;
276287
close(options: CloseOptions, callback: Callback<void>): void;
@@ -410,6 +421,7 @@ function connectionIsIdle(pool: ConnectionPool, connection: Connection) {
410421
return !!(pool.options.maxIdleTimeMS && connection.idleTime > pool.options.maxIdleTimeMS);
411422
}
412423

424+
// TODO: Durran: In LB mode set the server id on the connection.
413425
function createConnection(pool: ConnectionPool, callback?: Callback<Connection>) {
414426
const connectOptions: ConnectionOptions = {
415427
...pool.options,

src/cmap/stream_description.ts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@ const RESPONSE_FIELDS = [
1515
/** @public */
1616
export interface StreamDescriptionOptions {
1717
compressors?: CompressorName[];
18+
logicalSessionTimeoutMinutes?: number;
19+
loadBalanced?: boolean;
1820
}
1921

2022
/** @public */
@@ -29,6 +31,7 @@ export class StreamDescription {
2931
compressors: CompressorName[];
3032
compressor?: CompressorName;
3133
logicalSessionTimeoutMinutes?: number;
34+
loadBalanced?: boolean;
3235

3336
__nodejs_mock_server__?: boolean;
3437

@@ -42,6 +45,8 @@ export class StreamDescription {
4245
this.maxBsonObjectSize = 16777216;
4346
this.maxMessageSizeBytes = 48000000;
4447
this.maxWriteBatchSize = 100000;
48+
this.logicalSessionTimeoutMinutes = options ? options.logicalSessionTimeoutMinutes : undefined;
49+
this.loadBalanced = options ? !!options.loadBalanced : false;
4550
this.compressors =
4651
options && options.compressors && Array.isArray(options.compressors)
4752
? options.compressors

src/connection_string.ts

Lines changed: 26 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,8 @@ import { Logger, LoggerLevel } from './logger';
2929
import { PromiseProvider } from './promise_provider';
3030
import { createAutoEncrypter } from './operations/connect';
3131

32+
const VALID_TXT_RECORDS = ['authSource', 'replicaSet', 'loadBalanced'];
33+
3234
/**
3335
* Determines whether a provided address matches the provided parent domain in order
3436
* to avoid certain attack vectors.
@@ -94,9 +96,9 @@ export function resolveSRVRecord(options: MongoOptions, callback: Callback<HostA
9496

9597
const txtRecordOptions = new URLSearchParams(record[0].join(''));
9698
const txtRecordOptionKeys = [...txtRecordOptions.keys()];
97-
if (txtRecordOptionKeys.some(key => key !== 'authSource' && key !== 'replicaSet')) {
99+
if (txtRecordOptionKeys.some(key => !VALID_TXT_RECORDS.includes(key))) {
98100
return callback(
99-
new MongoParseError('Text record must only set `authSource` or `replicaSet`')
101+
new MongoParseError(`Text record must only set one of: ${VALID_TXT_RECORDS.join(', ')}`)
100102
);
101103
}
102104

@@ -116,6 +118,8 @@ export function resolveSRVRecord(options: MongoOptions, callback: Callback<HostA
116118
}
117119
}
118120

121+
validateLoadBalancedOptions(hostAddresses, options);
122+
119123
callback(undefined, hostAddresses);
120124
});
121125
});
@@ -426,6 +430,8 @@ export function parseOptions(
426430
throw new MongoParseError('directConnection not supported with SRV URI');
427431
}
428432

433+
validateLoadBalancedOptions(hosts, mongoOptions);
434+
429435
// Potential SRV Overrides
430436
mongoOptions.userSpecifiedAuthSource =
431437
objectOptions.has('authSource') || urlOptions.has('authSource');
@@ -435,6 +441,20 @@ export function parseOptions(
435441
return mongoOptions;
436442
}
437443

444+
function validateLoadBalancedOptions(hosts: HostAddress[] | string[], mongoOptions: MongoOptions) {
445+
if (mongoOptions.loadBalanced) {
446+
if (hosts.length > 1) {
447+
throw new MongoParseError('loadBalanced option only supported with a single host in the URI');
448+
}
449+
if (mongoOptions.replicaSet) {
450+
throw new MongoParseError('loadBalanced option not supported with a replicaSet option');
451+
}
452+
if (mongoOptions.directConnection) {
453+
throw new MongoParseError('loadBalanced option not supported when directConnection is true');
454+
}
455+
}
456+
}
457+
438458
function setOption(
439459
mongoOptions: any,
440460
key: string,
@@ -696,6 +716,10 @@ export const OPTIONS = {
696716
default: 120000,
697717
type: 'uint'
698718
},
719+
loadBalanced: {
720+
default: false,
721+
type: 'boolean'
722+
},
699723
localThresholdMS: {
700724
default: 15,
701725
type: 'uint'

src/mongo_client.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -170,6 +170,8 @@ export interface MongoClientOptions extends BSONSerializeOptions, SupportedNodeC
170170
retryWrites?: boolean;
171171
/** Allow a driver to force a Single topology type with a connection string containing one host */
172172
directConnection?: boolean;
173+
/** Instruct the driver it is connecting to a load balancer fronting a mongos like service */
174+
loadBalanced?: boolean;
173175

174176
/** The write concern w value */
175177
w?: W;
@@ -600,6 +602,7 @@ export interface MongoOptions
600602
| 'heartbeatFrequencyMS'
601603
| 'keepAlive'
602604
| 'keepAliveInitialDelay'
605+
| 'loadBalanced'
603606
| 'localThresholdMS'
604607
| 'logger'
605608
| 'maxIdleTimeMS'

src/sdam/common.ts

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,8 @@ export enum TopologyType {
1717
ReplicaSetNoPrimary = 'ReplicaSetNoPrimary',
1818
ReplicaSetWithPrimary = 'ReplicaSetWithPrimary',
1919
Sharded = 'Sharded',
20-
Unknown = 'Unknown'
20+
Unknown = 'Unknown',
21+
LoadBalanced = 'LoadBalanced'
2122
}
2223

2324
/**
@@ -33,7 +34,8 @@ export enum ServerType {
3334
RSArbiter = 'RSArbiter',
3435
RSOther = 'RSOther',
3536
RSGhost = 'RSGhost',
36-
Unknown = 'Unknown'
37+
Unknown = 'Unknown',
38+
LoadBalancer = 'LoadBalancer'
3739
}
3840

3941
/** @internal */

src/sdam/monitor.ts

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ export interface MonitorOptions
5353
connectTimeoutMS: number;
5454
heartbeatFrequencyMS: number;
5555
minHeartbeatFrequencyMS: number;
56+
loadBalanced?: boolean;
5657
}
5758

5859
/** @public */
@@ -61,7 +62,10 @@ export class Monitor extends EventEmitter {
6162
s: MonitorPrivate;
6263
address: string;
6364
options: Readonly<
64-
Pick<MonitorOptions, 'connectTimeoutMS' | 'heartbeatFrequencyMS' | 'minHeartbeatFrequencyMS'>
65+
Pick<
66+
MonitorOptions,
67+
'connectTimeoutMS' | 'heartbeatFrequencyMS' | 'minHeartbeatFrequencyMS' | 'loadBalanced'
68+
>
6569
>;
6670
connectOptions: ConnectionOptions;
6771
[kServer]: Server;
@@ -87,7 +91,8 @@ export class Monitor extends EventEmitter {
8791
this.options = Object.freeze({
8892
connectTimeoutMS: options.connectTimeoutMS ?? 10000,
8993
heartbeatFrequencyMS: options.heartbeatFrequencyMS ?? 10000,
90-
minHeartbeatFrequencyMS: options.minHeartbeatFrequencyMS ?? 500
94+
minHeartbeatFrequencyMS: options.minHeartbeatFrequencyMS ?? 500,
95+
loadBalanced: options.loadBalanced ?? false
9196
});
9297

9398
const cancellationToken = this[kCancellationToken];
@@ -211,14 +216,20 @@ function checkServer(monitor: Monitor, callback: Callback<Document>) {
211216
const connection = monitor[kConnection];
212217
if (connection && !connection.closed) {
213218
const connectTimeoutMS = monitor.options.connectTimeoutMS;
219+
const loadBalanced = monitor.options.loadBalanced;
214220
const maxAwaitTimeMS = monitor.options.heartbeatFrequencyMS;
215221
const topologyVersion = monitor[kServer].description.topologyVersion;
216222
const isAwaitable = topologyVersion != null;
217223

218224
const cmd =
219225
isAwaitable && topologyVersion
220-
? { ismaster: true, maxAwaitTimeMS, topologyVersion: makeTopologyVersion(topologyVersion) }
221-
: { ismaster: true };
226+
? {
227+
ismaster: true,
228+
maxAwaitTimeMS,
229+
topologyVersion: makeTopologyVersion(topologyVersion),
230+
loadBalanced: loadBalanced
231+
}
232+
: { ismaster: true, loadBalanced: loadBalanced };
222233

223234
const options = isAwaitable
224235
? {

src/sdam/server.ts

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -172,7 +172,8 @@ export class Server extends EventEmitter {
172172
this.emit(
173173
Server.DESCRIPTION_RECEIVED,
174174
new ServerDescription(this.description.hostAddress, event.reply, {
175-
roundTripTime: calculateRoundTripTime(this.description.roundTripTime, event.duration)
175+
roundTripTime: calculateRoundTripTime(this.description.roundTripTime, event.duration),
176+
loadBalanced: options.loadBalanced
176177
})
177178
);
178179

@@ -297,6 +298,7 @@ export class Server extends EventEmitter {
297298
return;
298299
}
299300

301+
// TODO: Durran: Alert the connection pool to the type of operation.
300302
this.s.pool.withConnection((err, conn, cb) => {
301303
if (err || !conn) {
302304
markServerUnknown(this, err);
@@ -322,6 +324,7 @@ export class Server extends EventEmitter {
322324
return;
323325
}
324326

327+
// TODO: Durran: Alert the connection pool to the type of operation.
325328
this.s.pool.withConnection((err, conn, cb) => {
326329
if (err || !conn) {
327330
markServerUnknown(this, err);
@@ -347,6 +350,7 @@ export class Server extends EventEmitter {
347350
return;
348351
}
349352

353+
// TODO: Durran: Alert the connection pool to the type of operation.
350354
this.s.pool.withConnection((err, conn, cb) => {
351355
if (err || !conn) {
352356
markServerUnknown(this, err);
@@ -380,6 +384,7 @@ export class Server extends EventEmitter {
380384
return;
381385
}
382386

387+
// TODO: Durran: Alert the connection pool to the type of operation.
383388
this.s.pool.withConnection((err, conn, cb) => {
384389
if (err || !conn) {
385390
markServerUnknown(this, err);

src/sdam/server_description.ts

Lines changed: 22 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,18 +2,21 @@ import { arrayStrictEqual, errorStrictEqual, now, HostAddress } from '../utils';
22
import { ServerType } from './common';
33
import type { ObjectId, Long, Document } from '../bson';
44
import type { ClusterTime } from './common';
5+
import { MongoError } from '../error';
56

67
const WRITABLE_SERVER_TYPES = new Set([
78
ServerType.RSPrimary,
89
ServerType.Standalone,
9-
ServerType.Mongos
10+
ServerType.Mongos,
11+
ServerType.LoadBalancer
1012
]);
1113

1214
const DATA_BEARING_SERVER_TYPES = new Set([
1315
ServerType.RSPrimary,
1416
ServerType.RSSecondary,
1517
ServerType.Mongos,
16-
ServerType.Standalone
18+
ServerType.Standalone,
19+
ServerType.LoadBalancer
1720
]);
1821

1922
/** @public */
@@ -35,6 +38,9 @@ export interface ServerDescriptionOptions {
3538

3639
/** The topologyVersion */
3740
topologyVersion?: TopologyVersion;
41+
42+
/** If the client is in load balancing mode. */
43+
loadBalanced?: boolean;
3844
}
3945

4046
/**
@@ -89,7 +95,7 @@ export class ServerDescription {
8995
this._hostAddress = address;
9096
this.address = this._hostAddress.toString();
9197
}
92-
this.type = parseServerType(ismaster);
98+
this.type = parseServerType(ismaster, options);
9399
this.hosts = ismaster?.hosts?.map((host: string) => host.toLowerCase()) ?? [];
94100
this.passives = ismaster?.passives?.map((host: string) => host.toLowerCase()) ?? [];
95101
this.arbiters = ismaster?.arbiters?.map((host: string) => host.toLowerCase()) ?? [];
@@ -205,11 +211,23 @@ export class ServerDescription {
205211
}
206212

207213
// Parses an `ismaster` message and determines the server type
208-
export function parseServerType(ismaster?: Document): ServerType {
214+
export function parseServerType(
215+
ismaster?: Document,
216+
options?: ServerDescriptionOptions
217+
): ServerType {
209218
if (!ismaster || !ismaster.ok) {
210219
return ServerType.Unknown;
211220
}
212221

222+
if (options && options.loadBalanced) {
223+
if (!ismaster.serverId) {
224+
throw new MongoError(
225+
'Driver attempted to initialize in load balancing mode, but the server does not support it.'
226+
);
227+
}
228+
return ServerType.LoadBalancer;
229+
}
230+
213231
if (ismaster.isreplicaset) {
214232
return ServerType.RSGhost;
215233
}

0 commit comments

Comments
 (0)