Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion test/functional/sessions.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,10 @@ describe('Sessions - functional', function () {
'Server returns an error on listCollections with snapshot',
'Server returns an error on listDatabases with snapshot',
'Server returns an error on listIndexes with snapshot',
'Server returns an error on runCommand with snapshot'
'Server returns an error on runCommand with snapshot',
'Server returns an error on findOneAndUpdate with snapshot',
'Server returns an error on deleteOne with snapshot',
'Server returns an error on updateOne with snapshot'
]
};
const testsToSkip = skipTestMap[sessionTests.description] || [];
Expand Down
144 changes: 120 additions & 24 deletions test/functional/unified-spec-runner/entities.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,20 @@ import { WriteConcern } from '../../../src/write_concern';
import { ReadPreference } from '../../../src/read_preference';
import { ClientSession } from '../../../src/sessions';
import { ChangeStream } from '../../../src/change_stream';
import { FindCursor } from '../../../src/cursor/find_cursor';
import type { ClientEntity, EntityDescription } from './schema';
import type {
ConnectionPoolCreatedEvent,
ConnectionPoolClosedEvent,
ConnectionCreatedEvent,
ConnectionReadyEvent,
ConnectionClosedEvent,
ConnectionCheckOutStartedEvent,
ConnectionCheckOutFailedEvent,
ConnectionCheckedOutEvent,
ConnectionCheckedInEvent,
ConnectionPoolClearedEvent
} from '../../../src/cmap/connection_pool_events';
import type {
CommandFailedEvent,
CommandStartedEvent,
Expand All @@ -26,6 +39,17 @@ interface UnifiedChangeStream extends ChangeStream {
}

export type CommandEvent = CommandStartedEvent | CommandSucceededEvent | CommandFailedEvent;
export type CmapEvent =
| ConnectionPoolCreatedEvent
| ConnectionPoolClosedEvent
| ConnectionCreatedEvent
| ConnectionReadyEvent
| ConnectionClosedEvent
| ConnectionCheckOutStartedEvent
| ConnectionCheckOutFailedEvent
| ConnectionCheckedOutEvent
| ConnectionCheckedInEvent
| ConnectionPoolClearedEvent;

function serverApiConfig() {
if (process.env.MONGODB_API_VERSION) {
Expand All @@ -38,52 +62,105 @@ function getClient(address) {
return new MongoClient(`mongodb://${address}`, serverApi ? { serverApi } : {});
}

type PushFunction = (e: CommandEvent | CmapEvent) => void;

export class UnifiedMongoClient extends MongoClient {
events: CommandEvent[];
commandEvents: CommandEvent[];
cmapEvents: CmapEvent[];
failPoints: Document[];
ignoredEvents: string[];
observedEvents: ('commandStarted' | 'commandSucceeded' | 'commandFailed')[];
observedCommandEvents: ('commandStarted' | 'commandSucceeded' | 'commandFailed')[];
observedCmapEvents: (
| 'connectionPoolCreated'
| 'connectionPoolClosed'
| 'connectionPoolCleared'
| 'connectionCreated'
| 'connectionReady'
| 'connectionClosed'
| 'connectionCheckOutStarted'
| 'connectionCheckOutFailed'
| 'connectionCheckedOut'
| 'connectionCheckedIn'
)[];

static EVENT_NAME_LOOKUP = {
static COMMAND_EVENT_NAME_LOOKUP = {
commandStartedEvent: 'commandStarted',
commandSucceededEvent: 'commandSucceeded',
commandFailedEvent: 'commandFailed'
} as const;

static CMAP_EVENT_NAME_LOOKUP = {
poolCreatedEvent: 'connectionPoolCreated',
poolClosedEvent: 'connectionPoolClosed',
poolClearedEvent: 'connectionPoolCleared',
connectionCreatedEvent: 'connectionCreated',
connectionReadyEvent: 'connectionReady',
connectionClosedEvent: 'connectionClosed',
connectionCheckOutStartedEvent: 'connectionCheckOutStarted',
connectionCheckOutFailedEvent: 'connectionCheckOutFailed',
connectionCheckedOutEvent: 'connectionCheckedOut',
connectionCheckedInEvent: 'connectionCheckedIn'
} as const;

constructor(url: string, description: ClientEntity) {
super(url, {
monitorCommands: true,
...description.uriOptions,
serverApi: description.serverApi ? description.serverApi : serverApiConfig()
});
this.events = [];
this.commandEvents = [];
this.cmapEvents = [];
this.failPoints = [];
this.ignoredEvents = [
...(description.ignoreCommandMonitoringEvents ?? []),
'configureFailPoint'
];
// apm
this.observedEvents = (description.observeEvents ?? []).map(
e => UnifiedMongoClient.EVENT_NAME_LOOKUP[e]
);
for (const eventName of this.observedEvents) {
this.on(eventName, this.pushEvent);
this.observedCommandEvents = (description.observeEvents ?? [])
.map(e => UnifiedMongoClient.COMMAND_EVENT_NAME_LOOKUP[e])
.filter(e => !!e);
this.observedCmapEvents = (description.observeEvents ?? [])
.map(e => UnifiedMongoClient.CMAP_EVENT_NAME_LOOKUP[e])
.filter(e => !!e);
for (const eventName of this.observedCommandEvents) {
this.on(eventName, this.pushCommandEvent);
}
for (const eventName of this.observedCmapEvents) {
this.on(eventName, this.pushCmapEvent);
}
}

// NOTE: pushEvent must be an arrow function
pushEvent: (e: CommandEvent) => void = e => {
if (!this.ignoredEvents.includes(e.commandName)) {
this.events.push(e);
isIgnored(e: CommandEvent | CmapEvent): boolean {
return this.ignoredEvents.includes(e.commandName);
}

// NOTE: pushCommandEvent must be an arrow function
pushCommandEvent: (e: CommandEvent) => void = e => {
if (!this.isIgnored(e)) {
this.commandEvents.push(e);
}
};

/** Disables command monitoring for the client and returns a list of the captured events. */
stopCapturingEvents(): CommandEvent[] {
for (const eventName of this.observedEvents) {
this.off(eventName, this.pushEvent);
// NOTE: pushCmapEvent must be an arrow function
pushCmapEvent: (e: CmapEvent) => void = e => {
this.cmapEvents.push(e);
};

stopCapturingEvents(pushFn: PushFunction): void {
const observedEvents = this.observedCommandEvents.concat(this.observedCmapEvents);
for (const eventName of observedEvents) {
this.off(eventName, pushFn);
}
return this.events;
}

/** Disables command monitoring for the client and returns a list of the captured events. */
stopCapturingCommandEvents(): CommandEvent[] {
this.stopCapturingEvents(this.pushCommandEvent);
return this.commandEvents;
}

stopCapturingCmapEvents(): CmapEvent[] {
this.stopCapturingEvents(this.pushCmapEvent);
return this.cmapEvents;
}
}

Expand Down Expand Up @@ -137,6 +214,7 @@ export type Entity =
| Db
| Collection
| ClientSession
| FindCursor
| UnifiedChangeStream
| GridFSBucket
| Document; // Results from operations
Expand All @@ -147,16 +225,25 @@ export type EntityCtor =
| typeof Collection
| typeof ClientSession
| typeof ChangeStream
| typeof FindCursor
| typeof GridFSBucket;

export type EntityTypeId = 'client' | 'db' | 'collection' | 'session' | 'bucket' | 'stream';
export type EntityTypeId =
| 'client'
| 'db'
| 'collection'
| 'session'
| 'bucket'
| 'cursor'
| 'stream';

const ENTITY_CTORS = new Map<EntityTypeId, EntityCtor>();
ENTITY_CTORS.set('client', UnifiedMongoClient);
ENTITY_CTORS.set('db', Db);
ENTITY_CTORS.set('collection', Collection);
ENTITY_CTORS.set('session', ClientSession);
ENTITY_CTORS.set('bucket', GridFSBucket);
ENTITY_CTORS.set('cursor', FindCursor);
ENTITY_CTORS.set('stream', ChangeStream);

export class EntitiesMap<E = Entity> extends Map<string, E> {
Expand All @@ -172,6 +259,7 @@ export class EntitiesMap<E = Entity> extends Map<string, E> {
mapOf(type: 'collection'): EntitiesMap<Collection>;
mapOf(type: 'session'): EntitiesMap<ClientSession>;
mapOf(type: 'bucket'): EntitiesMap<GridFSBucket>;
mapOf(type: 'cursor'): EntitiesMap<FindCursor>;
mapOf(type: 'stream'): EntitiesMap<UnifiedChangeStream>;
mapOf(type: EntityTypeId): EntitiesMap<Entity> {
const ctor = ENTITY_CTORS.get(type);
Expand All @@ -186,6 +274,7 @@ export class EntitiesMap<E = Entity> extends Map<string, E> {
getEntity(type: 'collection', key: string, assertExists?: boolean): Collection;
getEntity(type: 'session', key: string, assertExists?: boolean): ClientSession;
getEntity(type: 'bucket', key: string, assertExists?: boolean): GridFSBucket;
getEntity(type: 'cursor', key: string, assertExists?: boolean): FindCursor;
getEntity(type: 'stream', key: string, assertExists?: boolean): UnifiedChangeStream;
getEntity(type: EntityTypeId, key: string, assertExists = true): Entity {
const entity = this.get(key);
Expand All @@ -205,11 +294,17 @@ export class EntitiesMap<E = Entity> extends Map<string, E> {

async cleanup(): Promise<void> {
await this.failPoints.disableFailPoints();
for (const [, client] of this.mapOf('client')) {
await client.close();
for (const [, cursor] of this.mapOf('cursor')) {
await cursor.close();
}
for (const [, stream] of this.mapOf('stream')) {
await stream.close();
}
for (const [, session] of this.mapOf('session')) {
await session.endSession();
await session.endSession({ force: true });
}
for (const [, client] of this.mapOf('client')) {
await client.close();
}
this.clear();
}
Expand All @@ -222,7 +317,8 @@ export class EntitiesMap<E = Entity> extends Map<string, E> {
for (const entity of entities ?? []) {
if ('client' in entity) {
const useMultipleMongoses =
config.topologyType === 'Sharded' && entity.client.useMultipleMongoses;
(config.topologyType === 'LoadBalanced' || config.topologyType === 'Sharded') &&
entity.client.useMultipleMongoses;
const uri = config.url({ useMultipleMongoses });
const client = new UnifiedMongoClient(uri, entity.client);
await client.connect();
Expand Down
79 changes: 65 additions & 14 deletions test/functional/unified-spec-runner/match.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,25 @@
import { expect } from 'chai';
import { inspect } from 'util';
import { Binary, Document, Long, ObjectId, MongoError } from '../../../src';
import {
CommandFailedEvent,
CommandStartedEvent,
CommandSucceededEvent
} from '../../../src/cmap/command_monitoring_events';
import { CommandEvent, EntitiesMap } from './entities';
import { ExpectedError, ExpectedEvent } from './schema';
import {
ConnectionPoolCreatedEvent,
ConnectionPoolClosedEvent,
ConnectionCreatedEvent,
ConnectionReadyEvent,
ConnectionClosedEvent,
ConnectionCheckOutStartedEvent,
ConnectionCheckOutFailedEvent,
ConnectionCheckedOutEvent,
ConnectionCheckedInEvent,
ConnectionPoolClearedEvent
} from '../../../src/cmap/connection_pool_events';
import { CommandEvent, CmapEvent, EntitiesMap } from './entities';
import { ExpectedCmapEvent, ExpectedCommandEvent, ExpectedError } from './schema';

export interface ExistsOperator {
$$exists: boolean;
Expand Down Expand Up @@ -235,32 +248,70 @@ export function specialCheck(
}
}

// CMAP events where the payload does not matter.
const EMPTY_CMAP_EVENTS = {
poolCreatedEvent: ConnectionPoolCreatedEvent,
poolClosedEvent: ConnectionPoolClosedEvent,
connectionCreatedEvent: ConnectionCreatedEvent,
connectionReadyEvent: ConnectionReadyEvent,
connectionCheckOutStartedEvent: ConnectionCheckOutStartedEvent,
connectionCheckOutFailedEvent: ConnectionCheckOutFailedEvent,
connectionCheckedOutEvent: ConnectionCheckedOutEvent,
connectionCheckedInEvent: ConnectionCheckedInEvent
};

function validEmptyCmapEvent(
expected: ExpectedCommandEvent | ExpectedCmapEvent,
actual: CommandEvent | CmapEvent
) {
return Object.values(EMPTY_CMAP_EVENTS).some(value => {
return actual instanceof value;
});
}

export function matchesEvents(
expected: ExpectedEvent[],
actual: CommandEvent[],
expected: (ExpectedCommandEvent & ExpectedCmapEvent)[],
actual: (CommandEvent & CmapEvent)[],
entities: EntitiesMap
): void {
// TODO: NodeJS Driver has extra events
// expect(actual).to.have.lengthOf(expected.length);
if (actual.length !== expected.length) {
const actualNames = actual.map(a => a.constructor.name);
const expectedNames = expected.map(e => Object.keys(e)[0]);
expect.fail(
`Expected event count mismatch, expected ${inspect(expectedNames)} but got ${inspect(
actualNames
)}`
);
}

for (const [index, actualEvent] of actual.entries()) {
const expectedEvent = expected[index];

if (expectedEvent.commandStartedEvent && actualEvent instanceof CommandStartedEvent) {
if (expectedEvent.commandStartedEvent) {
expect(actualEvent).to.be.instanceOf(CommandStartedEvent);
resultCheck(actualEvent, expectedEvent.commandStartedEvent, entities, [
`events[${index}].commandStartedEvent`
]);
} else if (
expectedEvent.commandSucceededEvent &&
actualEvent instanceof CommandSucceededEvent
) {
} else if (expectedEvent.commandSucceededEvent) {
expect(actualEvent).to.be.instanceOf(CommandSucceededEvent);
resultCheck(actualEvent, expectedEvent.commandSucceededEvent, entities, [
`events[${index}].commandSucceededEvent`
]);
} else if (expectedEvent.commandFailedEvent && actualEvent instanceof CommandFailedEvent) {
} else if (expectedEvent.commandFailedEvent) {
expect(actualEvent).to.be.instanceOf(CommandFailedEvent);
expect(actualEvent.commandName).to.equal(expectedEvent.commandFailedEvent.commandName);
} else {
expect.fail(`Events must be one of the known types, got ${actualEvent}`);
} else if (expectedEvent.connectionClosedEvent) {
expect(actualEvent).to.be.instanceOf(ConnectionClosedEvent);
if (expectedEvent.connectionClosedEvent.hasServiceId) {
expect(actualEvent).property('serviceId').to.exist;
}
} else if (expectedEvent.poolClearedEvent) {
expect(actualEvent).to.be.instanceOf(ConnectionPoolClearedEvent);
if (expectedEvent.poolClearedEvent.hasServiceId) {
expect(actualEvent).property('serviceId').to.exist;
}
} else if (!validEmptyCmapEvent(expectedEvent, actualEvent)) {
expect.fail(`Events must be one of the known types, got ${inspect(actualEvent)}`);
}
}
}
Expand Down
Loading