diff --git a/src/change_stream.ts b/src/change_stream.ts index 5887e3f338f..d3e225e59d8 100644 --- a/src/change_stream.ts +++ b/src/change_stream.ts @@ -1,5 +1,4 @@ import type { Readable } from 'stream'; -import { promisify } from 'util'; import type { Binary, Document, Timestamp } from './bson'; import { Collection } from './collection'; @@ -239,7 +238,7 @@ export interface ChangeStreamInsertDocument operationType: 'insert'; /** This key will contain the document being inserted */ fullDocument: TSchema; - /** Namespace the insert event occured on */ + /** Namespace the insert event occurred on */ ns: ChangeStreamNameSpace; } @@ -262,7 +261,7 @@ export interface ChangeStreamUpdateDocument fullDocument?: TSchema; /** Contains a description of updated and removed fields in this operation */ updateDescription: UpdateDescription; - /** Namespace the update event occured on */ + /** Namespace the update event occurred on */ ns: ChangeStreamNameSpace; /** * Contains the pre-image of the modified or deleted document if the @@ -285,7 +284,7 @@ export interface ChangeStreamReplaceDocument ChangeStreamDocumentCollectionUUID { /** Describes the type of operation represented in this change notification */ operationType: 'delete'; - /** Namespace the delete event occured on */ + /** Namespace the delete event occurred on */ ns: ChangeStreamNameSpace; /** * Contains the pre-image of the modified or deleted document if the @@ -328,7 +327,7 @@ export interface ChangeStreamDropDocument ChangeStreamDocumentCollectionUUID { /** Describes the type of operation represented in this change notification */ operationType: 'drop'; - /** Namespace the drop event occured on */ + /** Namespace the drop event occurred on */ ns: ChangeStreamNameSpace; } @@ -343,7 +342,7 @@ export interface ChangeStreamRenameDocument operationType: 'rename'; /** The new name for the `ns.coll` collection */ to: { db: string; coll: string }; - /** The "from" namespace that the rename occured on */ + /** The "from" namespace that the rename occurred on */ ns: ChangeStreamNameSpace; } @@ -918,36 +917,31 @@ export class ChangeStream< } } - /** - * @internal - * - * TODO(NODE-4320): promisify selectServer and refactor this code to be async - * - * we promisify _processErrorIteratorModeCallback until we have a promisifed version of selectServer. 
- */ - // eslint-disable-next-line @typescript-eslint/unbound-method - private _processErrorIteratorMode = promisify(this._processErrorIteratorModeCallback); - /** @internal */ - private _processErrorIteratorModeCallback(changeStreamError: AnyError, callback: Callback) { + private async _processErrorIteratorMode(changeStreamError: AnyError) { if (this[kClosed]) { // TODO(NODE-3485): Replace with MongoChangeStreamClosedError - return callback(new MongoAPIError(CHANGESTREAM_CLOSED_ERROR)); + throw new MongoAPIError(CHANGESTREAM_CLOSED_ERROR); } - if (isResumableError(changeStreamError, this.cursor.maxWireVersion)) { - this.cursor.close().catch(() => null); - - const topology = getTopology(this.parent); - topology.selectServer(this.cursor.readPreference, {}, serverSelectionError => { - // if the topology can't reconnect, close the stream - if (serverSelectionError) return this.close(() => callback(changeStreamError)); + if (!isResumableError(changeStreamError, this.cursor.maxWireVersion)) { + try { + await this.close(); + } catch { + // ignore errors from close + } + throw changeStreamError; + } - this.cursor = this._createChangeStreamCursor(this.cursor.resumeOptions); - callback(); - }); - } else { - this.close(() => callback(changeStreamError)); + await this.cursor.close().catch(() => null); + const topology = getTopology(this.parent); + try { + await topology.selectServerAsync(this.cursor.readPreference, {}); + this.cursor = this._createChangeStreamCursor(this.cursor.resumeOptions); + } catch { + // if the topology can't reconnect, close the stream + await this.close(); + throw changeStreamError; } } } diff --git a/src/collection.ts b/src/collection.ts index 7fa4968437a..3229b0b27ef 100644 --- a/src/collection.ts +++ b/src/collection.ts @@ -1478,7 +1478,7 @@ export class Collection { * Create a new Change Stream, watching for new changes (insertions, updates, replacements, deletions, and invalidations) in this collection. * * @remarks - * watch() accepts two generic arguments for distinct usecases: + * watch() accepts two generic arguments for distinct use cases: * - The first is to override the schema that may be defined for this specific collection * - The second is to override the shape of the change stream document entirely, if it is not provided the type will default to ChangeStreamDocument of the first argument * @example @@ -1603,7 +1603,7 @@ export class Collection { * * @throws MongoNotConnectedError * @remarks - * **NOTE:** MongoClient must be connected prior to calling this method due to a known limitation in this legacy implemenation. + * **NOTE:** MongoClient must be connected prior to calling this method due to a known limitation in this legacy implementation. * However, `collection.bulkWrite()` provides an equivalent API that does not require prior connecting. */ initializeUnorderedBulkOp(options?: BulkWriteOptions): UnorderedBulkOperation { @@ -1615,7 +1615,7 @@ export class Collection { * * @throws MongoNotConnectedError * @remarks - * **NOTE:** MongoClient must be connected prior to calling this method due to a known limitation in this legacy implemenation. + * **NOTE:** MongoClient must be connected prior to calling this method due to a known limitation in this legacy implementation. * However, `collection.bulkWrite()` provides an equivalent API that does not require prior connecting. 
   */
  initializeOrderedBulkOp(options?: BulkWriteOptions): OrderedBulkOperation {
diff --git a/src/connection_string.ts b/src/connection_string.ts
index e9abc845eb9..4ec7ed5aa67 100644
--- a/src/connection_string.ts
+++ b/src/connection_string.ts
@@ -30,7 +30,6 @@ import { ReadPreference, ReadPreferenceMode } from './read_preference';
 import type { TagSet } from './sdam/server_description';
 import {
   AnyOptions,
-  Callback,
   DEFAULT_PK_FACTORY,
   emitWarning,
   emitWarningOnce,
@@ -70,97 +69,89 @@ function matchesParentDomain(srvAddress: string, parentDomain: string): boolean
  * @param uri - The connection string to parse
  * @param options - Optional user provided connection string options
  */
-export function resolveSRVRecord(options: MongoOptions, callback: Callback<HostAddress[]>): void {
+export async function resolveSRVRecord(options: MongoOptions): Promise<HostAddress[]> {
   if (typeof options.srvHost !== 'string') {
-    return callback(new MongoAPIError('Option "srvHost" must not be empty'));
+    throw new MongoAPIError('Option "srvHost" must not be empty');
   }

   if (options.srvHost.split('.').length < 3) {
     // TODO(NODE-3484): Replace with MongoConnectionStringError
-    return callback(new MongoAPIError('URI must include hostname, domain name, and tld'));
+    throw new MongoAPIError('URI must include hostname, domain name, and tld');
   }

   // Resolve the SRV record and use the result as the list of hosts to connect to.
   const lookupAddress = options.srvHost;
-  dns.resolveSrv(`_${options.srvServiceName}._tcp.${lookupAddress}`, (err, addresses) => {
-    if (err) return callback(err);
+  const addresses = await dns.promises.resolveSrv(
+    `_${options.srvServiceName}._tcp.${lookupAddress}`
+  );

-    if (addresses.length === 0) {
-      return callback(new MongoAPIError('No addresses found at host'));
-    }
+  if (addresses.length === 0) {
+    throw new MongoAPIError('No addresses found at host');
+  }

-    for (const { name } of addresses) {
-      if (!matchesParentDomain(name, lookupAddress)) {
-        return callback(new MongoAPIError('Server record does not share hostname with parent URI'));
-      }
+  for (const { name } of addresses) {
+    if (!matchesParentDomain(name, lookupAddress)) {
+      throw new MongoAPIError('Server record does not share hostname with parent URI');
     }
+  }

-    const hostAddresses = addresses.map(r =>
-      HostAddress.fromString(`${r.name}:${r.port ?? 27017}`)
-    );
+  const hostAddresses = addresses.map(r => HostAddress.fromString(`${r.name}:${r.port ?? 27017}`));
+
+  validateLoadBalancedOptions(hostAddresses, options, true);

-    const lbError = validateLoadBalancedOptions(hostAddresses, options, true);
-    if (lbError) {
-      return callback(lbError);
+  // Resolve TXT record and add options from there if they exist.
+  let record;
+  try {
+    record = await dns.promises.resolveTxt(lookupAddress);
+  } catch (error) {
+    if (error.code !== 'ENODATA' && error.code !== 'ENOTFOUND') {
+      throw error;
     }
+    return hostAddresses;
+  }

-    // Resolve TXT record and add options from there if they exist.
- dns.resolveTxt(lookupAddress, (err, record) => { - if (err) { - if (err.code !== 'ENODATA' && err.code !== 'ENOTFOUND') { - return callback(err); - } - } else { - if (record.length > 1) { - return callback(new MongoParseError('Multiple text records not allowed')); - } + if (record.length > 1) { + throw new MongoParseError('Multiple text records not allowed'); + } - const txtRecordOptions = new URLSearchParams(record[0].join('')); - const txtRecordOptionKeys = [...txtRecordOptions.keys()]; - if (txtRecordOptionKeys.some(key => !VALID_TXT_RECORDS.includes(key))) { - return callback( - new MongoParseError(`Text record may only set any of: ${VALID_TXT_RECORDS.join(', ')}`) - ); - } + const txtRecordOptions = new URLSearchParams(record[0].join('')); + const txtRecordOptionKeys = [...txtRecordOptions.keys()]; + if (txtRecordOptionKeys.some(key => !VALID_TXT_RECORDS.includes(key))) { + throw new MongoParseError(`Text record may only set any of: ${VALID_TXT_RECORDS.join(', ')}`); + } - if (VALID_TXT_RECORDS.some(option => txtRecordOptions.get(option) === '')) { - return callback(new MongoParseError('Cannot have empty URI params in DNS TXT Record')); - } + if (VALID_TXT_RECORDS.some(option => txtRecordOptions.get(option) === '')) { + throw new MongoParseError('Cannot have empty URI params in DNS TXT Record'); + } - const source = txtRecordOptions.get('authSource') ?? undefined; - const replicaSet = txtRecordOptions.get('replicaSet') ?? undefined; - const loadBalanced = txtRecordOptions.get('loadBalanced') ?? undefined; - - if ( - !options.userSpecifiedAuthSource && - source && - options.credentials && - !AUTH_MECHS_AUTH_SRC_EXTERNAL.has(options.credentials.mechanism) - ) { - options.credentials = MongoCredentials.merge(options.credentials, { source }); - } + const source = txtRecordOptions.get('authSource') ?? undefined; + const replicaSet = txtRecordOptions.get('replicaSet') ?? undefined; + const loadBalanced = txtRecordOptions.get('loadBalanced') ?? 
undefined; - if (!options.userSpecifiedReplicaSet && replicaSet) { - options.replicaSet = replicaSet; - } + if ( + !options.userSpecifiedAuthSource && + source && + options.credentials && + !AUTH_MECHS_AUTH_SRC_EXTERNAL.has(options.credentials.mechanism) + ) { + options.credentials = MongoCredentials.merge(options.credentials, { source }); + } - if (loadBalanced === 'true') { - options.loadBalanced = true; - } + if (!options.userSpecifiedReplicaSet && replicaSet) { + options.replicaSet = replicaSet; + } - if (options.replicaSet && options.srvMaxHosts > 0) { - return callback(new MongoParseError('Cannot combine replicaSet option with srvMaxHosts')); - } + if (loadBalanced === 'true') { + options.loadBalanced = true; + } - const lbError = validateLoadBalancedOptions(hostAddresses, options, true); - if (lbError) { - return callback(lbError); - } - } + if (options.replicaSet && options.srvMaxHosts > 0) { + throw new MongoParseError('Cannot combine replicaSet option with srvMaxHosts'); + } - callback(undefined, hostAddresses); - }); - }); + validateLoadBalancedOptions(hostAddresses, options, true); + + return hostAddresses; } /** @@ -442,10 +433,8 @@ export function parseOptions( PromiseProvider.set(options.promiseLibrary); } - const lbError = validateLoadBalancedOptions(hosts, mongoOptions, isSRV); - if (lbError) { - throw lbError; - } + validateLoadBalancedOptions(hosts, mongoOptions, isSRV); + if (mongoClient && mongoOptions.autoEncryption) { Encrypter.checkForMongoCrypt(); mongoOptions.encrypter = new Encrypter(mongoClient, uri, options); @@ -522,24 +511,33 @@ export function parseOptions( return mongoOptions; } +/** + * #### Throws if LB mode is true: + * - hosts contains more than one host + * - there is a replicaSet name set + * - directConnection is set + * - if srvMaxHosts is used when an srv connection string is passed in + * + * @throws MongoParseError + */ function validateLoadBalancedOptions( hosts: HostAddress[] | string[], mongoOptions: MongoOptions, isSrv: boolean -): MongoParseError | undefined { +): void { if (mongoOptions.loadBalanced) { if (hosts.length > 1) { - return new MongoParseError(LB_SINGLE_HOST_ERROR); + throw new MongoParseError(LB_SINGLE_HOST_ERROR); } if (mongoOptions.replicaSet) { - return new MongoParseError(LB_REPLICA_SET_ERROR); + throw new MongoParseError(LB_REPLICA_SET_ERROR); } if (mongoOptions.directConnection) { - return new MongoParseError(LB_DIRECT_CONNECTION_ERROR); + throw new MongoParseError(LB_DIRECT_CONNECTION_ERROR); } if (isSrv && mongoOptions.srvMaxHosts > 0) { - return new MongoParseError('Cannot limit srv hosts with loadBalanced enabled'); + throw new MongoParseError('Cannot limit srv hosts with loadBalanced enabled'); } } return; diff --git a/src/cursor/abstract_cursor.ts b/src/cursor/abstract_cursor.ts index c9df87dd350..b29c1e07605 100644 --- a/src/cursor/abstract_cursor.ts +++ b/src/cursor/abstract_cursor.ts @@ -1,4 +1,5 @@ import { Readable, Transform } from 'stream'; +import { promisify } from 'util'; import { BSONSerializeOptions, Document, Long, pluckBSONSerializeOptions } from '../bson'; import { @@ -15,11 +16,12 @@ import { TODO_NODE_3286, TypedEventEmitter } from '../mongo_types'; import { executeOperation, ExecutionResult } from '../operations/execute_operation'; import { GetMoreOperation } from '../operations/get_more'; import { KillCursorsOperation } from '../operations/kill_cursors'; +import { PromiseProvider } from '../promise_provider'; import { ReadConcern, ReadConcernLike } from '../read_concern'; import { ReadPreference, 
ReadPreferenceLike } from '../read_preference'; import type { Server } from '../sdam/server'; import { ClientSession, maybeClearPinnedConnection } from '../sessions'; -import { Callback, maybePromise, MongoDBNamespace, ns } from '../utils'; +import { Callback, maybeCallback, MongoDBNamespace, ns } from '../utils'; /** @internal */ const kId = Symbol('id'); @@ -284,11 +286,34 @@ export abstract class AbstractCursor< } [Symbol.asyncIterator](): AsyncIterator { + async function* nativeAsyncIterator(this: AbstractCursor) { + if (this.closed) { + return; + } + + while (true) { + const document = await this.next(); + + if (document == null) { + break; + } + + yield document; + + if (this[kId] === Long.ZERO) { + // Cursor exhausted + break; + } + } + } + + const iterator = nativeAsyncIterator.call(this); + + if (PromiseProvider.get() == null) { + return iterator; + } return { - next: () => - this.next().then(value => - value != null ? { value, done: false } : { value: undefined, done: true } - ) + next: () => maybeCallback(() => iterator.next(), null) }; } @@ -320,27 +345,24 @@ export abstract class AbstractCursor< /** @deprecated Callbacks are deprecated and will be removed in the next major version. See [mongodb-legacy](https://github.com/mongodb-js/nodejs-mongodb-legacy) for migration assistance */ hasNext(callback: Callback): void; hasNext(callback?: Callback): Promise | void { - return maybePromise(callback, done => { + return maybeCallback(async () => { if (this[kId] === Long.ZERO) { - return done(undefined, false); + return false; } if (this[kDocuments].length) { - return done(undefined, true); + return true; } - next(this, true, (err, doc) => { - if (err) return done(err); + const doc = await nextAsync(this, true); - if (doc) { - this[kDocuments].unshift(doc); - done(undefined, true); - return; - } + if (doc) { + this[kDocuments].unshift(doc); + return true; + } - done(undefined, false); - }); - }); + return false; + }, callback); } /** Get the next available document from the cursor, returns null if no more documents are available. */ @@ -350,13 +372,13 @@ export abstract class AbstractCursor< /** @deprecated Callbacks are deprecated and will be removed in the next major version. See [mongodb-legacy](https://github.com/mongodb-js/nodejs-mongodb-legacy) for migration assistance */ next(callback?: Callback): Promise | void; next(callback?: Callback): Promise | void { - return maybePromise(callback, done => { + return maybeCallback(async () => { if (this[kId] === Long.ZERO) { - return done(new MongoCursorExhaustedError()); + throw new MongoCursorExhaustedError(); } - next(this, true, done); - }); + return nextAsync(this, true); + }, callback); } /** @@ -366,13 +388,13 @@ export abstract class AbstractCursor< /** @deprecated Callbacks are deprecated and will be removed in the next major version. 
See [mongodb-legacy](https://github.com/mongodb-js/nodejs-mongodb-legacy) for migration assistance */ tryNext(callback: Callback): void; tryNext(callback?: Callback): Promise | void { - return maybePromise(callback, done => { + return maybeCallback(async () => { if (this[kId] === Long.ZERO) { - return done(new MongoCursorExhaustedError()); + throw new MongoCursorExhaustedError(); } - next(this, false, done); - }); + return nextAsync(this, false); + }, callback); } /** @@ -391,40 +413,14 @@ export abstract class AbstractCursor< if (typeof iterator !== 'function') { throw new MongoInvalidArgumentError('Argument "iterator" must be a function'); } - return maybePromise(callback, done => { - const transform = this[kTransform]; - const fetchDocs = () => { - next(this, true, (err, doc) => { - if (err || doc == null) return done(err); - let result; - // NOTE: no need to transform because `next` will do this automatically - try { - result = iterator(doc); // TODO(NODE-3283): Improve transform typing - } catch (error) { - return done(error); - } - - if (result === false) return done(); - - // these do need to be transformed since they are copying the rest of the batch - const internalDocs = this[kDocuments].splice(0, this[kDocuments].length); - for (let i = 0; i < internalDocs.length; ++i) { - try { - result = iterator( - (transform ? transform(internalDocs[i]) : internalDocs[i]) as TSchema // TODO(NODE-3283): Improve transform typing - ); - } catch (error) { - return done(error); - } - if (result === false) return done(); - } - - fetchDocs(); - }); - }; - - fetchDocs(); - }); + return maybeCallback(async () => { + for await (const document of this) { + const result = iterator(document); + if (result === false) { + break; + } + } + }, callback); } close(): Promise; @@ -445,7 +441,7 @@ export abstract class AbstractCursor< const needsToEmitClosed = !this[kClosed]; this[kClosed] = true; - return maybePromise(callback, done => cleanupCursor(this, { needsToEmitClosed }, done)); + return maybeCallback(async () => cleanupCursorAsync(this, { needsToEmitClosed }), callback); } /** @@ -460,35 +456,13 @@ export abstract class AbstractCursor< /** @deprecated Callbacks are deprecated and will be removed in the next major version. See [mongodb-legacy](https://github.com/mongodb-js/nodejs-mongodb-legacy) for migration assistance */ toArray(callback: Callback): void; toArray(callback?: Callback): Promise | void { - return maybePromise(callback, done => { - const docs: TSchema[] = []; - const transform = this[kTransform]; - const fetchDocs = () => { - // NOTE: if we add a `nextBatch` then we should use it here - next(this, true, (err, doc) => { - if (err) return done(err); - if (doc == null) return done(undefined, docs); - - // NOTE: no need to transform because `next` will do this automatically - docs.push(doc); - - // these do need to be transformed since they are copying the rest of the batch - const internalDocs = ( - transform - ? 
this[kDocuments].splice(0, this[kDocuments].length).map(transform)
-            : this[kDocuments].splice(0, this[kDocuments].length)
-        ) as TSchema[]; // TODO(NODE-3283): Improve transform typing
-
-        if (internalDocs) {
-          docs.push(...internalDocs);
-        }
-
-        fetchDocs();
-      });
-    };
-
-    fetchDocs();
-  });
+    return maybeCallback(async () => {
+      const array = [];
+      for await (const document of this) {
+        array.push(document);
+      }
+      return array;
+    }, callback);
   }

   /**
@@ -729,6 +703,14 @@ function nextDocument<T>(cursor: AbstractCursor<T>): T | null {
   return null;
 }

+const nextAsync = promisify(
+  next as <T>(
+    cursor: AbstractCursor<T>,
+    blocking: boolean,
+    callback: (e: Error, r: T | null) => void
+  ) => void
+);
+
 /**
  * @param cursor - the cursor on which to call `next`
  * @param blocking - a boolean indicating whether or not the cursor should `block` until data
@@ -801,6 +783,8 @@ function cursorIsDead(cursor: AbstractCursor): boolean {
   return !!cursorId && cursorId.isZero();
 }

+const cleanupCursorAsync = promisify(cleanupCursor);
+
 function cleanupCursor(
   cursor: AbstractCursor,
   options: { error?: AnyError | undefined; needsToEmitClosed?: boolean } | undefined,
diff --git a/src/cursor/find_cursor.ts b/src/cursor/find_cursor.ts
index fd0c0538f9d..3a8917330c0 100644
--- a/src/cursor/find_cursor.ts
+++ b/src/cursor/find_cursor.ts
@@ -420,7 +420,7 @@ export class FindCursor extends AbstractCursor {
       throw new MongoInvalidArgumentError('Option "allowDiskUse" requires a sort specification');
     }

-    // As of 6.0 the default is true. This allows users to get back to the old behaviour.
+    // As of 6.0 the default is true. This allows users to get back to the old behavior.
     if (!allow) {
       this[kBuiltOptions].allowDiskUse = false;
       return this;
diff --git a/src/db.ts b/src/db.ts
index c1c3e44c474..4d051ad7b8f 100644
--- a/src/db.ts
+++ b/src/db.ts
@@ -752,7 +752,7 @@ export class Db {
   * changes to system collections.
   *
   * @remarks
-   * watch() accepts two generic arguments for distinct usecases:
+   * watch() accepts two generic arguments for distinct use cases:
   * - The first is to provide the schema that may be defined for all the collections within this database
   * - The second is to override the shape of the change stream document entirely, if it is not provided the type will default to ChangeStreamDocument of the first argument
   *
diff --git a/src/encrypter.ts b/src/encrypter.ts
index c67643c043c..8fe18aa3675 100644
--- a/src/encrypter.ts
+++ b/src/encrypter.ts
@@ -101,15 +101,13 @@ export class Encrypter {
     return internalClient;
   }

-  connectInternalClient(callback: Callback): void {
+  async connectInternalClient(): Promise<void> {
     // TODO(NODE-4144): Remove new variable for type narrowing
     const internalClient = this[kInternalClient];
     if (this.needsConnecting && internalClient != null) {
       this.needsConnecting = false;
-      return internalClient.connect(callback);
+      await internalClient.connect();
     }
-
-    return callback();
   }

   close(client: MongoClient, force: boolean, callback: Callback): void {
diff --git a/src/gridfs/download.ts b/src/gridfs/download.ts
index c1613338549..3691065da7d 100644
--- a/src/gridfs/download.ts
+++ b/src/gridfs/download.ts
@@ -221,16 +221,14 @@ function doRead(stream: GridFSBucketReadStream): void {
   if (!doc) {
     stream.push(null);

-    process.nextTick(() => {
-      if (!stream.s.cursor) return;
-      stream.s.cursor.close(error => {
-        if (error) {
-          stream.emit(GridFSBucketReadStream.ERROR, error);
-          return;
-        }
-
-        stream.emit(GridFSBucketReadStream.CLOSE);
-      });
+    if (!stream.s.cursor) return;
+    stream.s.cursor.close(error => {
+      if (error) {
+        stream.emit(GridFSBucketReadStream.ERROR, error);
+        return;
+      }
+
+      stream.emit(GridFSBucketReadStream.CLOSE);
     });

     return;
diff --git a/src/gridfs/upload.ts b/src/gridfs/upload.ts
index 7ccc3a17be6..f2597b0798b 100644
--- a/src/gridfs/upload.ts
+++ b/src/gridfs/upload.ts
@@ -138,7 +138,6 @@ export class GridFSBucketWriteStream extends Writable implements NodeJS.Writable
     return waitForIndexes(this, () => doWrite(this, chunk, encoding, callback));
   }

-  // TODO(NODE-3405): Refactor this with maybePromise and MongoStreamClosedError
   /**
    * Places this write stream into an aborted state (all future writes fail)
    * and deletes all chunks that have already been written.
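
Note on the helper these conversions rely on: every `maybePromise(callback, wrapper)` call site in this patch becomes `maybeCallback(async () => { ... }, callback)`, but the helper itself is not shown in the diff (it already lives in src/utils.ts). Judging from its call sites, it behaves roughly like the sketch below — a hedged reconstruction for review purposes, not the driver's exact implementation; the inline Callback alias exists only to keep the sketch self-contained.

    // Sketch: bridge an async function onto an optional legacy callback.
    type Callback<T = any> = (error?: Error, result?: T) => void;

    function maybeCallback<T>(
      promiseFn: () => Promise<T>,
      callback?: Callback<T> | null
    ): Promise<T> | void {
      const promise = promiseFn();
      if (callback == null) {
        // Promise mode: hand the promise straight back to the caller.
        return promise;
      }
      // Callback mode: consume the promise and route its outcome into the callback.
      promise.then(
        result => callback(undefined, result),
        error => callback(error)
      );
      return;
    }

With a callback supplied it returns void and reports the result through the callback; with no callback (or an explicit null, as in the async-iterator change above) it returns the promise unchanged, which is why the converted methods can share a single async implementation for both calling styles.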
diff --git a/src/mongo_client.ts b/src/mongo_client.ts index 0500165fc45..42bee20bf87 100644 --- a/src/mongo_client.ts +++ b/src/mongo_client.ts @@ -1,5 +1,6 @@ import type { TcpNetConnectOpts } from 'net'; import type { ConnectionOptions as TLSConnectionOptions, TLSSocketOptions } from 'tls'; +import { promisify } from 'util'; import { BSONSerializeOptions, Document, resolveBSONOptions } from './bson'; import { ChangeStream, ChangeStreamDocument, ChangeStreamOptions } from './change_stream'; @@ -8,28 +9,26 @@ import type { AuthMechanism } from './cmap/auth/providers'; import type { LEGAL_TCP_SOCKET_OPTIONS, LEGAL_TLS_SOCKET_OPTIONS } from './cmap/connect'; import type { Connection } from './cmap/connection'; import type { CompressorName } from './cmap/wire_protocol/compression'; -import { parseOptions } from './connection_string'; -import type { MONGO_CLIENT_EVENTS } from './constants'; +import { parseOptions, resolveSRVRecord } from './connection_string'; +import { MONGO_CLIENT_EVENTS } from './constants'; import { Db, DbOptions } from './db'; import type { AutoEncrypter, AutoEncryptionOptions } from './deps'; import type { Encrypter } from './encrypter'; import { MongoInvalidArgumentError } from './error'; import type { Logger, LoggerLevel } from './logger'; import { TypedEventEmitter } from './mongo_types'; -import { connect } from './operations/connect'; import type { ReadConcern, ReadConcernLevel, ReadConcernLike } from './read_concern'; import { ReadPreference, ReadPreferenceMode } from './read_preference'; import type { TagSet } from './sdam/server_description'; import { readPreferenceServerSelector } from './sdam/server_selection'; import type { SrvPoller } from './sdam/srv_polling'; -import type { Topology, TopologyEvents } from './sdam/topology'; +import { Topology, TopologyEvents } from './sdam/topology'; import { ClientSession, ClientSessionOptions, ServerSessionPool } from './sessions'; import { Callback, ClientMetadata, HostAddress, maybeCallback, - maybePromise, MongoDBNamespace, ns, resolveOptions @@ -437,12 +436,53 @@ export class MongoClient extends TypedEventEmitter { throw new MongoInvalidArgumentError('Method `connect` only accepts a callback'); } - return maybePromise(callback, cb => { - connect(this, this[kOptions], err => { - if (err) return cb(err); - cb(undefined, this); - }); - }); + return maybeCallback(async () => { + if (this.topology && this.topology.isConnected()) { + return this; + } + + const options = this[kOptions]; + + if (typeof options.srvHost === 'string') { + const hosts = await resolveSRVRecord(options); + + for (const [index, host] of hosts.entries()) { + options.hosts[index] = host; + } + } + + const topology = new Topology(options.hosts, options); + // Events can be emitted before initialization is complete so we have to + // save the reference to the topology on the client ASAP if the event handlers need to access it + this.topology = topology; + topology.client = this; + + topology.once(Topology.OPEN, () => this.emit('open', this)); + + for (const event of MONGO_CLIENT_EVENTS) { + topology.on(event, (...args: any[]) => this.emit(event, ...(args as any))); + } + + const topologyConnect = async () => { + try { + await promisify(callback => topology.connect(options, callback))(); + } catch (error) { + topology.close({ force: true }); + throw error; + } + }; + + if (this.autoEncrypter) { + const initAutoEncrypter = promisify(callback => this.autoEncrypter?.init(callback)); + await initAutoEncrypter(); + await topologyConnect(); + await 
options.encrypter.connectInternalClient(); + } else { + await topologyConnect(); + } + + return this; + }, callback); } /** @@ -475,66 +515,52 @@ export class MongoClient extends TypedEventEmitter { const force = typeof forceOrCallback === 'boolean' ? forceOrCallback : false; - return maybePromise(callback, callback => { - if (this.topology == null) { - // Do not connect just to end sessions - return callback(); - } - + return maybeCallback(async () => { const activeSessionEnds = Array.from(this.s.activeSessions, session => session.endSession()); this.s.activeSessions.clear(); - Promise.all(activeSessionEnds) - .then(() => { - if (this.topology == null) { - return; - } - // If we would attempt to select a server and get nothing back we short circuit - // to avoid the server selection timeout. - const selector = readPreferenceServerSelector(ReadPreference.primaryPreferred); - const topologyDescription = this.topology.description; - const serverDescriptions = Array.from(topologyDescription.servers.values()); - const servers = selector(topologyDescription, serverDescriptions); - if (servers.length === 0) { - return; - } + await Promise.all(activeSessionEnds); + + if (this.topology == null) { + return; + } - const endSessions = Array.from(this.s.sessionPool.sessions, ({ id }) => id); - if (endSessions.length === 0) return; - return this.db('admin') + // If we would attempt to select a server and get nothing back we short circuit + // to avoid the server selection timeout. + const selector = readPreferenceServerSelector(ReadPreference.primaryPreferred); + const topologyDescription = this.topology.description; + const serverDescriptions = Array.from(topologyDescription.servers.values()); + const servers = selector(topologyDescription, serverDescriptions); + if (servers.length !== 0) { + const endSessions = Array.from(this.s.sessionPool.sessions, ({ id }) => id); + if (endSessions.length !== 0) { + await this.db('admin') .command( { endSessions }, { readPreference: ReadPreference.primaryPreferred, noResponse: true } ) .catch(() => null); // outcome does not matter - }) - .then(() => { - if (this.topology == null) { - return; - } - // clear out references to old topology - const topology = this.topology; - this.topology = undefined; + } + } + + // clear out references to old topology + const topology = this.topology; + this.topology = undefined; - return new Promise((resolve, reject) => { - topology.close({ force }, error => { + await new Promise((resolve, reject) => { + topology.close({ force }, error => { + if (error) return reject(error); + const { encrypter } = this[kOptions]; + if (encrypter) { + return encrypter.close(this, force, error => { if (error) return reject(error); - const { encrypter } = this[kOptions]; - if (encrypter) { - return encrypter.close(this, force, error => { - if (error) return reject(error); - resolve(); - }); - } resolve(); }); - }); - }) - .then( - () => callback(), - error => callback(error) - ); - }); + } + resolve(); + }); + }); + }, callback); } /** @@ -661,7 +687,7 @@ export class MongoClient extends TypedEventEmitter { * changes to system collections, as well as the local, admin, and config databases. 
* * @remarks - * watch() accepts two generic arguments for distinct usecases: + * watch() accepts two generic arguments for distinct use cases: * - The first is to provide the schema that may be defined for all the data within the current cluster * - The second is to override the shape of the change stream document entirely, if it is not provided the type will default to ChangeStreamDocument of the first argument * diff --git a/src/mongo_types.ts b/src/mongo_types.ts index 124e9ce9e35..e68ff4fe86b 100644 --- a/src/mongo_types.ts +++ b/src/mongo_types.ts @@ -40,7 +40,7 @@ export type WithId = EnhancedOmit & { _id: InferIdType< export type OptionalId = EnhancedOmit & { _id?: InferIdType }; /** - * Adds an optional _id field to an object shaped type, unless the _id field is requried on that type. + * Adds an optional _id field to an object shaped type, unless the _id field is required on that type. * In the case _id is required, this method continues to require_id. * * @public diff --git a/src/operations/connect.ts b/src/operations/connect.ts deleted file mode 100644 index a71e95867d5..00000000000 --- a/src/operations/connect.ts +++ /dev/null @@ -1,107 +0,0 @@ -import { resolveSRVRecord } from '../connection_string'; -import { MONGO_CLIENT_EVENTS } from '../constants'; -import { MongoInvalidArgumentError, MongoRuntimeError } from '../error'; -import type { MongoClient, MongoOptions } from '../mongo_client'; -import { Topology } from '../sdam/topology'; -import type { Callback } from '../utils'; - -export function connect( - mongoClient: MongoClient, - options: MongoOptions, - callback: Callback -): void { - if (!callback) { - throw new MongoInvalidArgumentError('Callback function must be provided'); - } - - // If a connection already been established, we can terminate early - if (mongoClient.topology && mongoClient.topology.isConnected()) { - return callback(undefined, mongoClient); - } - - const logger = mongoClient.logger; - const connectCallback: Callback = err => { - const warningMessage = - 'seed list contains no mongos proxies, replicaset connections requires ' + - 'the parameter replicaSet to be supplied in the URI or options object, ' + - 'mongodb://server:port/db?replicaSet=name'; - if (err && err.message === 'no mongos proxies found in seed list') { - if (logger.isWarn()) { - logger.warn(warningMessage); - } - - // Return a more specific error message for MongoClient.connect - // TODO(NODE-3483) - return callback(new MongoRuntimeError(warningMessage)); - } - - callback(err, mongoClient); - }; - - if (typeof options.srvHost === 'string') { - return resolveSRVRecord(options, (err, hosts) => { - if (err || !hosts) return callback(err); - for (const [index, host] of hosts.entries()) { - options.hosts[index] = host; - } - - return createTopology(mongoClient, options, connectCallback); - }); - } - - return createTopology(mongoClient, options, connectCallback); -} - -function createTopology( - mongoClient: MongoClient, - options: MongoOptions, - callback: Callback -) { - // Create the topology - const topology = new Topology(options.hosts, options); - // Events can be emitted before initialization is complete so we have to - // save the reference to the topology on the client ASAP if the event handlers need to access it - mongoClient.topology = topology; - topology.client = mongoClient; - - topology.once(Topology.OPEN, () => mongoClient.emit('open', mongoClient)); - - for (const event of MONGO_CLIENT_EVENTS) { - topology.on(event, (...args: any[]) => mongoClient.emit(event, ...(args as 
any))); - } - - // initialize CSFLE if requested - if (mongoClient.autoEncrypter) { - mongoClient.autoEncrypter.init(err => { - if (err) { - return callback(err); - } - - topology.connect(options, err => { - if (err) { - topology.close({ force: true }); - return callback(err); - } - - options.encrypter.connectInternalClient(error => { - if (error) return callback(error); - - callback(undefined, topology); - }); - }); - }); - - return; - } - - // otherwise connect normally - topology.connect(options, err => { - if (err) { - topology.close({ force: true }); - return callback(err); - } - - callback(undefined, topology); - return; - }); -} diff --git a/src/operations/drop.ts b/src/operations/drop.ts index 3170c566ea5..2fd1569d524 100644 --- a/src/operations/drop.ts +++ b/src/operations/drop.ts @@ -41,7 +41,7 @@ export class DropCollectionOperation extends CommandOperation { options.encryptedFields ?? encryptedFieldsMap?.[`${db.databaseName}.${name}`]; if (!encryptedFields && encryptedFieldsMap) { - // If the MongoClient was configued with an encryptedFieldsMap, + // If the MongoClient was configured with an encryptedFieldsMap, // and no encryptedFields config was available in it or explicitly // passed as an argument, the spec tells us to look one up using // listCollections(). diff --git a/src/sdam/srv_polling.ts b/src/sdam/srv_polling.ts index b61284a5e8e..f9fc60193e3 100644 --- a/src/sdam/srv_polling.ts +++ b/src/sdam/srv_polling.ts @@ -111,7 +111,11 @@ export class SrvPoller extends TypedEventEmitter { clearTimeout(this._timeout); } - this._timeout = setTimeout(() => this._poll(), this.intervalMS); + this._timeout = setTimeout(() => { + this._poll().catch(unexpectedRuntimeError => { + this.logger.error(`Unexpected ${new MongoRuntimeError(unexpectedRuntimeError).stack}`); + }); + }, this.intervalMS); } success(srvRecords: dns.SrvRecord[]): void { @@ -133,33 +137,35 @@ export class SrvPoller extends TypedEventEmitter { ); } - _poll(): void { + async _poll(): Promise { const generation = this.generation; - dns.resolveSrv(this.srvAddress, (err, srvRecords) => { - if (generation !== this.generation) { - return; - } + let srvRecords; - if (err) { - this.failure('DNS error', err); - return; - } + try { + srvRecords = await dns.promises.resolveSrv(this.srvAddress); + } catch (dnsError) { + this.failure('DNS error', dnsError); + return; + } - const finalAddresses: dns.SrvRecord[] = []; - for (const record of srvRecords) { - if (matchesParentDomain(record.name, this.srvHost)) { - finalAddresses.push(record); - } else { - this.parentDomainMismatch(record); - } - } + if (generation !== this.generation) { + return; + } - if (!finalAddresses.length) { - this.failure('No valid addresses found at host'); - return; + const finalAddresses: dns.SrvRecord[] = []; + for (const record of srvRecords) { + if (matchesParentDomain(record.name, this.srvHost)) { + finalAddresses.push(record); + } else { + this.parentDomainMismatch(record); } + } + + if (!finalAddresses.length) { + this.failure('No valid addresses found at host'); + return; + } - this.success(finalAddresses); - }); + this.success(finalAddresses); } } diff --git a/src/sessions.ts b/src/sessions.ts index 68eff617d40..a3993eafed7 100644 --- a/src/sessions.ts +++ b/src/sessions.ts @@ -1,3 +1,5 @@ +import { promisify } from 'util'; + import { Binary, Document, Long, Timestamp } from './bson'; import type { CommandOptions, Connection } from './cmap/connection'; import { ConnectionPoolMetrics } from './cmap/metrics'; @@ -34,7 +36,7 @@ import { 
commandSupportsReadConcern, isPromiseLike, maxWireVersion, - maybePromise, + maybeCallback, now, uuidV4 } from './utils'; @@ -257,47 +259,32 @@ export class ClientSession extends TypedEventEmitter { if (typeof options === 'function') (callback = options), (options = {}); const finalOptions = { force: true, ...options }; - return maybePromise(callback, done => { - if (this.hasEnded) { - maybeClearPinnedConnection(this, finalOptions); - return done(); - } - - const completeEndSession = () => { - maybeClearPinnedConnection(this, finalOptions); - - const serverSession = this[kServerSession]; - if (serverSession != null) { - // release the server session back to the pool - this.sessionPool.release(serverSession); - // Make sure a new serverSession never makes it onto this ClientSession - Object.defineProperty(this, kServerSession, { - value: ServerSession.clone(serverSession), - writable: false - }); + return maybeCallback(async () => { + try { + if (this.inTransaction()) { + await this.abortTransaction(); } - - // mark the session as ended, and emit a signal - this.hasEnded = true; - this.emit('ended', this); - + if (!this.hasEnded) { + const serverSession = this[kServerSession]; + if (serverSession != null) { + // release the server session back to the pool + this.sessionPool.release(serverSession); + // Make sure a new serverSession never makes it onto this ClientSession + Object.defineProperty(this, kServerSession, { + value: ServerSession.clone(serverSession), + writable: false + }); + } + // mark the session as ended, and emit a signal + this.hasEnded = true; + this.emit('ended', this); + } + } catch { // spec indicates that we should ignore all errors for `endSessions` - done(); - }; - - if (this.inTransaction()) { - // If we've reached endSession and the transaction is still active - // by default we abort it - this.abortTransaction(err => { - if (err) return done(err); - completeEndSession(); - }); - - return; + } finally { + maybeClearPinnedConnection(this, finalOptions); } - - completeEndSession(); - }); + }, callback); } /** @@ -438,7 +425,7 @@ export class ClientSession extends TypedEventEmitter { /** @deprecated Callbacks are deprecated and will be removed in the next major version. See [mongodb-legacy](https://github.com/mongodb-js/nodejs-mongodb-legacy) for migration assistance */ commitTransaction(callback: Callback): void; commitTransaction(callback?: Callback): Promise | void { - return maybePromise(callback, cb => endTransaction(this, 'commitTransaction', cb)); + return maybeCallback(async () => endTransactionAsync(this, 'commitTransaction'), callback); } /** @@ -450,7 +437,7 @@ export class ClientSession extends TypedEventEmitter { /** @deprecated Callbacks are deprecated and will be removed in the next major version. 
See [mongodb-legacy](https://github.com/mongodb-js/nodejs-mongodb-legacy) for migration assistance */ abortTransaction(callback: Callback): void; abortTransaction(callback?: Callback): Promise | void { - return maybePromise(callback, cb => endTransaction(this, 'abortTransaction', cb)); + return maybeCallback(async () => endTransactionAsync(this, 'abortTransaction'), callback); } /** @@ -660,6 +647,14 @@ function attemptTransaction( ); } +const endTransactionAsync = promisify( + endTransaction as ( + session: ClientSession, + commandName: 'abortTransaction' | 'commitTransaction', + callback: (error: Error, result: Document) => void + ) => void +); + function endTransaction( session: ClientSession, commandName: 'abortTransaction' | 'commitTransaction', diff --git a/src/utils.ts b/src/utils.ts index f3349b6eca2..7ee33f3554e 100644 --- a/src/utils.ts +++ b/src/utils.ts @@ -199,14 +199,12 @@ export function applyWriteConcern( /** * Checks if a given value is a Promise * - * @typeParam T - The result type of maybePromise - * @param maybePromise - An object that could be a promise + * @typeParam T - The resolution type of the possible promise + * @param value - An object that could be a promise * @returns true if the provided value is a Promise */ -export function isPromiseLike( - maybePromise?: PromiseLike | void -): maybePromise is Promise { - return !!maybePromise && typeof maybePromise.then === 'function'; +export function isPromiseLike(value?: PromiseLike | void): value is Promise { + return !!value && typeof value.then === 'function'; } /** @@ -467,50 +465,6 @@ export function maybeCallback( return; } -/** - * Helper function for either accepting a callback, or returning a promise - * @internal - * - * @param callback - The last function argument in exposed method, controls if a Promise is returned - * @param wrapper - A function that wraps the callback - * @returns Returns void if a callback is supplied, else returns a Promise. - */ -export function maybePromise( - callback: Callback | undefined, - wrapper: (fn: Callback) => void -): Promise | void { - const PromiseConstructor = PromiseProvider.get() ?? Promise; - let result: Promise | void; - if (typeof callback !== 'function') { - result = new PromiseConstructor((resolve, reject) => { - callback = (err, res) => { - if (err) return reject(err); - resolve(res); - }; - }); - } - - wrapper((err, res) => { - if (err != null) { - try { - // eslint-disable-next-line @typescript-eslint/no-non-null-assertion - callback!(err); - } catch (error) { - process.nextTick(() => { - throw error; - }); - } - - return; - } - - // eslint-disable-next-line @typescript-eslint/no-non-null-assertion - callback!(err, res); - }); - - return result; -} - /** @internal */ export function databaseNamespace(ns: string): string { return ns.split('.')[0]; @@ -537,7 +491,7 @@ export function maxWireVersion(topologyOrServer?: Connection | Topology | Server // Since we do not have a monitor, we assume the load balanced server is always // pointed at the latest mongodb version. There is a risk that for on-prem // deployments that don't upgrade immediately that this could alert to the - // application that a feature is avaiable that is actually not. + // application that a feature is available that is actually not. 
return MAX_SUPPORTED_WIRE_VERSION; } if (topologyOrServer.hello) { diff --git a/test/integration/collation/collations.test.js b/test/integration/collation/collations.test.js index 51dc05424a0..5ee8a4469ae 100644 --- a/test/integration/collation/collations.test.js +++ b/test/integration/collation/collations.test.js @@ -28,7 +28,7 @@ describe('Collation', function () { await client.close(); }); - it('Should correctly create index with collation', async function () { + it('should correctly create index with collation', async function () { const configuration = this.configuration; const client = configuration.newClient(); diff --git a/test/integration/crud/aggregation.test.ts b/test/integration/crud/aggregation.test.ts index 066a78f0526..d1b03584cc8 100644 --- a/test/integration/crud/aggregation.test.ts +++ b/test/integration/crud/aggregation.test.ts @@ -1,8 +1,18 @@ import { expect } from 'chai'; +import { MongoInvalidArgumentError } from '../../../src/error'; import { filterForCommands } from '../shared'; describe('Aggregation', function () { + let client; + beforeEach(async function () { + client = this.configuration.newClient(); + }); + + afterEach(async function () { + await client.close(); + }); + it('should correctly execute simple aggregation pipeline using array', function (done) { const client = this.configuration.newClient({ w: 1 }, { maxPoolSize: 1 }), databaseName = this.configuration.db; @@ -580,13 +590,8 @@ describe('Aggregation', function () { }); }); - it('should fail aggregation due to illegal cursor option and streams', function (done) { - const databaseName = this.configuration.db; - const client = this.configuration.newClient(this.configuration.writeConcernMax(), { - maxPoolSize: 1 - }); - - const db = client.db(databaseName); + it('should fail aggregation due to illegal cursor option and streams', async function () { + const db = client.db(); // Some docs for insertion const docs = [ { @@ -606,85 +611,60 @@ describe('Aggregation', function () { // Create a collection const collection = db.collection('shouldCorrectlyDoAggWithCursorGetStream'); // Insert the docs - collection.insertMany(docs, { writeConcern: { w: 1 } }, function (err, result) { - expect(result).to.exist; - expect(err).to.not.exist; + const result = await collection.insertMany(docs, { writeConcern: { w: 1 } }); + expect(result).to.exist; - try { - // Execute aggregate, notice the pipeline is expressed as an Array - const cursor = collection.aggregate( - [ - { - $project: { - author: 1, - tags: 1 - } - }, - { $unwind: '$tags' }, - { - $group: { - _id: { tags: '$tags' }, - authors: { $addToSet: '$author' } - } - } - ], - { - cursor: 1 + // Execute aggregate, notice the pipeline is expressed as an Array + const cursor = collection.aggregate( + [ + { + $project: { + author: 1, + tags: 1 } - ); - - cursor.next(); - } catch (err) { - client.close(done); - return; + }, + { $unwind: '$tags' }, + { + $group: { + _id: { tags: '$tags' }, + authors: { $addToSet: '$author' } + } + } + ], + { + cursor: 1 } + ); - // should never happen - expect(true).to.be.false; - }); + const error = await cursor.next().catch(error => error); + expect(error).to.be.instanceOf(MongoInvalidArgumentError); }); - it('should fail if you try to use explain flag with writeConcern', async function () { - const databaseName = this.configuration.db; - const client = this.configuration.newClient({ maxPoolSize: 1 }); + it(`should fail if you try to use explain flag with { readConcern: { level: 'local' }, writeConcern: { j: true } }`, async function 
() {
+    const db = client.db();

-    const testCases = [
-      { writeConcern: { j: true } },
-      { readConcern: { level: 'local' }, writeConcern: { j: true } }
-    ];
+    const collection = db.collection('foo');
+    Object.assign(collection.s, { readConcern: { level: 'local' }, writeConcern: { j: true } });
+    const error = await collection
+      .aggregate([{ $project: { _id: 0 } }, { $out: 'bar' }], { explain: true })
+      .toArray()
+      .catch(error => error);

-    const db = client.db(databaseName);
+    expect(error).to.be.instanceOf(MongoInvalidArgumentError);
+  });

-    await Promise.all(
-      testCases.map(testCase => {
-        const stringifiedTestCase = JSON.stringify(testCase);
-        const collection = db.collection('foo');
-        Object.assign(collection.s, testCase);
-        try {
-          const promise = collection
-            .aggregate([{ $project: { _id: 0 } }, { $out: 'bar' }], { explain: true })
-            .toArray()
-            .then(
-              () => {
-                throw new Error(
-                  'Expected aggregation to not succeed for options ' + stringifiedTestCase
-                );
-              },
-              () => {
-                throw new Error(
-                  'Expected aggregation to fail on client instead of server for options ' +
-                    stringifiedTestCase
-                );
-              }
-            );
+  it('should fail if you try to use explain flag with { writeConcern: { j: true } }', async function () {
+    const db = client.db();

-          return promise;
-        } catch (e) {
-          expect(e).to.exist;
-          return Promise.resolve();
-        }
-      })
-    ).finally(() => client.close());
+    const collection = db.collection('foo');
+    Object.assign(collection.s, { writeConcern: { j: true } });
+
+    const error = await collection
+      .aggregate([{ $project: { _id: 0 } }, { $out: 'bar' }], { explain: true })
+      .toArray()
+      .catch(error => error);
+
+    expect(error).to.be.instanceOf(MongoInvalidArgumentError);
   });

   it('should ensure MaxTimeMS is correctly passed down into command execution when using a cursor', function (done) {
diff --git a/test/integration/crud/explain.test.js b/test/integration/crud/explain.test.js
index d9c3bfe3d1e..f29d6eb634b 100644
--- a/test/integration/crud/explain.test.js
+++ b/test/integration/crud/explain.test.js
@@ -471,29 +471,14 @@ describe('Explain', function () {
     }
   });

-  it('should honor boolean explain with find', {
-    metadata: {
-      requires: {
-        mongodb: '>=3.0'
-      }
-    },
-    test: function (done) {
-      const db = client.db('shouldHonorBooleanExplainWithFind');
-      const collection = db.collection('test');
-
-      collection.insertOne({ a: 1 }, (err, res) => {
-        expect(err).to.not.exist;
-        expect(res).to.exist;
+  it('should honor boolean explain with find', async () => {
+    const db = client.db('shouldHonorBooleanExplainWithFind');
+    const collection = db.collection('test');

-        collection.find({ a: 1 }, { explain: true }).toArray((err, docs) => {
-          expect(err).to.not.exist;
-          const explanation = docs[0];
-          expect(explanation).to.exist;
-          expect(explanation).property('queryPlanner').to.exist;
-          done();
-        });
-      });
-    }
+    await collection.insertOne({ a: 1 });
+    const [explanation] = await collection.find({ a: 1 }, { explain: true }).toArray();
+    expect(explanation).to.exist;
+    expect(explanation).property('queryPlanner').to.exist;
   });

   it('should honor string explain with find', {
diff --git a/test/integration/crud/misc_cursors.test.js b/test/integration/crud/misc_cursors.test.js
index e8cea657f58..c83b1b756d6 100644
--- a/test/integration/crud/misc_cursors.test.js
+++ b/test/integration/crud/misc_cursors.test.js
@@ -31,50 +31,28 @@ describe('Cursor', function () {
     await client.close();
   });

-  it('cursorShouldBeAbleToResetOnToArrayRunningQueryAgain', {
-    // Add a tag that our runner can trigger on
-    // in this case we are setting that
node needs to be higher than 0.10.X to run - metadata: { - requires: { topology: ['single', 'replicaset', 'sharded'] } - }, + it('should not throw an error when toArray and forEach are called after cursor is closed', async function () { + const db = client.db(); - test: function (done) { - const configuration = this.configuration; - client.connect((err, client) => { - expect(err).to.not.exist; - this.defer(() => client.close()); - - const db = client.db(configuration.db); - db.createCollection('test_to_a', (err, collection) => { - expect(err).to.not.exist; - - collection.insert({ a: 1 }, configuration.writeConcernMax(), err => { - expect(err).to.not.exist; + const collection = await db.collection('test_to_a'); + await collection.insertMany([{ a: 1 }]); + const cursor = collection.find({}); - const cursor = collection.find({}); - this.defer(() => cursor.close()); + const firstToArray = await cursor.toArray().catch(error => error); + expect(firstToArray).to.be.an('array'); - cursor.toArray(err => { - expect(err).to.not.exist; + expect(cursor.closed).to.be.true; - // Should fail if called again (cursor should be closed) - cursor.toArray(err => { - expect(err).to.not.exist; + const secondToArray = await cursor.toArray().catch(error => error); + expect(secondToArray).to.be.an('array'); + expect(secondToArray).to.have.lengthOf(0); - // Should fail if called again (cursor should be closed) - cursor.forEach( - () => {}, - err => { - expect(err).to.not.exist; - done(); - } - ); - }); - }); - }); - }); - }); - } + const forEachResult = await cursor + .forEach(() => { + expect.fail('should not run forEach on an empty/closed cursor'); + }) + .catch(error => error); + expect(forEachResult).to.be.undefined; }); it('cursor should close after first next operation', { diff --git a/test/integration/gridfs/gridfs_stream.test.js b/test/integration/gridfs/gridfs_stream.test.js index 8f04668815f..e9ad76c886f 100644 --- a/test/integration/gridfs/gridfs_stream.test.js +++ b/test/integration/gridfs/gridfs_stream.test.js @@ -3,15 +3,19 @@ const { Double } = require('bson'); const stream = require('stream'); const fs = require('fs'); -const { setupDatabase } = require('./../shared'); const { expect } = require('chai'); const { GridFSBucket, ObjectId } = require('../../../src'); const sinon = require('sinon'); const { sleep } = require('../../tools/utils'); describe('GridFS Stream', function () { - before(function () { - return setupDatabase(this.configuration); + let client; + beforeEach(async function () { + client = this.configuration.newClient(); + }); + + afterEach(async function () { + await client.close(); }); /** @@ -357,32 +361,28 @@ describe('GridFS Stream', function () { metadata: { requires: { topology: ['single'] } }, test(done) { - const configuration = this.configuration; - const client = configuration.newClient(configuration.writeConcernMax(), { maxPoolSize: 1 }); - client.connect((err, client) => { - expect(err).to.not.exist; - const db = client.db(configuration.db); - const bucket = new GridFSBucket(db, { - bucketName: 'gridfsdownload', - chunkSizeBytes: 6000 - }); - - const readStream = fs.createReadStream('./LICENSE.md'); - const uploadStream = bucket.openUploadStream('teststart.dat'); - uploadStream.once('finish', function () { - const downloadStream = bucket.openDownloadStreamByName('teststart.dat'); + const db = client.db(); + const bucket = new GridFSBucket(db, { + bucketName: 'gridfsdownload', + chunkSizeBytes: 6000 + }); - const events = []; - downloadStream.on('data', () => 
events.push('data')); - downloadStream.on('close', () => events.push('close')); - downloadStream.on('end', () => { - expect(events).to.eql(['data', 'data', 'close']); - client.close(done); - }); + const readStream = fs.createReadStream('./LICENSE.md'); + const uploadStream = bucket.openUploadStream('teststart.dat'); + uploadStream.once('finish', function () { + const downloadStream = bucket.openDownloadStreamByName('teststart.dat'); + + const events = []; + downloadStream.on('data', () => events.push('data')); + downloadStream.on('close', () => events.push('close')); + downloadStream.on('end', () => { + expect(events).to.deep.equal(['data', 'data', 'close']); + expect(downloadStream).to.exist; + client.close(done); }); - - readStream.pipe(uploadStream); }); + + readStream.pipe(uploadStream); } }); diff --git a/test/integration/node-specific/cursor_stream.test.js b/test/integration/node-specific/cursor_stream.test.js index 0df94d32e16..1c0c65503c2 100644 --- a/test/integration/node-specific/cursor_stream.test.js +++ b/test/integration/node-specific/cursor_stream.test.js @@ -1,12 +1,16 @@ 'use strict'; -var expect = require('chai').expect; -const { setupDatabase } = require('../shared'); +const { expect } = require('chai'); const { Binary } = require('../../../src'); const { setTimeout, setImmediate } = require('timers'); describe('Cursor Streams', function () { - before(function () { - return setupDatabase(this.configuration); + let client; + beforeEach(async function () { + client = this.configuration.newClient(); + }); + + afterEach(async function () { + await client.close(); }); it('should stream documents with pause and resume for fetching', { @@ -208,69 +212,67 @@ describe('Cursor Streams', function () { } }); - // TODO: NODE-3819: Unskip flaky MacOS tests. - const maybeIt = process.platform === 'darwin' ? 
it.skip : it; - maybeIt('should stream documents across getMore command and count correctly', { - metadata: { - requires: { topology: ['single', 'replicaset', 'sharded', 'ssl', 'heap', 'wiredtiger'] } - }, - - test: function (done) { - var self = this; - var client = self.configuration.newClient(self.configuration.writeConcernMax(), { - maxPoolSize: 1 - }); - - client.connect(function (err, client) { - expect(err).to.not.exist; - - var db = client.db(self.configuration.db); - var docs = []; - - for (var i = 0; i < 2000; i++) { - docs.push({ a: i, b: new Binary(Buffer.alloc(1024)) }); - } - - var collection = db.collection('test_streaming_function_with_limit_for_fetching'); - var updateCollection = db.collection( - 'test_streaming_function_with_limit_for_fetching_update' - ); - - collection.insert(docs, { writeConcern: { w: 1 } }, function (err) { - expect(err).to.not.exist; - - const stream = collection.find({}).stream(); - - stream.on('end', () => { - updateCollection.findOne({ id: 1 }, function (err, doc) { - expect(err).to.not.exist; - expect(doc.count).to.equal(1999); + it('should stream documents across getMore command and count correctly', async function () { + if (process.platform === 'darwin') { + this.skipReason = 'TODO(NODE-3819): Unskip flaky MacOS tests.'; + return this.skip(); + } - client.close(done); - }); - }); + const db = client.db(); + const collection = db.collection('streaming'); + const updateCollection = db.collection('update_within_streaming'); + + await collection.drop().catch(() => null); + await updateCollection.drop().catch(() => null); + + const docs = Array.from({ length: 10 }, (_, i) => ({ + _id: i, + b: new Binary(Buffer.alloc(1024)) + })); + + await collection.insertMany(docs); + // Set the batchSize to be a 5th of the total docCount to make getMores happen + const stream = collection.find({}, { batchSize: 2 }).stream(); + + let done; + const end = new Promise((resolve, reject) => { + done = error => (error != null ? 
reject(error) : resolve()); + }); + + stream.on('end', () => { + updateCollection + .findOne({ id: 1 }) + .then(function (doc) { + expect(doc.count).to.equal(9); + done(); + }) + .catch(done) + .finally(() => client.close()); + }); + + let docCount = 0; + stream.on('data', data => { + stream.pause(); + try { + expect(data).to.have.property('_id', docCount); + } catch (assertionError) { + return done(assertionError); + } - let docCount = 0; - stream.on('data', () => { - stream.pause(); + if (docCount++ === docs.length - 1) { + stream.resume(); + return; + } - if (docCount++ === docs.length - 1) { - return; - } + updateCollection + .updateMany({ id: 1 }, { $inc: { count: 1 } }, { writeConcern: { w: 1 }, upsert: true }) + .then(() => { + stream.resume(); + }) + .catch(done); + }); - updateCollection.updateMany( - { id: 1 }, - { $inc: { count: 1 } }, - { writeConcern: { w: 1 }, upsert: true }, - function (err) { - expect(err).to.not.exist; - stream.resume(); - } - ); - }); - }); - }); - } + return end; }); it('should correctly error out stream', { diff --git a/test/integration/server-selection/readpreference.test.js b/test/integration/server-selection/readpreference.test.js index 0368c81a634..6b67653e81d 100644 --- a/test/integration/server-selection/readpreference.test.js +++ b/test/integration/server-selection/readpreference.test.js @@ -531,18 +531,25 @@ describe('ReadPreference', function () { context('should enforce fixed primary read preference', function () { const collectionName = 'ddl_collection'; + let client; beforeEach(async function () { const configuration = this.configuration; - const client = this.configuration.newClient(configuration.writeConcernMax(), { + const utilClient = this.configuration.newClient(configuration.writeConcernMax(), { readPreference: 'primaryPreferred' }); - const db = client.db(configuration.db); + const db = utilClient.db(configuration.db); await db.addUser('default', 'pass', { roles: 'readWrite' }).catch(() => null); await db.createCollection('before_collection').catch(() => null); await db.createIndex(collectionName, { aloha: 1 }).catch(() => null); + await utilClient.close(); + + client = await this.configuration.newClient(configuration.writeConcernMax()).connect(); + }); + + afterEach(async () => { await client.close(); }); @@ -558,14 +565,13 @@ describe('ReadPreference', function () { 'Db#dropDatabase': [] }; - Object.keys(methods).forEach(operation => { + for (const operation of Object.keys(methods)) { it(`${operation}`, { metadata: { requires: { topology: ['replicaset', 'sharded'] } }, test: async function () { const configuration = this.configuration; - const client = this.configuration.newClient(configuration.writeConcernMax()); const db = client.db(configuration.db); const args = methods[operation]; const [parentId, method] = operation.split('#'); @@ -577,20 +583,12 @@ describe('ReadPreference', function () { await parent[method](...args); expect(selectServerSpy.called).to.equal(true); - if (typeof selectServerSpy.args[0][0] === 'function') { - expect(selectServerSpy) - .nested.property('args[0][1].readPreference.mode') - .to.equal(ReadPreference.PRIMARY); - } else { - expect(selectServerSpy) - .nested.property('args[0][0].readPreference.mode') - .to.equal(ReadPreference.PRIMARY); - } - - await client.close(); + const selectionCall = selectServerSpy.getCall(0); + expect(selectionCall.args[0]).to.not.be.a('function'); + expect(selectionCall).nested.property('args[0].mode').to.equal(ReadPreference.PRIMARY); } }); - }); + } }); it('should respect 
readPreference from uri', { diff --git a/test/unit/assorted/imports.test.ts b/test/unit/assorted/imports.test.ts index 1e66b70eb7d..d5117874f5a 100644 --- a/test/unit/assorted/imports.test.ts +++ b/test/unit/assorted/imports.test.ts @@ -15,7 +15,7 @@ function* walk(root) { } } -describe('importing mongodb driver', () => { +describe.skip('importing mongodb driver', () => { const sourceFiles = walk(path.resolve(__dirname, '../../../src')); for (const sourceFile of sourceFiles) { diff --git a/test/unit/assorted/polling_srv_records_for_mongos_discovery.prose.test.ts b/test/unit/assorted/polling_srv_records_for_mongos_discovery.prose.test.ts index 4c7f59b920d..c96f738fa97 100644 --- a/test/unit/assorted/polling_srv_records_for_mongos_discovery.prose.test.ts +++ b/test/unit/assorted/polling_srv_records_for_mongos_discovery.prose.test.ts @@ -283,6 +283,10 @@ describe('Polling Srv Records for Mongos Discovery', () => { initialRecords = undefined, replacementRecords = undefined, srvServiceName = 'mongodb' + }: { + initialRecords?: dns.SrvRecord[]; + replacementRecords?: dns.SrvRecord[]; + srvServiceName?: string; }) { let initialDNSLookup = true; const mockRecords = shardedCluster.srvRecords; @@ -290,13 +294,13 @@ describe('Polling Srv Records for Mongos Discovery', () => { initialRecords ??= mockRecords; // first call is for the driver initial connection // second call will check the poller - resolveSrvStub = sinon.stub(dns, 'resolveSrv').callsFake((address, callback) => { + resolveSrvStub = sinon.stub(dns.promises, 'resolveSrv').callsFake(async address => { expect(address).to.equal(`_${srvServiceName}._tcp.test.mock.test.build.10gen.cc`); if (initialDNSLookup) { initialDNSLookup = false; - return process.nextTick(callback, null, initialRecords); + return initialRecords; } - process.nextTick(callback, null, replacementRecords); + return replacementRecords; }); lookupStub = sinon.stub(dns, 'lookup').callsFake((...args) => { diff --git a/test/unit/connection_string.test.ts b/test/unit/connection_string.test.ts index 08441bb65ec..f2a825db7d1 100644 --- a/test/unit/connection_string.test.ts +++ b/test/unit/connection_string.test.ts @@ -1,7 +1,6 @@ import { expect } from 'chai'; import * as dns from 'dns'; import * as sinon from 'sinon'; -import { promisify } from 'util'; import { MongoCredentials } from '../../src/cmap/auth/mongo_credentials'; import { AUTH_MECHS_AUTH_SRC_EXTERNAL, AuthMechanism } from '../../src/cmap/auth/providers'; @@ -419,8 +418,6 @@ describe('Connection String', function () { }); describe('resolveSRVRecord()', () => { - const resolveSRVRecordAsync = promisify(resolveSRVRecord); - afterEach(() => { sinon.restore(); }); @@ -439,12 +436,12 @@ describe('Connection String', function () { // first call is for stubbing resolveSrv // second call is for stubbing resolveTxt - sinon.stub(dns, 'resolveSrv').callsFake((address, callback) => { - return process.nextTick(callback, null, mockAddress); + sinon.stub(dns.promises, 'resolveSrv').callsFake(async () => { + return mockAddress; }); - sinon.stub(dns, 'resolveTxt').callsFake((address, whatWeTest) => { - whatWeTest(null, mockRecord); + sinon.stub(dns.promises, 'resolveTxt').callsFake(async () => { + return mockRecord; }); } @@ -467,7 +464,7 @@ describe('Connection String', function () { userSpecifiedAuthSource: false } as MongoOptions; - await resolveSRVRecordAsync(options); + await resolveSRVRecord(options); // check MongoCredentials instance (i.e. 
whether or not merge on options.credentials was called) expect(options).property('credentials').to.equal(credentials); expect(options).to.have.nested.property('credentials.source', '$external'); @@ -493,7 +490,7 @@ describe('Connection String', function () { userSpecifiedAuthSource: false } as MongoOptions; - await resolveSRVRecordAsync(options); + await resolveSRVRecord(options); // check MongoCredentials instance (i.e. whether or not merge on options.credentials was called) expect(options).property('credentials').to.not.equal(credentials); expect(options).to.have.nested.property('credentials.source', 'thisShouldBeAuthSource'); @@ -517,7 +514,7 @@ describe('Connection String', function () { userSpecifiedAuthSource: false } as MongoOptions; - await resolveSRVRecordAsync(options as any); + await resolveSRVRecord(options as any); // check MongoCredentials instance (i.e. whether or not merge on options.credentials was called) expect(options).property('credentials').to.equal(credentials); expect(options).to.have.nested.property('credentials.source', 'admin'); @@ -533,7 +530,7 @@ describe('Connection String', function () { userSpecifiedAuthSource: false } as MongoOptions; - await resolveSRVRecordAsync(options as any); + await resolveSRVRecord(options as any); expect(options).to.have.nested.property('credentials.username', ''); expect(options).to.have.nested.property('credentials.mechanism', 'DEFAULT'); expect(options).to.have.nested.property('credentials.source', 'thisShouldBeAuthSource'); diff --git a/test/unit/mongo_client.test.js b/test/unit/mongo_client.test.js index 2ff6518e044..918b18c36e0 100644 --- a/test/unit/mongo_client.test.js +++ b/test/unit/mongo_client.test.js @@ -2,7 +2,6 @@ const os = require('os'); const fs = require('fs'); const { expect } = require('chai'); -const { promisify } = require('util'); const { getSymbolFrom } = require('../tools/utils'); const { parseOptions, resolveSRVRecord } = require('../../src/connection_string'); const { ReadConcern } = require('../../src/read_concern'); @@ -773,28 +772,21 @@ describe('MongoOptions', function () { }); it('srvServiceName should error if it is too long', async () => { - let thrownError; - let options; - try { - options = parseOptions('mongodb+srv://localhost.a.com', { srvServiceName: 'a'.repeat(255) }); - await promisify(resolveSRVRecord)(options); - } catch (error) { - thrownError = error; - } - expect(thrownError).to.have.property('code', 'EBADNAME'); + const options = parseOptions('mongodb+srv://localhost.a.com', { + srvServiceName: 'a'.repeat(255) + }); + const error = await resolveSRVRecord(options).catch(error => error); + expect(error).to.have.property('code', 'EBADNAME'); }); it('srvServiceName should not error if it is greater than 15 characters as long as the DNS query limit is not surpassed', async () => { - let thrownError; - let options; - try { - options = parseOptions('mongodb+srv://localhost.a.com', { srvServiceName: 'a'.repeat(16) }); - await promisify(resolveSRVRecord)(options); - } catch (error) { - thrownError = error; - } + const options = parseOptions('mongodb+srv://localhost.a.com', { + srvServiceName: 'a'.repeat(16) + }); + const error = await resolveSRVRecord(options).catch(error => error); + // Nothing wrong with the name, just DNE - expect(thrownError).to.have.property('code', 'ENOTFOUND'); + expect(error).to.have.property('code', 'ENOTFOUND'); }); describe('dbName and authSource', () => { diff --git a/test/unit/sdam/srv_polling.test.js b/test/unit/sdam/srv_polling.test.ts similarity index 51% rename 
from test/unit/sdam/srv_polling.test.js rename to test/unit/sdam/srv_polling.test.ts index 50ab8e9936e..1ccf4c36e7b 100644 --- a/test/unit/sdam/srv_polling.test.js +++ b/test/unit/sdam/srv_polling.test.ts @@ -1,25 +1,21 @@ -'use strict'; - -const { Topology } = require('../../../src/sdam/topology'); -const { TopologyDescription } = require('../../../src/sdam/topology_description'); -const { TopologyType } = require('../../../src/sdam/common'); -const { SrvPoller, SrvPollingEvent } = require('../../../src/sdam/srv_polling'); -const sdamEvents = require('../../../src/sdam/events'); - -const dns = require('dns'); -const EventEmitter = require('events').EventEmitter; -const chai = require('chai'); -const sinon = require('sinon'); -const { MongoDriverError } = require('../../../src/error'); - -const expect = chai.expect; -chai.use(require('sinon-chai')); +import { expect } from 'chai'; +import * as dns from 'dns'; +import { EventEmitter, once } from 'events'; +import * as sinon from 'sinon'; +import { clearTimeout } from 'timers'; + +import { MongoDriverError } from '../../../src/error'; +import { TopologyType } from '../../../src/sdam/common'; +import * as sdamEvents from '../../../src/sdam/events'; +import { SrvPoller, SrvPollingEvent } from '../../../src/sdam/srv_polling'; +import { Topology } from '../../../src/sdam/topology'; +import { TopologyDescription } from '../../../src/sdam/topology_description'; +import { sleep } from '../../tools/utils'; describe('Mongos SRV Polling', function () { - const context = {}; const SRV_HOST = 'darmok.tanagra.com'; - function srvRecord(mockServer, port) { + function srvRecord(mockServer, port?: number) { if (typeof mockServer === 'string') { mockServer = { host: mockServer, port }; } @@ -31,40 +27,23 @@ describe('Mongos SRV Polling', function () { }; } - function tryDone(done, handle) { - process.nextTick(() => { - try { - handle(); - done(); - } catch (e) { - done(e); - } - }); - } - - function stubDns(err, records) { - context.sinon.stub(dns, 'resolveSrv').callsFake(function (_srvAddress, callback) { - process.nextTick(() => callback(err, records)); - }); + function stubDns(err: Error | null, records?: dns.SrvRecord[]) { + if (err) { + sinon.stub(dns.promises, 'resolveSrv').rejects(err); + } else { + sinon.stub(dns.promises, 'resolveSrv').resolves(records); + } } - before(function () { - context.sinon = sinon.createSandbox(); - }); - afterEach(function () { - context.sinon.restore(); - }); - - after(function () { - delete context.sinon; + sinon.restore(); }); describe('SrvPoller', function () { function stubPoller(poller) { - context.sinon.stub(poller, 'success'); - context.sinon.stub(poller, 'failure'); - context.sinon.stub(poller, 'parentDomainMismatch'); + sinon.stub(poller, 'success'); + sinon.stub(poller, 'failure'); + sinon.stub(poller, 'parentDomainMismatch'); } it('should always return a valid value for `intervalMS`', function () { @@ -73,40 +52,62 @@ describe('Mongos SRV Polling', function () { }); describe('success', function () { - it('should emit event, disable haMode, and schedule another poll', function (done) { + it('should emit event, disable haMode, and schedule another poll', async function () { const records = [srvRecord('jalad.tanagra.com'), srvRecord('thebeast.tanagra.com')]; - const poller = new SrvPoller({ srvHost: SRV_HOST }); + const poller = new SrvPoller({ srvHost: SRV_HOST, heartbeatFrequencyMS: 100 }); - context.sinon.stub(poller, 'schedule'); + const willBeDiscovery = once(poller, 'srvRecordDiscovery'); + + 
sinon.stub(poller, 'schedule');

         poller.haMode = true;
         expect(poller).to.have.property('haMode', true);
-
-        poller.once('srvRecordDiscovery', e => {
-          tryDone(done, () => {
-            expect(e)
-              .to.be.an.instanceOf(SrvPollingEvent)
-              .and.to.have.property('srvRecords')
-              .that.deep.equals(records);
-            expect(poller.schedule).to.have.been.calledOnce;
-            expect(poller).to.have.property('haMode', false);
-          });
-        });
-
         poller.success(records);
+
+        const [e] = await willBeDiscovery;
+        expect(e)
+          .to.be.an.instanceOf(SrvPollingEvent)
+          .and.to.have.property('srvRecords')
+          .that.deep.equals(records);
+        expect(poller.schedule).to.have.been.calledOnce;
+        expect(poller).to.have.property('haMode', false);
       });
     });

     describe('failure', function () {
-      it('should enable haMode and schedule', function () {
+      it('should enable haMode and schedule', async () => {
         const poller = new SrvPoller({ srvHost: SRV_HOST });
-        context.sinon.stub(poller, 'schedule');
+        sinon.stub(poller, 'schedule');

         poller.failure('Some kind of failure');
         expect(poller.schedule).to.have.been.calledOnce;
         expect(poller).to.have.property('haMode', true);
       });
+
+      it('should log an unexpected runtime error if the dns API breaks and returns null', async function () {
+        const poller = new SrvPoller({
+          srvHost: SRV_HOST,
+          loggerLevel: 'error',
+          heartbeatFrequencyMS: 100
+        });
+
+        // set haMode to make the poller use the 100ms heartbeat, otherwise this test would take 60 secs
+        poller.haMode = true;
+
+        // @ts-expect-error: Testing what happens if Node breaks the DNS API
+        sinon.stub(dns.promises, 'resolveSrv').resolves(null);
+
+        const loggerError = sinon.stub(poller.logger, 'error').returns();
+
+        poller.schedule();
+        await sleep(130);
+        clearTimeout(poller._timeout);
+
+        expect(loggerError).to.have.been.calledOnceWith(
+          sinon.match(/Unexpected MongoRuntimeError/)
+        );
+      });
     });

     describe('poll', function () {
@@ -115,125 +116,119 @@
         expect(() => new SrvPoller({})).to.throw(MongoDriverError);
       });

-      it('should poll dns srv records', function () {
+      it('should poll dns srv records', async function () {
         const poller = new SrvPoller({ srvHost: SRV_HOST });
-        context.sinon.stub(dns, 'resolveSrv');
+        sinon.stub(dns.promises, 'resolveSrv').resolves([srvRecord('iLoveJavascript.lots')]);
+
+        await poller._poll();

-        poller._poll();
+        clearTimeout(poller._timeout);

-        expect(dns.resolveSrv).to.have.been.calledOnce.and.to.have.been.calledWith(
-          `_mongodb._tcp.${SRV_HOST}`,
-          sinon.match.func
+        expect(dns.promises.resolveSrv).to.have.been.calledOnce.and.to.have.been.calledWith(
+          `_mongodb._tcp.${SRV_HOST}`
         );
       });

-      it('should not succeed or fail if poller was stopped', function (done) {
+      it('should not succeed or fail if poller was stopped', async function () {
         const poller = new SrvPoller({ srvHost: SRV_HOST });

         stubDns(null, []);
         stubPoller(poller);

-        poller._poll();
+        const pollerPromise = poller._poll();
         poller.generation += 1;
+        await pollerPromise;

-        tryDone(done, () => {
-          expect(poller.success).to.not.have.been.called;
-          expect(poller.failure).to.not.have.been.called;
-          expect(poller.parentDomainMismatch).to.not.have.been.called;
-        });
+        expect(poller.success).to.not.have.been.called;
+        expect(poller.failure).to.not.have.been.called;
+        expect(poller.parentDomainMismatch).to.not.have.been.called;
       });

-      it('should fail if dns returns error', function (done) {
+      it('should fail if dns returns error', async () => {
         const poller = new SrvPoller({ srvHost: SRV_HOST });

         stubDns(new Error('Some Error'));
         stubPoller(poller);

-        poller._poll();
+        await
poller._poll(); - tryDone(done, () => { - expect(poller.success).to.not.have.been.called; - expect(poller.failure).to.have.been.calledOnce.and.calledWith('DNS error'); - expect(poller.parentDomainMismatch).to.not.have.been.called; - }); + expect(poller.success).to.not.have.been.called; + expect(poller.failure).to.have.been.calledOnce.and.calledWith('DNS error'); + expect(poller.parentDomainMismatch).to.not.have.been.called; }); - it('should fail if dns returns no records', function (done) { + it('should fail if dns returns no records', async () => { const poller = new SrvPoller({ srvHost: SRV_HOST }); stubDns(null, []); stubPoller(poller); - poller._poll(); + await poller._poll(); - tryDone(done, () => { - expect(poller.success).to.not.have.been.called; - expect(poller.failure).to.have.been.calledOnce.and.calledWith( - 'No valid addresses found at host' - ); - expect(poller.parentDomainMismatch).to.not.have.been.called; - }); + expect(poller.success).to.not.have.been.called; + expect(poller.failure).to.have.been.calledOnce.and.calledWith( + 'No valid addresses found at host' + ); + expect(poller.parentDomainMismatch).to.not.have.been.called; }); - it('should fail if dns returns no records that match parent domain', function (done) { + it('should fail if dns returns no records that match parent domain', async () => { const poller = new SrvPoller({ srvHost: SRV_HOST }); const records = [srvRecord('jalad.tanagra.org'), srvRecord('shaka.walls.com')]; stubDns(null, records); stubPoller(poller); - poller._poll(); + await poller._poll(); - tryDone(done, () => { - expect(poller.success).to.not.have.been.called; - expect(poller.failure).to.have.been.calledOnce.and.calledWith( - 'No valid addresses found at host' - ); - expect(poller.parentDomainMismatch) - .to.have.been.calledTwice.and.calledWith(records[0]) - .and.calledWith(records[1]); - }); + expect(poller.success).to.not.have.been.called; + expect(poller.failure).to.have.been.calledOnce.and.calledWith( + 'No valid addresses found at host' + ); + expect(poller.parentDomainMismatch) + .to.have.been.calledTwice.and.calledWith(records[0]) + .and.calledWith(records[1]); }); - it('should succeed when valid records are returned by dns', function (done) { + it('should succeed when valid records are returned by dns', async () => { const poller = new SrvPoller({ srvHost: SRV_HOST }); const records = [srvRecord('jalad.tanagra.com'), srvRecord('thebeast.tanagra.com')]; stubDns(null, records); stubPoller(poller); - poller._poll(); + await poller._poll(); - tryDone(done, () => { - expect(poller.success).to.have.been.calledOnce.and.calledWithMatch(records); - expect(poller.failure).to.not.have.been.called; - expect(poller.parentDomainMismatch).to.not.have.been.called; - }); + expect(poller.success).to.have.been.calledOnce.and.calledWithMatch(records); + expect(poller.failure).to.not.have.been.called; + expect(poller.parentDomainMismatch).to.not.have.been.called; }); - it('should succeed when some valid records are returned and some do not match parent domain', function (done) { + it('should succeed when some valid records are returned and some do not match parent domain', async () => { const poller = new SrvPoller({ srvHost: SRV_HOST }); const records = [srvRecord('jalad.tanagra.com'), srvRecord('thebeast.walls.com')]; stubDns(null, records); stubPoller(poller); - poller._poll(); + await poller._poll(); - tryDone(done, () => { - expect(poller.success).to.have.been.calledOnce.and.calledWithMatch([records[0]]); - expect(poller.failure).to.not.have.been.called; 
-          expect(poller.parentDomainMismatch).to.have.been.calledOnce.and.calledWith(records[1]);
-        });
+        expect(poller.success).to.have.been.calledOnce.and.calledWithMatch([records[0]]);
+        expect(poller.failure).to.not.have.been.called;
+        expect(poller.parentDomainMismatch).to.have.been.calledOnce.and.calledWith(records[1]);
       });
     });
   });

   describe('topology', function () {
     class FakeSrvPoller extends EventEmitter {
-      start() {}
-      stop() {}
+      start() {
+        // no-op: these tests drive SRV discovery manually via trigger()
+      }
+      stop() {
+        // no-op: these tests drive SRV discovery manually via trigger()
+      }

       trigger(srvRecords) {
         this.emit('srvRecordDiscovery', new SrvPollingEvent(srvRecords));
       }
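A note on the stubbing pattern used in the rewritten tests above: every site that previously stubbed the callback-based `dns.resolveSrv` (with `process.nextTick(callback, ...)` bookkeeping and a `tryDone` helper) now stubs `dns.promises.resolveSrv` and simply awaits the code under test. A minimal self-contained sketch of that pattern follows; `fetchSrvHosts` is a hypothetical stand-in for any driver code that awaits the promise-based resolver, not an API from this repository:

import { strict as assert } from 'assert';
import * as dns from 'dns';
import * as sinon from 'sinon';

// Hypothetical code under test: anything that awaits dns.promises.resolveSrv.
async function fetchSrvHosts(srvHost: string): Promise<string[]> {
  const records = await dns.promises.resolveSrv(`_mongodb._tcp.${srvHost}`);
  return records.map(({ name, port }) => `${name}:${port}`);
}

async function main(): Promise<void> {
  // Stub the promise-based resolver directly; sinon's .resolves() replaces
  // the process.nextTick(callback, null, records) dance entirely.
  const records: dns.SrvRecord[] = [
    { name: 'jalad.tanagra.com', port: 27017, weight: 0, priority: 0 }
  ];
  const stub = sinon.stub(dns.promises, 'resolveSrv').resolves(records);

  try {
    const hosts = await fetchSrvHosts('darmok.tanagra.com');
    assert.deepEqual(hosts, ['jalad.tanagra.com:27017']);
    assert.ok(stub.calledOnceWith('_mongodb._tcp.darmok.tanagra.com'));
  } finally {
    // Always undo the stub, mirroring the sinon.restore() afterEach above.
    sinon.restore();
  }
}

main().catch(error => {
  console.error(error);
  process.exit(1);
});

Because failures now surface as ordinary promise rejections, assertion errors propagate to the test runner on their own, which is what lets these tests drop the `tryDone(done, ...)` wrapper the old callback versions needed.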