Skip to content

Commit 82b4a23

Browse files
authored
refactor(NODE-4632): async await in MongoClient, ClientSession, and AbstractCursor (#3428)
1 parent 9e3ba81 commit 82b4a23

28 files changed

+672
-896
lines changed

src/change_stream.ts

Lines changed: 25 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
import type { Readable } from 'stream';
2-
import { promisify } from 'util';
32

43
import type { Binary, Document, Timestamp } from './bson';
54
import { Collection } from './collection';
@@ -239,7 +238,7 @@ export interface ChangeStreamInsertDocument<TSchema extends Document = Document>
239238
operationType: 'insert';
240239
/** This key will contain the document being inserted */
241240
fullDocument: TSchema;
242-
/** Namespace the insert event occured on */
241+
/** Namespace the insert event occurred on */
243242
ns: ChangeStreamNameSpace;
244243
}
245244

@@ -262,7 +261,7 @@ export interface ChangeStreamUpdateDocument<TSchema extends Document = Document>
262261
fullDocument?: TSchema;
263262
/** Contains a description of updated and removed fields in this operation */
264263
updateDescription: UpdateDescription<TSchema>;
265-
/** Namespace the update event occured on */
264+
/** Namespace the update event occurred on */
266265
ns: ChangeStreamNameSpace;
267266
/**
268267
* Contains the pre-image of the modified or deleted document if the
@@ -285,7 +284,7 @@ export interface ChangeStreamReplaceDocument<TSchema extends Document = Document
285284
operationType: 'replace';
286285
/** The fullDocument of a replace event represents the document after the insert of the replacement document */
287286
fullDocument: TSchema;
288-
/** Namespace the replace event occured on */
287+
/** Namespace the replace event occurred on */
289288
ns: ChangeStreamNameSpace;
290289
/**
291290
* Contains the pre-image of the modified or deleted document if the
@@ -307,7 +306,7 @@ export interface ChangeStreamDeleteDocument<TSchema extends Document = Document>
307306
ChangeStreamDocumentCollectionUUID {
308307
/** Describes the type of operation represented in this change notification */
309308
operationType: 'delete';
310-
/** Namespace the delete event occured on */
309+
/** Namespace the delete event occurred on */
311310
ns: ChangeStreamNameSpace;
312311
/**
313312
* Contains the pre-image of the modified or deleted document if the
@@ -328,7 +327,7 @@ export interface ChangeStreamDropDocument
328327
ChangeStreamDocumentCollectionUUID {
329328
/** Describes the type of operation represented in this change notification */
330329
operationType: 'drop';
331-
/** Namespace the drop event occured on */
330+
/** Namespace the drop event occurred on */
332331
ns: ChangeStreamNameSpace;
333332
}
334333

@@ -343,7 +342,7 @@ export interface ChangeStreamRenameDocument
343342
operationType: 'rename';
344343
/** The new name for the `ns.coll` collection */
345344
to: { db: string; coll: string };
346-
/** The "from" namespace that the rename occured on */
345+
/** The "from" namespace that the rename occurred on */
347346
ns: ChangeStreamNameSpace;
348347
}
349348

@@ -918,36 +917,31 @@ export class ChangeStream<
918917
}
919918
}
920919

