Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,15 @@ private void addEvent(final Object event) {
}
}

public void clearEvents() {
lock.lock();
try {
events.clear();
} finally {
lock.unlock();
}
}

@Override
public void connectionPoolCreated(final ConnectionPoolCreatedEvent event) {
if (eventTypes.contains("poolCreatedEvent")) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,4 +69,8 @@ boolean match(final LogMessage message){
return false;
}
}

public synchronized void clearMessages() {
messages.clear();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import com.mongodb.client.vault.ClientEncryption;
import com.mongodb.connection.ClusterConnectionMode;
import com.mongodb.connection.ClusterDescription;
import com.mongodb.event.ConnectionReadyEvent;
import com.mongodb.event.TestServerMonitorListener;
import com.mongodb.internal.connection.ServerMonitoringModeUtil;
import com.mongodb.internal.connection.TestClusterListener;
Expand All @@ -64,6 +65,7 @@
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.stream.Collectors;
Expand All @@ -82,6 +84,7 @@
import static com.mongodb.client.unified.UnifiedCrudHelper.asReadPreference;
import static com.mongodb.client.unified.UnifiedCrudHelper.asWriteConcern;
import static com.mongodb.internal.connection.AbstractConnectionPoolTest.waitForPoolAsyncWorkManagerStart;
import static java.lang.String.format;
import static java.lang.System.getenv;
import static java.util.Arrays.asList;
import static org.junit.Assume.assumeTrue;
Expand All @@ -90,7 +93,8 @@ public final class Entities {
private static final Set<String> SUPPORTED_CLIENT_ENTITY_OPTIONS = new HashSet<>(
asList(
"id", "autoEncryptOpts", "uriOptions", "serverApi", "useMultipleMongoses", "storeEventsAsEntities",
"observeEvents", "observeLogMessages", "observeSensitiveCommands", "ignoreCommandMonitoringEvents"));
"observeEvents", "observeLogMessages", "observeSensitiveCommands", "ignoreCommandMonitoringEvents",
"awaitMinPoolSizeMS"));
private final Set<String> entityNames = new HashSet<>();
private final Map<String, ExecutorService> threads = new HashMap<>();
private final Map<String, ArrayList<Future<?>>> tasks = new HashMap<>();
Expand Down Expand Up @@ -306,6 +310,7 @@ private void initClient(final BsonDocument entity, final String id,
throw new UnsupportedOperationException("Client entity contains unsupported options: " + entity.keySet()
+ ". Supported options are " + SUPPORTED_CLIENT_ENTITY_OPTIONS);
}
boolean waitForMinPoolSizeToPopulate = isWaitForMinPoolSizeToPopulate(entity);
MongoClientSettings.Builder clientSettingsBuilder;
if (entity.getBoolean("useMultipleMongoses", BsonBoolean.FALSE).getValue() && (isSharded() || isLoadBalanced())) {
assumeTrue("Multiple mongos connection string not available for sharded cluster",
Expand All @@ -331,6 +336,9 @@ private void initClient(final BsonDocument entity, final String id,
if (entity.containsKey("observeEvents")) {
List<String> observeEvents = entity.getArray("observeEvents").stream()
.map(type -> type.asString().getValue()).collect(Collectors.toList());
if (waitForMinPoolSizeToPopulate) {
observeEvents.add("connectionReadyEvent");
}
List<String> ignoreCommandMonitoringEvents = entity
.getArray("ignoreCommandMonitoringEvents", new BsonArray()).stream()
.map(type -> type.asString().getValue()).collect(Collectors.toList());
Expand All @@ -341,7 +349,6 @@ private void initClient(final BsonDocument entity, final String id,
null);
clientSettingsBuilder.addCommandListener(testCommandListener);
putEntity(id + "-command-listener", testCommandListener, clientCommandListeners);

TestConnectionPoolListener testConnectionPoolListener = new TestConnectionPoolListener(observeEvents);
clientSettingsBuilder.applyToConnectionPoolSettings(builder ->
builder.addConnectionPoolListener(testConnectionPoolListener));
Expand Down Expand Up @@ -580,11 +587,35 @@ private void initClient(final BsonDocument entity, final String id,
}

putEntity(id, mongoClientSupplier.apply(clientSettings), clients);
if (waitForMinPoolSizeToPopulate) {
waitForMinPoolSizeToPopulate(entity, id, clientSettings);
}
if (waitForPoolAsyncWorkManagerStart) {
waitForPoolAsyncWorkManagerStart();
}
}

private void waitForMinPoolSizeToPopulate(final BsonDocument entity, final String id, final MongoClientSettings clientSettings) {
int minSize = clientSettings.getConnectionPoolSettings().getMinSize();
int awaitMinPoolSizeMS = entity.getInt32("awaitMinPoolSizeMS").getValue();
TestConnectionPoolListener testConnectionPoolListener = getConnectionPoolListener(id);
try {
testConnectionPoolListener.waitForEvent(ConnectionReadyEvent.class, minSize, awaitMinPoolSizeMS, TimeUnit.MILLISECONDS);
// testConnectionPoolListener.clearEvents();
// getClientLoggingInterceptor(id).clearMessages();
} catch (TimeoutException | InterruptedException e) {
throw new RuntimeException(format("Error waiting for awaitMinPoolSizeMS [%s] to establish minPoolSize [%s] connections",
awaitMinPoolSizeMS, minSize));
}
}

private static boolean isWaitForMinPoolSizeToPopulate(final BsonDocument clientEntity) {
int minPoolSize = clientEntity.getDocument("uriOptions", new BsonDocument())
.get("minPoolSize", new BsonInt32(0))
.asInt32().getValue();
return minPoolSize != 0 && clientEntity.containsKey("awaitMinPoolSizeMS");
}

private static LogMessage.Component toComponent(final Map.Entry<String, BsonValue> entry) {
String componentName = entry.getKey();
return LogMessage.Component.of(componentName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ public abstract class UnifiedTest {
private static final Set<String> PRESTART_POOL_ASYNC_WORK_MANAGER_FILE_DESCRIPTIONS = Collections.singleton(
"wait queue timeout errors include details about checked out connections");

private static final String MAX_SUPPORTED_SCHEMA_VERSION = "1.25";
private static final String MAX_SUPPORTED_SCHEMA_VERSION = "1.26";
private static final List<Integer> MAX_SUPPORTED_SCHEMA_VERSION_COMPONENTS = Arrays.stream(MAX_SUPPORTED_SCHEMA_VERSION.split("\\."))
.map(Integer::parseInt)
.collect(Collectors.toList());
Expand Down