Skip to content

Commit bc7dbae

Browse files
committed
feat(loadBalancer)!: add support for load balancers
1 parent 6cd982f commit bc7dbae

File tree

216 files changed

+7844
-246
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

216 files changed

+7844
-246
lines changed

.evergreen/config.yml

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -131,6 +131,40 @@ functions:
131131
MONGODB_API_VERSION="${MONGODB_API_VERSION}" \
132132
NODE_VERSION=${NODE_VERSION} SKIP_DEPS=${SKIP_DEPS|1} NO_EXIT=${NO_EXIT|1} \
133133
bash ${PROJECT_DIRECTORY}/.evergreen/run-tests.sh
134+
start-load-balancer:
135+
- command: shell.exec
136+
params:
137+
script: |
138+
DRIVERS_TOOLS=${DRIVERS_TOOLS} MONGODB_URI=${MONGODB_URI} bash ${DRIVERS_TOOLS}/.evergreen/run-load-balancer.sh start
139+
- command: expansions.update
140+
params:
141+
file: lb-expansion.yml
142+
stop-load-balancer:
143+
- command: shell.exec
144+
params:
145+
script: |
146+
DRIVERS_TOOLS=${DRIVERS_TOOLS} bash ${DRIVERS_TOOLS}/.evergreen/run-load-balancer.sh stop
147+
run-lb-tests:
148+
- command: shell.exec
149+
type: test
150+
params:
151+
working_dir: src
152+
timeout_secs: 60
153+
script: |
154+
${PREPARE_SHELL}
155+
156+
MONGODB_URI="${MONGODB_URI}" \
157+
AUTH=${AUTH} \
158+
SSL=${SSL} \
159+
UNIFIED=${UNIFIED} \
160+
MONGODB_API_VERSION="${MONGODB_API_VERSION}" \
161+
NODE_VERSION=${NODE_VERSION} \
162+
SINGLE_MONGOS_LB_URI="${SINGLE_MONGOS_LB_URI}" \
163+
MULTI_MONGOS_LB_URI="${MULTI_MONGOS_LB_URI}" \
164+
TOPOLOGY="${TOPOLOGY}" \
165+
SKIP_DEPS=${SKIP_DEPS|1} \
166+
NO_EXIT=${NO_EXIT|1} \
167+
bash ${PROJECT_DIRECTORY}/.evergreen/run-tests.sh
134168
run checks:
135169
- command: shell.exec
136170
type: test

.evergreen/run-load-balancer.sh

Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
#!/bin/bash
2+
3+
set -o errexit # Exit the script with error if any of the commands fail
4+
5+
start() {
6+
echo "Starting HAProxy..."
7+
8+
cat <<EOF_HAPROXY_CONFIG >> $DRIVERS_TOOLS/haproxy.conf
9+
defaults
10+
mode tcp
11+
timeout connect 10s
12+
timeout client 30m
13+
timeout server 30m
14+
15+
frontend mongos_frontend
16+
bind *:8000
17+
use_backend mongos_backend
18+
19+
frontend mongoses_frontend
20+
bind *:8001
21+
use_backend mongoses_backend
22+
23+
backend mongos_backend
24+
mode tcp
25+
server mongos 127.0.0.1:27017 check
26+
27+
backend mongoses_backend
28+
mode tcp
29+
server mongos_one 127.0.0.1:27017 check
30+
server mongos_two 127.0.0.1:27018 check
31+
EOF_HAPROXY_CONFIG
32+
33+
PREFIX=$(echo $MONGODB_URI | grep -Eo "(.*?)@" | cat)
34+
SUFFIX=$(echo $MONGODB_URI | grep -Eo "\?(.*)" | cat)
35+
36+
if [[ $PREFIX = "" ]]
37+
then
38+
# No auth then just set the URI
39+
SINGLE_MONGOS_LB_URI="mongodb://127.0.0.1:8000/"
40+
MULTI_MONGOS_LB_URI="mongodb://127.0.0.1:8001/"
41+
else
42+
# We have auth so append the lb host:port
43+
SINGLE_MONGOS_LB_URI="${PREFIX}127.0.0.1:8000/"
44+
MULTI_MONGOS_LB_URI="${PREFIX}127.0.0.1:8001/"
45+
fi
46+
47+
if [[ $SUFFIX = "" ]]
48+
then
49+
# If there are no query params then add only the load balanced option.
50+
SINGLE_MONGOS_LB_URI="${SINGLE_MONGOS_LB_URI}?loadBalanced=true"
51+
MULTI_MONGOS_LB_URI="${MULTI_MONGOS_LB_URI}?loadBalanced=true"
52+
else
53+
# If there are query params then append the load balanced option to them.
54+
SINGLE_MONGOS_LB_URI="${SINGLE_MONGOS_LB_URI}${SUFFIX}&loadBalanced=true"
55+
MULTI_MONGOS_LB_URI="${MULTI_MONGOS_LB_URI}${SUFFIX}&loadBalanced=true"
56+
fi
57+
58+
echo "Single Mongos LB: $SINGLE_MONGOS_LB_URI"
59+
echo "Multiple Mongos LB: $MULTI_MONGOS_LB_URI"
60+
61+
/usr/sbin/haproxy -D -f $DRIVERS_TOOLS/haproxy.conf -p $DRIVERS_TOOLS/haproxy.pid
62+
63+
echo 'SINGLE_MONGOS_LB_URI: "'$SINGLE_MONGOS_LB_URI'"' > lb-expansion.yml
64+
echo 'MULTI_MONGOS_LB_URI: "'$MULTI_MONGOS_LB_URI'"' >> lb-expansion.yml
65+
}
66+
67+
stop() {
68+
echo "Stopping HAProxy..."
69+
kill -USR1 $(cat $DRIVERS_TOOLS/haproxy.pid)
70+
rm $DRIVERS_TOOLS/haproxy.conf $DRIVERS_TOOLS/haproxy.pid
71+
}
72+
73+
case "$1" in
74+
start)
75+
start
76+
;;
77+
stop)
78+
stop
79+
;;
80+
*)
81+
echo "Usage: load-balancer.sh (start|stop)"
82+
exit 1
83+
esac

