From e44e5c77abbcb1b1e86efd4e8dd009f32405b575 Mon Sep 17 00:00:00 2001 From: Maxim Katcharov Date: Fri, 3 May 2024 08:13:13 -0600 Subject: [PATCH 1/3] Add throwTranslatedWriteException, refactoring, async helper --- .../mongodb/internal/async/AsyncRunnable.java | 36 +++++++++++ .../connection/InternalStreamConnection.java | 59 +++++++------------ .../internal/async/AsyncFunctionsTest.java | 54 ++++++++++++++++- 3 files changed, 110 insertions(+), 39 deletions(-) diff --git a/driver-core/src/main/com/mongodb/internal/async/AsyncRunnable.java b/driver-core/src/main/com/mongodb/internal/async/AsyncRunnable.java index fcf8d61387d..33e1af001bb 100644 --- a/driver-core/src/main/com/mongodb/internal/async/AsyncRunnable.java +++ b/driver-core/src/main/com/mongodb/internal/async/AsyncRunnable.java @@ -178,6 +178,42 @@ default AsyncRunnable thenRun(final AsyncRunnable runnable) { }; } + /** + * The error check checks if the exception is an instance of the provided class. + * @see #thenRunTryCatchAsyncBlocks(AsyncRunnable, java.util.function.Predicate, AsyncFunction) + */ + default AsyncRunnable thenRunTryCatchAsyncBlocks( + final AsyncRunnable runnable, + final Class exceptionClass, + final AsyncFunction errorFunction) { + return thenRunTryCatchAsyncBlocks(runnable, e -> exceptionClass.isInstance(e), errorFunction); + } + + /** + * Convenience method corresponding to a try-catch block in sync code. + * This MUST be used to properly handle cases where there is code above + * the block, whose errors must not be caught by an ensuing + * {@link #onErrorIf(java.util.function.Predicate, AsyncFunction)}. + * + * @param runnable corresponds to the contents of the try block + * @param errorCheck for matching on an error (or, a more complex condition) + * @param errorFunction corresponds to the contents of the catch block + * @return the composition of this runnable, a runnable that runs the + * provided runnable, followed by (composed with) the error function, which + * is conditional on there being an exception meeting the error check. + */ + default AsyncRunnable thenRunTryCatchAsyncBlocks( + final AsyncRunnable runnable, + final Predicate errorCheck, + final AsyncFunction errorFunction) { + return this.thenRun(c -> { + beginAsync() + .thenRun(runnable) + .onErrorIf(errorCheck, errorFunction) + .finish(c); + }); + } + /** * @param condition the condition to check * @param runnable The async runnable to run after this runnable, diff --git a/driver-core/src/main/com/mongodb/internal/connection/InternalStreamConnection.java b/driver-core/src/main/com/mongodb/internal/connection/InternalStreamConnection.java index 218835f083e..31a3cf3f45d 100644 --- a/driver-core/src/main/com/mongodb/internal/connection/InternalStreamConnection.java +++ b/driver-core/src/main/com/mongodb/internal/connection/InternalStreamConnection.java @@ -73,7 +73,6 @@ import static com.mongodb.assertions.Assertions.isTrue; import static com.mongodb.assertions.Assertions.notNull; import static com.mongodb.internal.async.AsyncRunnable.beginAsync; -import static com.mongodb.internal.async.ErrorHandlingResultCallback.errorHandlingCallback; import static com.mongodb.internal.connection.Authenticator.shouldAuthenticate; import static com.mongodb.internal.connection.CommandHelper.HELLO; import static com.mongodb.internal.connection.CommandHelper.LEGACY_HELLO; @@ -633,19 +632,34 @@ private T getCommandResult(final Decoder decoder, final ResponseBuffers r @Override public void sendMessage(final List byteBuffers, final int lastRequestId) { notNull("stream is open", stream); - if (isClosed()) { throw new MongoSocketClosedException("Cannot write to a closed stream", getServerAddress()); } - try { stream.write(byteBuffers); } catch (Exception e) { close(); - throw translateWriteException(e); + throwTranslatedWriteException(e); } } + @Override + public void sendMessageAsync(final List byteBuffers, final int lastRequestId, + final SingleResultCallback callback) { + beginAsync().thenRun((c) -> { + notNull("stream is open", stream); + if (isClosed()) { + throw new MongoSocketClosedException("Cannot write to a closed stream", getServerAddress()); + } + c.complete(c); + }).thenRunTryCatchAsyncBlocks(c -> { + stream.writeAsync(byteBuffers, c.asHandler()); + }, Exception.class, (e, c) -> { + close(); + throwTranslatedWriteException(e); + }).finish(callback); + } + @Override public ResponseBuffers receiveMessage(final int responseTo) { assertNotNull(stream); @@ -665,39 +679,6 @@ private ResponseBuffers receiveMessageWithAdditionalTimeout(final int additional } } - @Override - public void sendMessageAsync(final List byteBuffers, final int lastRequestId, - final SingleResultCallback callback) { - assertNotNull(stream); - - if (isClosed()) { - callback.onResult(null, new MongoSocketClosedException("Can not read from a closed socket", getServerAddress())); - return; - } - - writeAsync(byteBuffers, errorHandlingCallback(callback, LOGGER)); - } - - private void writeAsync(final List byteBuffers, final SingleResultCallback callback) { - try { - stream.writeAsync(byteBuffers, new AsyncCompletionHandler() { - @Override - public void completed(@Nullable final Void v) { - callback.onResult(null, null); - } - - @Override - public void failed(final Throwable t) { - close(); - callback.onResult(null, translateWriteException(t)); - } - }); - } catch (Throwable t) { - close(); - callback.onResult(null, t); - } - } - @Override public void receiveMessageAsync(final int responseTo, final SingleResultCallback callback) { assertNotNull(stream); @@ -762,6 +743,10 @@ private void updateSessionContext(final SessionContext sessionContext, final Res } } + private void throwTranslatedWriteException(final Throwable e) { + throw translateWriteException(e); + } + private MongoException translateWriteException(final Throwable e) { if (e instanceof MongoException) { return (MongoException) e; diff --git a/driver-core/src/test/unit/com/mongodb/internal/async/AsyncFunctionsTest.java b/driver-core/src/test/unit/com/mongodb/internal/async/AsyncFunctionsTest.java index b783b3de93b..bcc0fbe26c7 100644 --- a/driver-core/src/test/unit/com/mongodb/internal/async/AsyncFunctionsTest.java +++ b/driver-core/src/test/unit/com/mongodb/internal/async/AsyncFunctionsTest.java @@ -393,8 +393,8 @@ void testTryCatch() { // chain of 2 in try. // WARNING: "onErrorIf" will consider everything in // the preceding chain to be part of the try. - // Use nested async chains to define the beginning - // of the "try". + // Use nested async chains, or convenience methods, + // to define the beginning of the try. assertBehavesSameVariations(5, () -> { try { @@ -491,6 +491,56 @@ void testTryCatch() { }); } + @Test + void testTryCatchHelper() { + assertBehavesSameVariations(4, + () -> { + plain(0); + try { + sync(1); + } catch (Throwable t) { + plain(2); + throw t; + } + }, + (callback) -> { + beginAsync().thenRun(c -> { + plain(0); + c.complete(c); + }).thenRunTryCatchAsyncBlocks(c -> { + async(1, c); + }, Throwable.class, (t, c) -> { + plain(2); + c.completeExceptionally(t); + }).finish(callback); + }); + + assertBehavesSameVariations(5, + () -> { + plain(0); + try { + sync(1); + } catch (Throwable t) { + plain(2); + throw t; + } + sync(4); + }, + (callback) -> { + beginAsync().thenRun(c -> { + plain(0); + c.complete(c); + }).thenRunTryCatchAsyncBlocks(c -> { + async(1, c); + }, Throwable.class, (t, c) -> { + plain(2); + c.completeExceptionally(t); + }).thenRun(c -> { + async(4, c); + }).finish(callback); + }); + } + @Test void testTryCatchWithVariables() { // using supply etc. From 0f9299ed60ad6f345bc275aa17975ccc0310206b Mon Sep 17 00:00:00 2001 From: Maxim Katcharov Date: Fri, 3 May 2024 08:29:14 -0600 Subject: [PATCH 2/3] Move non-@Test to super, increase visibility (no other changes) --- .../internal/async/AsyncFunctionsTest.java | 286 +---------------- .../async/AsyncFunctionsTestAbstract.java | 292 ++++++++++++++++++ 2 files changed, 293 insertions(+), 285 deletions(-) create mode 100644 driver-core/src/test/unit/com/mongodb/internal/async/AsyncFunctionsTestAbstract.java diff --git a/driver-core/src/test/unit/com/mongodb/internal/async/AsyncFunctionsTest.java b/driver-core/src/test/unit/com/mongodb/internal/async/AsyncFunctionsTest.java index bcc0fbe26c7..24ad1c8f8e1 100644 --- a/driver-core/src/test/unit/com/mongodb/internal/async/AsyncFunctionsTest.java +++ b/driver-core/src/test/unit/com/mongodb/internal/async/AsyncFunctionsTest.java @@ -15,30 +15,17 @@ */ package com.mongodb.internal.async; -import com.mongodb.client.TestListener; import org.junit.jupiter.api.Test; -import org.opentest4j.AssertionFailedError; -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiConsumer; import java.util.function.Consumer; import java.util.function.Supplier; import static com.mongodb.assertions.Assertions.assertNotNull; import static com.mongodb.internal.async.AsyncRunnable.beginAsync; -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertThrows; -import static org.junit.jupiter.api.Assertions.assertTrue; -import static org.junit.jupiter.api.Assertions.fail; -final class AsyncFunctionsTest { - private final TestListener listener = new TestListener(); - private final InvocationTracker invocationTracker = new InvocationTracker(); - private boolean isTestingAbruptCompletion = false; +final class AsyncFunctionsTest extends AsyncFunctionsTestAbstract { @Test void test1Method() { @@ -877,275 +864,4 @@ void testDerivation() { }); } - // invoked methods: - - private void plain(final int i) { - int cur = invocationTracker.getNextOption(2); - if (cur == 0) { - listener.add("plain-exception-" + i); - throw new RuntimeException("affected method exception-" + i); - } else { - listener.add("plain-success-" + i); - } - } - - private int plainReturns(final int i) { - int cur = invocationTracker.getNextOption(2); - if (cur == 0) { - listener.add("plain-exception-" + i); - throw new RuntimeException("affected method exception-" + i); - } else { - listener.add("plain-success-" + i); - return i; - } - } - - private boolean plainTest(final int i) { - int cur = invocationTracker.getNextOption(3); - if (cur == 0) { - listener.add("plain-exception-" + i); - throw new RuntimeException("affected method exception-" + i); - } else if (cur == 1) { - listener.add("plain-false-" + i); - return false; - } else { - listener.add("plain-true-" + i); - return true; - } - } - - private void sync(final int i) { - assertFalse(invocationTracker.isAsyncStep); - affected(i); - } - - - private Integer syncReturns(final int i) { - assertFalse(invocationTracker.isAsyncStep); - return affectedReturns(i); - } - - private void async(final int i, final SingleResultCallback callback) { - assertTrue(invocationTracker.isAsyncStep); - if (isTestingAbruptCompletion) { - affected(i); - callback.complete(callback); - - } else { - try { - affected(i); - callback.complete(callback); - } catch (Throwable t) { - callback.onResult(null, t); - } - } - } - - private void asyncReturns(final int i, final SingleResultCallback callback) { - assertTrue(invocationTracker.isAsyncStep); - if (isTestingAbruptCompletion) { - callback.complete(affectedReturns(i)); - } else { - try { - callback.complete(affectedReturns(i)); - } catch (Throwable t) { - callback.onResult(null, t); - } - } - } - - private void affected(final int i) { - int cur = invocationTracker.getNextOption(2); - if (cur == 0) { - listener.add("affected-exception-" + i); - throw new RuntimeException("exception-" + i); - } else { - listener.add("affected-success-" + i); - } - } - - private int affectedReturns(final int i) { - int cur = invocationTracker.getNextOption(2); - if (cur == 0) { - listener.add("affected-exception-" + i); - throw new RuntimeException("exception-" + i); - } else { - listener.add("affected-success-" + i); - return i; - } - } - - // assert methods: - - private void assertBehavesSameVariations(final int expectedVariations, final Runnable sync, - final Consumer> async) { - assertBehavesSameVariations(expectedVariations, - () -> { - sync.run(); - return null; - }, - (c) -> { - async.accept((v, e) -> c.onResult(v, e)); - }); - } - - private void assertBehavesSameVariations(final int expectedVariations, final Supplier sync, - final Consumer> async) { - // run the variation-trying code twice, with direct/indirect exceptions - for (int i = 0; i < 2; i++) { - isTestingAbruptCompletion = i != 0; - - // the variation-trying code: - invocationTracker.reset(); - do { - invocationTracker.startInitialStep(); - assertBehavesSame( - sync, - () -> invocationTracker.startMatchStep(), - async); - } while (invocationTracker.countDown()); - assertEquals(expectedVariations, invocationTracker.getVariationCount(), - "number of variations did not match"); - } - - } - - private void assertBehavesSame(final Supplier sync, final Runnable between, - final Consumer> async) { - - T expectedValue = null; - Throwable expectedException = null; - try { - expectedValue = sync.get(); - } catch (Throwable e) { - expectedException = e; - } - List expectedEvents = listener.getEventStrings(); - - listener.clear(); - between.run(); - - AtomicReference actualValue = new AtomicReference<>(); - AtomicReference actualException = new AtomicReference<>(); - AtomicBoolean wasCalled = new AtomicBoolean(false); - try { - async.accept((v, e) -> { - actualValue.set(v); - actualException.set(e); - if (wasCalled.get()) { - fail(); - } - wasCalled.set(true); - }); - } catch (Throwable e) { - fail("async threw instead of using callback"); - } - - // The following code can be used to debug variations: -// System.out.println("===VARIATION START"); -// System.out.println("sync: " + expectedEvents); -// System.out.println("callback called?: " + wasCalled.get()); -// System.out.println("value -- sync: " + expectedValue + " -- async: " + actualValue.get()); -// System.out.println("excep -- sync: " + expectedException + " -- async: " + actualException.get()); -// System.out.println("exception mode: " + (isTestingAbruptCompletion -// ? "exceptions thrown directly (abrupt completion)" : "exceptions into callbacks")); -// System.out.println("===VARIATION END"); - - // show assertion failures arising in async tests - if (actualException.get() != null && actualException.get() instanceof AssertionFailedError) { - throw (AssertionFailedError) actualException.get(); - } - - assertTrue(wasCalled.get(), "callback should have been called"); - assertEquals(expectedEvents, listener.getEventStrings(), "steps should have matched"); - assertEquals(expectedValue, actualValue.get()); - assertEquals(expectedException == null, actualException.get() == null, - "both or neither should have produced an exception"); - if (expectedException != null) { - assertEquals(expectedException.getMessage(), actualException.get().getMessage()); - assertEquals(expectedException.getClass(), actualException.get().getClass()); - } - - listener.clear(); - } - - /** - * Tracks invocations: allows testing of all variations of a method calls - */ - private static class InvocationTracker { - public static final int DEPTH_LIMIT = 50; - private final List invocationOptionSequence = new ArrayList<>(); - private boolean isAsyncStep; // async = matching, vs initial step = populating - private int currentInvocationIndex; - private int variationCount; - - public void reset() { - variationCount = 0; - } - - public void startInitialStep() { - variationCount++; - isAsyncStep = false; - currentInvocationIndex = -1; - } - - public int getNextOption(final int myOptionsSize) { - /* - This method creates (or gets) the next invocation's option. Each - invoker of this method has the "option" to behave in various ways, - usually just success (option 1) and exceptional failure (option 0), - though some callers might have more options. A sequence of method - outcomes (options) is one "variation". Tests automatically test - all possible variations (up to a limit, to prevent infinite loops). - - Methods generally have labels, to ensure that corresponding - sync/async methods are called in the right order, but these labels - are unrelated to the "variation" logic here. There are two "modes" - (whether completion is abrupt, or not), which are also unrelated. - */ - - currentInvocationIndex++; // which invocation result we are dealing with - - if (currentInvocationIndex >= invocationOptionSequence.size()) { - if (isAsyncStep) { - fail("result should have been pre-initialized: steps may not match"); - } - if (isWithinDepthLimit()) { - invocationOptionSequence.add(myOptionsSize - 1); - } else { - invocationOptionSequence.add(0); // choose "0" option, should always be an exception - } - } - return invocationOptionSequence.get(currentInvocationIndex); - } - - public void startMatchStep() { - isAsyncStep = true; - currentInvocationIndex = -1; - } - - private boolean countDown() { - while (!invocationOptionSequence.isEmpty()) { - int lastItemIndex = invocationOptionSequence.size() - 1; - int lastItem = invocationOptionSequence.get(lastItemIndex); - if (lastItem > 0) { - // count current digit down by 1, until 0 - invocationOptionSequence.set(lastItemIndex, lastItem - 1); - return true; - } else { - // current digit completed, remove (move left) - invocationOptionSequence.remove(lastItemIndex); - } - } - return false; - } - - public int getVariationCount() { - return variationCount; - } - - public boolean isWithinDepthLimit() { - return invocationOptionSequence.size() < DEPTH_LIMIT; - } - } } diff --git a/driver-core/src/test/unit/com/mongodb/internal/async/AsyncFunctionsTestAbstract.java b/driver-core/src/test/unit/com/mongodb/internal/async/AsyncFunctionsTestAbstract.java new file mode 100644 index 00000000000..93b8cdbeeb5 --- /dev/null +++ b/driver-core/src/test/unit/com/mongodb/internal/async/AsyncFunctionsTestAbstract.java @@ -0,0 +1,292 @@ +package com.mongodb.internal.async; + +import com.mongodb.client.TestListener; +import org.opentest4j.AssertionFailedError; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Consumer; +import java.util.function.Supplier; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.fail; + +public class AsyncFunctionsTestAbstract { + + final TestListener listener = new TestListener(); + final InvocationTracker invocationTracker = new InvocationTracker(); + boolean isTestingAbruptCompletion = false; + + void plain(final int i) { + int cur = invocationTracker.getNextOption(2); + if (cur == 0) { + listener.add("plain-exception-" + i); + throw new RuntimeException("affected method exception-" + i); + } else { + listener.add("plain-success-" + i); + } + } + + int plainReturns(final int i) { + int cur = invocationTracker.getNextOption(2); + if (cur == 0) { + listener.add("plain-exception-" + i); + throw new RuntimeException("affected method exception-" + i); + } else { + listener.add("plain-success-" + i); + return i; + } + } + + boolean plainTest(final int i) { + int cur = invocationTracker.getNextOption(3); + if (cur == 0) { + listener.add("plain-exception-" + i); + throw new RuntimeException("affected method exception-" + i); + } else if (cur == 1) { + listener.add("plain-false-" + i); + return false; + } else { + listener.add("plain-true-" + i); + return true; + } + } + + void sync(final int i) { + assertFalse(invocationTracker.isAsyncStep); + affected(i); + } + + Integer syncReturns(final int i) { + assertFalse(invocationTracker.isAsyncStep); + return affectedReturns(i); + } + + void async(final int i, final SingleResultCallback callback) { + assertTrue(invocationTracker.isAsyncStep); + if (isTestingAbruptCompletion) { + affected(i); + callback.complete(callback); + + } else { + try { + affected(i); + callback.complete(callback); + } catch (Throwable t) { + callback.onResult(null, t); + } + } + } + + void asyncReturns(final int i, final SingleResultCallback callback) { + assertTrue(invocationTracker.isAsyncStep); + if (isTestingAbruptCompletion) { + callback.complete(affectedReturns(i)); + } else { + try { + callback.complete(affectedReturns(i)); + } catch (Throwable t) { + callback.onResult(null, t); + } + } + } + + private void affected(final int i) { + int cur = invocationTracker.getNextOption(2); + if (cur == 0) { + listener.add("affected-exception-" + i); + throw new RuntimeException("exception-" + i); + } else { + listener.add("affected-success-" + i); + } + } + + private int affectedReturns(final int i) { + int cur = invocationTracker.getNextOption(2); + if (cur == 0) { + listener.add("affected-exception-" + i); + throw new RuntimeException("exception-" + i); + } else { + listener.add("affected-success-" + i); + return i; + } + } + + // assert methods: + + void assertBehavesSameVariations(final int expectedVariations, final Runnable sync, + final Consumer> async) { + assertBehavesSameVariations(expectedVariations, + () -> { + sync.run(); + return null; + }, + (c) -> { + async.accept((v, e) -> c.onResult(v, e)); + }); + } + + void assertBehavesSameVariations(final int expectedVariations, final Supplier sync, + final Consumer> async) { + // run the variation-trying code twice, with direct/indirect exceptions + for (int i = 0; i < 2; i++) { + isTestingAbruptCompletion = i != 0; + + // the variation-trying code: + invocationTracker.reset(); + do { + invocationTracker.startInitialStep(); + assertBehavesSame( + sync, + () -> invocationTracker.startMatchStep(), + async); + } while (invocationTracker.countDown()); + assertEquals(expectedVariations, invocationTracker.getVariationCount(), + "number of variations did not match"); + } + + } + + private void assertBehavesSame(final Supplier sync, final Runnable between, + final Consumer> async) { + + T expectedValue = null; + Throwable expectedException = null; + try { + expectedValue = sync.get(); + } catch (Throwable e) { + expectedException = e; + } + List expectedEvents = listener.getEventStrings(); + + listener.clear(); + between.run(); + + AtomicReference actualValue = new AtomicReference<>(); + AtomicReference actualException = new AtomicReference<>(); + AtomicBoolean wasCalled = new AtomicBoolean(false); + try { + async.accept((v, e) -> { + actualValue.set(v); + actualException.set(e); + if (wasCalled.get()) { + fail(); + } + wasCalled.set(true); + }); + } catch (Throwable e) { + fail("async threw instead of using callback"); + } + + // The following code can be used to debug variations: +// System.out.println("===VARIATION START"); +// System.out.println("sync: " + expectedEvents); +// System.out.println("callback called?: " + wasCalled.get()); +// System.out.println("value -- sync: " + expectedValue + " -- async: " + actualValue.get()); +// System.out.println("excep -- sync: " + expectedException + " -- async: " + actualException.get()); +// System.out.println("exception mode: " + (isTestingAbruptCompletion +// ? "exceptions thrown directly (abrupt completion)" : "exceptions into callbacks")); +// System.out.println("===VARIATION END"); + + // show assertion failures arising in async tests + if (actualException.get() != null && actualException.get() instanceof AssertionFailedError) { + throw (AssertionFailedError) actualException.get(); + } + + assertTrue(wasCalled.get(), "callback should have been called"); + assertEquals(expectedEvents, listener.getEventStrings(), "steps should have matched"); + assertEquals(expectedValue, actualValue.get()); + assertEquals(expectedException == null, actualException.get() == null, + "both or neither should have produced an exception"); + if (expectedException != null) { + assertEquals(expectedException.getMessage(), actualException.get().getMessage()); + assertEquals(expectedException.getClass(), actualException.get().getClass()); + } + + listener.clear(); + } + + /** + * Tracks invocations: allows testing of all variations of a method calls + */ + static class InvocationTracker { + public static final int DEPTH_LIMIT = 50; + private final List invocationOptionSequence = new ArrayList<>(); + boolean isAsyncStep; // async = matching, vs initial step = populating + private int currentInvocationIndex; + private int variationCount; + + public void reset() { + variationCount = 0; + } + + public void startInitialStep() { + variationCount++; + isAsyncStep = false; + currentInvocationIndex = -1; + } + + public int getNextOption(final int myOptionsSize) { + /* + This method creates (or gets) the next invocation's option. Each + invoker of this method has the "option" to behave in various ways, + usually just success (option 1) and exceptional failure (option 0), + though some callers might have more options. A sequence of method + outcomes (options) is one "variation". Tests automatically test + all possible variations (up to a limit, to prevent infinite loops). + + Methods generally have labels, to ensure that corresponding + sync/async methods are called in the right order, but these labels + are unrelated to the "variation" logic here. There are two "modes" + (whether completion is abrupt, or not), which are also unrelated. + */ + + currentInvocationIndex++; // which invocation result we are dealing with + + if (currentInvocationIndex >= invocationOptionSequence.size()) { + if (isAsyncStep) { + fail("result should have been pre-initialized: steps may not match"); + } + if (isWithinDepthLimit()) { + invocationOptionSequence.add(myOptionsSize - 1); + } else { + invocationOptionSequence.add(0); // choose "0" option, should always be an exception + } + } + return invocationOptionSequence.get(currentInvocationIndex); + } + + public void startMatchStep() { + isAsyncStep = true; + currentInvocationIndex = -1; + } + + private boolean countDown() { + while (!invocationOptionSequence.isEmpty()) { + int lastItemIndex = invocationOptionSequence.size() - 1; + int lastItem = invocationOptionSequence.get(lastItemIndex); + if (lastItem > 0) { + // count current digit down by 1, until 0 + invocationOptionSequence.set(lastItemIndex, lastItem - 1); + return true; + } else { + // current digit completed, remove (move left) + invocationOptionSequence.remove(lastItemIndex); + } + } + return false; + } + + public int getVariationCount() { + return variationCount; + } + + public boolean isWithinDepthLimit() { + return invocationOptionSequence.size() < DEPTH_LIMIT; + } + } +} From 1b8d37b0676f56b049bdccf6c5ec871a3d449b41 Mon Sep 17 00:00:00 2001 From: Maxim Katcharov Date: Tue, 21 May 2024 16:15:44 -0600 Subject: [PATCH 3/3] PR fixes --- .../internal/connection/CommandHelper.java | 3 +- .../connection/InternalStreamConnection.java | 3 +- .../internal/async/AsyncFunctionsTest.java | 8 ++-- .../async/AsyncFunctionsTestAbstract.java | 48 +++++++++++++++---- 4 files changed, 48 insertions(+), 14 deletions(-) diff --git a/driver-core/src/main/com/mongodb/internal/connection/CommandHelper.java b/driver-core/src/main/com/mongodb/internal/connection/CommandHelper.java index ccf80716a23..dc0df6ac27e 100644 --- a/driver-core/src/main/com/mongodb/internal/connection/CommandHelper.java +++ b/driver-core/src/main/com/mongodb/internal/connection/CommandHelper.java @@ -61,7 +61,8 @@ static BsonDocument executeCommandWithoutCheckingForFailure(final String databas static void executeCommandAsync(final String database, final BsonDocument command, final ClusterConnectionMode clusterConnectionMode, @Nullable final ServerApi serverApi, final InternalConnection internalConnection, final SingleResultCallback callback) { - internalConnection.sendAndReceiveAsync(getCommandMessage(database, command, internalConnection, clusterConnectionMode, serverApi), + internalConnection.sendAndReceiveAsync( + getCommandMessage(database, command, internalConnection, clusterConnectionMode, serverApi), new BsonDocumentCodec(), NoOpSessionContext.INSTANCE, IgnorableRequestContext.INSTANCE, new OperationContext(), (result, t) -> { if (t != null) { diff --git a/driver-core/src/main/com/mongodb/internal/connection/InternalStreamConnection.java b/driver-core/src/main/com/mongodb/internal/connection/InternalStreamConnection.java index 31a3cf3f45d..fc90ce81bef 100644 --- a/driver-core/src/main/com/mongodb/internal/connection/InternalStreamConnection.java +++ b/driver-core/src/main/com/mongodb/internal/connection/InternalStreamConnection.java @@ -73,6 +73,7 @@ import static com.mongodb.assertions.Assertions.isTrue; import static com.mongodb.assertions.Assertions.notNull; import static com.mongodb.internal.async.AsyncRunnable.beginAsync; +import static com.mongodb.internal.async.ErrorHandlingResultCallback.errorHandlingCallback; import static com.mongodb.internal.connection.Authenticator.shouldAuthenticate; import static com.mongodb.internal.connection.CommandHelper.HELLO; import static com.mongodb.internal.connection.CommandHelper.LEGACY_HELLO; @@ -657,7 +658,7 @@ public void sendMessageAsync(final List byteBuffers, final int lastRequ }, Exception.class, (e, c) -> { close(); throwTranslatedWriteException(e); - }).finish(callback); + }).finish(errorHandlingCallback(callback, LOGGER)); } @Override diff --git a/driver-core/src/test/unit/com/mongodb/internal/async/AsyncFunctionsTest.java b/driver-core/src/test/unit/com/mongodb/internal/async/AsyncFunctionsTest.java index 24ad1c8f8e1..deb8e4a2e4a 100644 --- a/driver-core/src/test/unit/com/mongodb/internal/async/AsyncFunctionsTest.java +++ b/driver-core/src/test/unit/com/mongodb/internal/async/AsyncFunctionsTest.java @@ -759,8 +759,8 @@ void testVariables() { @Test void testInvalid() { - isTestingAbruptCompletion = false; - invocationTracker.isAsyncStep = true; + setIsTestingAbruptCompletion(false); + setAsyncStep(true); assertThrows(IllegalStateException.class, () -> { beginAsync().thenRun(c -> { async(3, c); @@ -783,8 +783,8 @@ void testDerivation() { // Stand-ins for sync-async methods; these "happily" do not throw // exceptions, to avoid complicating this demo async code. Consumer happySync = (i) -> { - invocationTracker.getNextOption(1); - listener.add("affected-success-" + i); + getNextOption(1); + listenerAdd("affected-success-" + i); }; BiConsumer> happyAsync = (i, c) -> { happySync.accept(i); diff --git a/driver-core/src/test/unit/com/mongodb/internal/async/AsyncFunctionsTestAbstract.java b/driver-core/src/test/unit/com/mongodb/internal/async/AsyncFunctionsTestAbstract.java index 93b8cdbeeb5..7cc8b456f1c 100644 --- a/driver-core/src/test/unit/com/mongodb/internal/async/AsyncFunctionsTestAbstract.java +++ b/driver-core/src/test/unit/com/mongodb/internal/async/AsyncFunctionsTestAbstract.java @@ -1,3 +1,19 @@ +/* + * Copyright 2008-present MongoDB, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package com.mongodb.internal.async; import com.mongodb.client.TestListener; @@ -17,9 +33,25 @@ public class AsyncFunctionsTestAbstract { - final TestListener listener = new TestListener(); - final InvocationTracker invocationTracker = new InvocationTracker(); - boolean isTestingAbruptCompletion = false; + private final TestListener listener = new TestListener(); + private final InvocationTracker invocationTracker = new InvocationTracker(); + private boolean isTestingAbruptCompletion = false; + + void setIsTestingAbruptCompletion(final boolean b) { + isTestingAbruptCompletion = b; + } + + public void setAsyncStep(final boolean isAsyncStep) { + invocationTracker.isAsyncStep = isAsyncStep; + } + + public void getNextOption(final int i) { + invocationTracker.getNextOption(i); + } + + public void listenerAdd(final String s) { + listener.add(s); + } void plain(final int i) { int cur = invocationTracker.getNextOption(2); @@ -34,10 +66,10 @@ void plain(final int i) { int plainReturns(final int i) { int cur = invocationTracker.getNextOption(2); if (cur == 0) { - listener.add("plain-exception-" + i); + listener.add("plain-returns-exception-" + i); throw new RuntimeException("affected method exception-" + i); } else { - listener.add("plain-success-" + i); + listener.add("plain-returns-success-" + i); return i; } } @@ -108,10 +140,10 @@ private void affected(final int i) { private int affectedReturns(final int i) { int cur = invocationTracker.getNextOption(2); if (cur == 0) { - listener.add("affected-exception-" + i); + listener.add("affected-returns-exception-" + i); throw new RuntimeException("exception-" + i); } else { - listener.add("affected-success-" + i); + listener.add("affected-returns-success-" + i); return i; } } @@ -216,7 +248,7 @@ private void assertBehavesSame(final Supplier sync, final Runnable betwee static class InvocationTracker { public static final int DEPTH_LIMIT = 50; private final List invocationOptionSequence = new ArrayList<>(); - boolean isAsyncStep; // async = matching, vs initial step = populating + private boolean isAsyncStep; // async = matching, vs initial step = populating private int currentInvocationIndex; private int variationCount;