921-
/**
922-
* @internal
923-
*
924-
* TODO(NODE-4320): promisify selectServer and refactor this code to be async
925-
*
926-
* we promisify _processErrorIteratorModeCallback until we have a promisifed version of selectServer.
927-
*/
928-
// eslint-disable-next-line @typescript-eslint/unbound-method
929-
private _processErrorIteratorMode = promisify(this._processErrorIteratorModeCallback);
930-
931920
/** @internal */
932-
private _processErrorIteratorModeCallback(changeStreamError: AnyError, callback: Callback) {
921+
private async _processErrorIteratorMode(changeStreamError: AnyError) {
933922
if (this[kClosed]) {
934923
// TODO(NODE-3485): Replace with MongoChangeStreamClosedError
935-
return callback(new MongoAPIError(CHANGESTREAM_CLOSED_ERROR));
924+
throw new MongoAPIError(CHANGESTREAM_CLOSED_ERROR);
936925
}
937926

938-
if (isResumableError(changeStreamError, this.cursor.maxWireVersion)) {
939-
this.cursor.close().catch(() => null);
940-
941-
const topology = getTopology(this.parent);
942-
topology.selectServer(this.cursor.readPreference, {}, serverSelectionError => {
943-
// if the topology can't reconnect, close the stream
944-
if (serverSelectionError) return this.close(() => callback(changeStreamError));
927+
if (!isResumableError(changeStreamError, this.cursor.maxWireVersion)) {
928+
try {
929+
await this.close();
930+
} catch {
931+
// ignore errors from close
932+
}
933+
throw changeStreamError;
934+
}
945935

946-
this.cursor = this._createChangeStreamCursor(this.cursor.resumeOptions);
947-
callback();
948-
});
949-
} else {
950-
this.close(() => callback(changeStreamError));
936+
await this.cursor.close().catch(() => null);
937+
const topology = getTopology(this.parent);
938+
try {
939+
await topology.selectServerAsync(this.cursor.readPreference, {});
940+
this.cursor = this._createChangeStreamCursor(this.cursor.resumeOptions);
941+
} catch {
942+
// if the topology can't reconnect, close the stream
943+
await this.close();
944+
throw changeStreamError;
951945
}
952946
}
953947
}

src/collection.ts

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1478,7 +1478,7 @@ export class Collection<TSchema extends Document = Document> {
14781478
* Create a new Change Stream, watching for new changes (insertions, updates, replacements, deletions, and invalidations) in this collection.
14791479
*
14801480
* @remarks
1481-
* watch() accepts two generic arguments for distinct usecases:
1481+
* watch() accepts two generic arguments for distinct use cases:
14821482
* - The first is to override the schema that may be defined for this specific collection
14831483
* - The second is to override the shape of the change stream document entirely, if it is not provided the type will default to ChangeStreamDocument of the first argument
14841484
* @example
@@ -1603,7 +1603,7 @@ export class Collection<TSchema extends Document = Document> {
16031603
*
16041604
* @throws MongoNotConnectedError
16051605
* @remarks
1606-
* **NOTE:** MongoClient must be connected prior to calling this method due to a known limitation in this legacy implemenation.
1606+
* **NOTE:** MongoClient must be connected prior to calling this method due to a known limitation in this legacy implementation.
16071607
* However, `collection.bulkWrite()` provides an equivalent API that does not require prior connecting.
16081608
*/
16091609
initializeUnorderedBulkOp(options?: BulkWriteOptions): UnorderedBulkOperation {
@@ -1615,7 +1615,7 @@ export class Collection<TSchema extends Document = Document> {
16151615
*
16161616
* @throws MongoNotConnectedError
16171617
* @remarks
1618-
* **NOTE:** MongoClient must be connected prior to calling this method due to a known limitation in this legacy implemenation.
1618+
* **NOTE:** MongoClient must be connected prior to calling this method due to a known limitation in this legacy implementation.
16191619
* However, `collection.bulkWrite()` provides an equivalent API that does not require prior connecting.
16201620
*/
16211621
initializeOrderedBulkOp(options?: BulkWriteOptions): OrderedBulkOperation {

src/connection_string.ts

Lines changed: 75 additions & 77 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,6 @@ import { ReadPreference, ReadPreferenceMode } from './read_preference';
3030
import type { TagSet } from './sdam/server_description';
3131
import {
3232
AnyOptions,
33-
Callback,
3433
DEFAULT_PK_FACTORY,
3534
emitWarning,
3635
emitWarningOnce,
@@ -70,97 +69,89 @@ function matchesParentDomain(srvAddress: string, parentDomain: string): boolean
7069
* @param uri - The connection string to parse
7170
* @param options - Optional user provided connection string options
7271
*/
73-
export function resolveSRVRecord(options: MongoOptions, callback: Callback<HostAddress[]>): void {
72+
export async function resolveSRVRecord(options: MongoOptions): Promise<HostAddress[]> {
7473
if (typeof options.srvHost !== 'string') {
75-
return callback(new MongoAPIError('Option "srvHost" must not be empty'));
74+
throw new MongoAPIError('Option "srvHost" must not be empty');
7675
}
7776

7877
if (options.srvHost.split('.').length < 3) {
7978
// TODO(NODE-3484): Replace with MongoConnectionStringError
80-
return callback(new MongoAPIError('URI must include hostname, domain name, and tld'));
79+
throw new MongoAPIError('URI must include hostname, domain name, and tld');
8180
}
8281

8382
// Resolve the SRV record and use the result as the list of hosts to connect to.
8483
const lookupAddress = options.srvHost;
85-
dns.resolveSrv(`_${options.srvServiceName}._tcp.${lookupAddress}`, (err, addresses) => {
86-
if (err) return callback(err);
84+
const addresses = await dns.promises.resolveSrv(
85+
`_${options.srvServiceName}._tcp.${lookupAddress}`
86+
);
8787

88-
if (addresses.length === 0) {
89-
return callback(new MongoAPIError('No addresses found at host'));
90-
}
88+
if (addresses.length === 0) {
89+
throw new MongoAPIError('No addresses found at host');
90+
}
9191

92-
for (const { name } of addresses) {
93-
if (!matchesParentDomain(name, lookupAddress)) {
94-
return callback(new MongoAPIError('Server record does not share hostname with parent URI'));
95-
}
92+
for (const { name } of addresses) {
93+
if (!matchesParentDomain(name, lookupAddress)) {
94+
throw new MongoAPIError('Server record does not share hostname with parent URI');
9695
}
96+
}
9797

98-
const hostAddresses = addresses.map(r =>
99-
HostAddress.fromString(`${r.name}:${r.port ?? 27017}`)
100-
);
98+
const hostAddresses = addresses.map(r => HostAddress.fromString(`${r.name}:${r.port ?? 27017}`));
99+
100+
validateLoadBalancedOptions(hostAddresses, options, true);
101101

102-
const lbError = validateLoadBalancedOptions(hostAddresses, options, true);
103-
if (lbError) {
104-
return callback(lbError);
102+
// Resolve TXT record and add options from there if they exist.
103+
let record;
104+
try {
105+
record = await dns.promises.resolveTxt(lookupAddress);
106+
} catch (error) {
107+
if (error.code !== 'ENODATA' && error.code !== 'ENOTFOUND') {
108+
throw error;
105109
}
110+
return hostAddresses;
111+
}
106112

107-
// Resolve TXT record and add options from there if they exist.
108-
dns.resolveTxt(lookupAddress, (err, record) => {
109-
if (err) {
110-
if (err.code !== 'ENODATA' && err.code !== 'ENOTFOUND') {
111-
return callback(err);
112-
}
113-
} else {
114-
if (record.length > 1) {
115-
return callback(new MongoParseError('Multiple text records not allowed'));
116-
}
113+
if (record.length > 1) {
114+
throw new MongoParseError('Multiple text records not allowed');
115+
}
117116

118-
const txtRecordOptions = new URLSearchParams(record[0].join(''));
119-
const txtRecordOptionKeys = [...txtRecordOptions.keys()];
120-
if (txtRecordOptionKeys.some(key => !VALID_TXT_RECORDS.includes(key))) {
121-
return callback(
122-
new MongoParseError(`Text record may only set any of: ${VALID_TXT_RECORDS.join(', ')}`)
123-
);
124-
}
117+
const txtRecordOptions = new URLSearchParams(record[0].join(''));
118+
const txtRecordOptionKeys = [...txtRecordOptions.keys()];
119+
if (txtRecordOptionKeys.some(key => !VALID_TXT_RECORDS.includes(key))) {
120+
throw new MongoParseError(`Text record may only set any of: ${VALID_TXT_RECORDS.join(', ')}`);
121+
}
125122

126-
if (VALID_TXT_RECORDS.some(option => txtRecordOptions.get(option) === '')) {
127-
return callback(new MongoParseError('Cannot have empty URI params in DNS TXT Record'));
128-
}
123+
if (VALID_TXT_RECORDS.some(option => txtRecordOptions.get(option) === '')) {
124+
throw new MongoParseError('Cannot have empty URI params in DNS TXT Record');
125+
}
129126

130-
const source = txtRecordOptions.get('authSource') ?? undefined;
131-
const replicaSet = txtRecordOptions.get('replicaSet') ?? undefined;
132-
const loadBalanced = txtRecordOptions.get('loadBalanced') ?? undefined;
133-
134-
if (
135-
!options.userSpecifiedAuthSource &&
136-
source &&
137-
options.credentials &&
138-
!AUTH_MECHS_AUTH_SRC_EXTERNAL.has(options.credentials.mechanism)
139-
) {
140-
options.credentials = MongoCredentials.merge(options.credentials, { source });
141-
}
127+
const source = txtRecordOptions.get('authSource') ?? undefined;
128+
const replicaSet = txtRecordOptions.get('replicaSet') ?? undefined;
129+
const loadBalanced = txtRecordOptions.get('loadBalanced') ?? undefined;
142130

143-
if (!options.userSpecifiedReplicaSet && replicaSet) {
144-
options.replicaSet = replicaSet;
145-
}
131+
if (
132+
!options.userSpecifiedAuthSource &&
133+
source &&
134+
options.credentials &&
135+
!AUTH_MECHS_AUTH_SRC_EXTERNAL.has(options.credentials.mechanism)
136+
) {
137+
options.credentials = MongoCredentials.merge(options.credentials, { source });
138+
}
146139

147-
if (loadBalanced === 'true') {
148-
options.loadBalanced = true;
149-
}
140+
if (!options.userSpecifiedReplicaSet && replicaSet) {
141+
options.replicaSet = replicaSet;
142+
}
150143

151-
if (options.replicaSet && options.srvMaxHosts > 0) {
152-
return callback(new MongoParseError('Cannot combine replicaSet option with srvMaxHosts'));
153-
}
144+
if (loadBalanced === 'true') {
145+
options.loadBalanced = true;
146+
}
154147

155-
const lbError = validateLoadBalancedOptions(hostAddresses, options, true);
156-
if (lbError) {
157-
return callback(lbError);
158-
}
159-
}
148+
if (options.replicaSet && options.srvMaxHosts > 0) {
149+
throw new MongoParseError('Cannot combine replicaSet option with srvMaxHosts');
150+
}
160151

161-
callback(undefined, hostAddresses);
162-
});
163-
});
152+
validateLoadBalancedOptions(hostAddresses, options, true);
153+
154+
return hostAddresses;
164155
}
165156

166157
/**
@@ -442,10 +433,8 @@ export function parseOptions(
442433
PromiseProvider.set(options.promiseLibrary);
443434
}
444435

445-
const lbError = validateLoadBalancedOptions(hosts, mongoOptions, isSRV);
446-
if (lbError) {
447-
throw lbError;
448-
}
436+
validateLoadBalancedOptions(hosts, mongoOptions, isSRV);
437+
449438
if (mongoClient && mongoOptions.autoEncryption) {
450439
Encrypter.checkForMongoCrypt();
451440
mongoOptions.encrypter = new Encrypter(mongoClient, uri, options);
@@ -522,24 +511,33 @@ export function parseOptions(
522511
return mongoOptions;
523512
}
524513

514+
/**
515+
* #### Throws if LB mode is true:
516+
* - hosts contains more than one host
517+
* - there is a replicaSet name set
518+
* - directConnection is set
519+
* - if srvMaxHosts is used when an srv connection string is passed in
520+
*
521+
* @throws MongoParseError
522+
*/
525523
function validateLoadBalancedOptions(
526524
hosts: HostAddress[] | string[],
527525
mongoOptions: MongoOptions,
528526
isSrv: boolean
529-
): MongoParseError | undefined {
527+
): void {
530528
if (mongoOptions.loadBalanced) {
531529
if (hosts.length > 1) {
532-
return new MongoParseError(LB_SINGLE_HOST_ERROR);
530+
throw new MongoParseError(LB_SINGLE_HOST_ERROR);
533531
}
534532
if (mongoOptions.replicaSet) {
535-
return new MongoParseError(LB_REPLICA_SET_ERROR);
533+
throw new MongoParseError(LB_REPLICA_SET_ERROR);
536534
}
537535
if (mongoOptions.directConnection) {
538-
return new MongoParseError(LB_DIRECT_CONNECTION_ERROR);
536+
throw new MongoParseError(LB_DIRECT_CONNECTION_ERROR);
539537
}
540538

541539
if (isSrv && mongoOptions.srvMaxHosts > 0) {
542-
return new MongoParseError('Cannot limit srv hosts with loadBalanced enabled');
540+
throw new MongoParseError('Cannot limit srv hosts with loadBalanced enabled');
543541
}
544542
}
545543
return;

0 commit comments

Comments
 (0)