src/cmap/connect.ts

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ import {
1111
MIN_SUPPORTED_WIRE_VERSION,
1212
MIN_SUPPORTED_SERVER_VERSION
1313
} from './wire_protocol/constants';
14-
import type { Document } from '../bson';
14+
import type { Document, ObjectId } from '../bson';
1515

1616
import type { Socket, SocketConnectOpts } from 'net';
1717
import type { TLSSocket, ConnectionOptions as TLSConnectionOpts } from 'tls';
@@ -149,6 +149,7 @@ export interface HandshakeDocument extends Document {
149149
client: ClientMetadata;
150150
compression: string[];
151151
saslSupportedMechs?: string;
152+
loadBalanced?: boolean;
152153
}
153154

154155
function prepareHandshakeDocument(authContext: AuthContext, callback: Callback<HandshakeDocument>) {
@@ -158,7 +159,8 @@ function prepareHandshakeDocument(authContext: AuthContext, callback: Callback<H
158159
const handshakeDoc: HandshakeDocument = {
159160
ismaster: true,
160161
client: options.metadata || makeClientMetadata(options),
161-
compression: compressors
162+
compression: compressors,
163+
loadBalanced: options.loadBalanced
162164
};
163165

164166
const credentials = authContext.credentials;
@@ -295,7 +297,7 @@ function makeConnection(options: ConnectionOptions, _callback: CallbackWithType<
295297
socket.setNoDelay(noDelay);
296298

297299
const connectEvent = useTLS ? 'secureConnect' : 'connect';
298-
let cancellationHandler: (err: Error) => void;
300+
let cancellationHandler: (err: Error, serviceId?: ObjectId) => void;
299301
function errorHandler(eventName: ErrorHandlerEventName) {
300302
return (err: Error) => {
301303
SOCKET_ERROR_EVENTS.forEach(event => socket.removeAllListeners(event));

src/cmap/connection.ts

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ import {
3434
OpQueryOptions,
3535
Msg
3636
} from './commands';
37-
import { BSONSerializeOptions, Document, Long, pluckBSONSerializeOptions } from '../bson';
37+
import { BSONSerializeOptions, Document, Long, pluckBSONSerializeOptions, ObjectId } from '../bson';
3838
import type { AutoEncrypter } from '../deps';
3939
import type { MongoCredentials } from './auth/mongo_credentials';
4040
import type { Stream } from './connect';
@@ -220,6 +220,10 @@ export class Connection extends EventEmitter {
220220
this[kIsMaster] = response;
221221
}
222222

223+
get serviceIdd(): ObjectId {
224+
return this.ismaster.serviceId;
225+
}
226+
223227
get generation(): number {
224228
return this[kGeneration] || 0;
225229
}
@@ -632,8 +636,10 @@ export class CryptoConnection extends Connection {
632636
}
633637
}
634638

635-
function hasSessionSupport(conn: Connection) {
636-
return conn.description.logicalSessionTimeoutMinutes != null;
639+
/** @public */
640+
export function hasSessionSupport(conn: Connection): boolean {
641+
const description = conn.description;
642+
return description.logicalSessionTimeoutMinutes != null || !!description.loadBalanced;
637643
}
638644

639645
function supportsOpMsg(conn: Connection) {

src/cmap/connection_pool.ts

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ import {
1818
ConnectionCheckedInEvent,
1919
ConnectionPoolClearedEvent
2020
} from './connection_pool_events';
21+
import type { ObjectId } from '../bson';
2122

2223
const kLogger = Symbol('logger');
2324
const kConnections = Symbol('connections');
@@ -270,6 +271,17 @@ export class ConnectionPool extends EventEmitter {
270271
this.emit('connectionPoolCleared', new ConnectionPoolClearedEvent(this));
271272
}
272273

274+
/**
275+
* Close all connections in the pool for the provided serviceId.
276+
*/
277+
closeConnections(serviceId: ObjectId): void {
278+
// cancel in flight matching connections.
279+
this[kCancellationToken].emit('cancel', serviceId);
280+
281+
// destroy each matching connection
282+
// this.destroyConnections('serverError', {}, serviceId, callback);
283+
}
284+
273285
/** Close the pool */
274286
close(callback: Callback<void>): void;
275287
close(options: CloseOptions, callback: Callback<void>): void;
@@ -314,19 +326,25 @@ export class ConnectionPool extends EventEmitter {
314326

315327
// mark the pool as closed immediately
316328
this.closed = true;
317-
318329
eachAsync<Connection>(
319330
this[kConnections].toArray(),
320331
(conn, cb) => {
332+
// Destroy the connection in the case of closing the entire pool
333+
// or if the connection matches the server id.
334+
//if (!serviceId || serviceId === conn.serviceId) {
321335
this.emit(
322336
ConnectionPool.CONNECTION_CLOSED,
323337
new ConnectionClosedEvent(this, conn, 'poolClosed')
324338
);
325339
conn.destroy(options, cb);
340+
//}
326341
},
327342
err => {
343+
// Dont close the entire pool for error on single server.
344+
//if (!serviceId) {
328345
this[kConnections].clear();
329346
this.emit(ConnectionPool.CONNECTION_POOL_CLOSED, new ConnectionPoolClosedEvent(this));
347+
//}
330348
callback(err);
331349
}
332350
);
@@ -382,6 +400,7 @@ function connectionIsIdle(pool: ConnectionPool, connection: Connection) {
382400
return !!(pool.options.maxIdleTimeMS && connection.idleTime > pool.options.maxIdleTimeMS);
383401
}
384402

403+
// TODO: Durran: In LB mode set the server id on the connection.
385404
function createConnection(pool: ConnectionPool, callback?: Callback<Connection>) {
386405
const connectOptions: ConnectionOptions = {
387406
...pool.options,

src/cmap/stream_description.ts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@ const RESPONSE_FIELDS = [
1515
/** @public */
1616
export interface StreamDescriptionOptions {
1717
compressors?: CompressorName[];
18+
logicalSessionTimeoutMinutes?: number;
19+
loadBalanced?: boolean;
1820
}
1921

2022
/** @public */
@@ -29,6 +31,7 @@ export class StreamDescription {
2931
compressors: CompressorName[];
3032
compressor?: CompressorName;
3133
logicalSessionTimeoutMinutes?: number;
34+
loadBalanced?: boolean;
3235

3336
__nodejs_mock_server__?: boolean;
3437

@@ -42,6 +45,8 @@ export class StreamDescription {
4245
this.maxBsonObjectSize = 16777216;
4346
this.maxMessageSizeBytes = 48000000;
4447
this.maxWriteBatchSize = 100000;
48+
this.logicalSessionTimeoutMinutes = options ? options.logicalSessionTimeoutMinutes : undefined;
49+
this.loadBalanced = options ? !!options.loadBalanced : false;
4550
this.compressors =
4651
options && options.compressors && Array.isArray(options.compressors)
4752
? options.compressors

0 commit comments

Comments
 (0)