diff --git a/README.md b/README.md index 77d3fb6..c5687af 100644 --- a/README.md +++ b/README.md @@ -10,14 +10,14 @@ It can serve as integral part of your application's data layer to provide a consistent API over various back-ends and reduce message communication overhead through batching and caching. An important use case for `java-dataloader` is improving the efficiency of GraphQL query execution. Graphql fields -are resolved in a independent manner and with a true graph of objects, you may be fetching the same object many times. +are resolved independently and, with a true graph of objects, you may be fetching the same object many times. A naive implementation of graphql data fetchers can easily lead to the dreaded "n+1" fetch problem. Most of the code is ported directly from Facebook's reference implementation, with one IMPORTANT adaptation to make it work for Java 8. ([more on this below](#manual-dispatching)). -But before reading on, be sure to take a short dive into the +Before reading on, be sure to take a short dive into the [original documentation](https://github.com/facebook/dataloader/blob/master/README.md) provided by Lee Byron (@leebyron) and Nicholas Schrock (@schrockn) from [Facebook](https://www.facebook.com/), the creators of the original data loader. @@ -51,7 +51,8 @@ and Nicholas Schrock (@schrockn) from [Facebook](https://www.facebook.com/), the - Results are ordered according to insertion order of load requests - Deals with partial errors when a batch future fails - Can disable batching and/or caching in configuration -- Can supply your own [`CacheMap`](https://github.com/graphql-java/java-dataloader/blob/master/src/main/java/io/engagingspaces/vertx/dataloader/CacheMap.java) implementations +- Can supply your own `CacheMap` implementations +- Can supply your own `ValueCache` implementations - Has very high test coverage ## Examples @@ -110,7 +111,7 @@ In this version of data loader, this does not happen automatically. More on thi As stated on the original Facebook project : ->A naive application may have issued four round-trips to a backend for the required information, +> A naive application may have issued four round-trips to a backend for the required information, but with DataLoader this application will make at most two. > DataLoader allows you to decouple unrelated parts of your application without sacrificing the @@ -270,9 +271,9 @@ This is not quite as loose in a Java implementation as Java is a type safe langu A batch loader function is defined as `BatchLoader` meaning for a key of type `K` it returns a value of type `V`. -It cant just return some `Exception` as an object of type `V`. Type safety matters. +It can't just return some `Exception` as an object of type `V`. Type safety matters. -However you can use the `Try` data type which can encapsulate a computation that succeeded or returned an exception. +However, you can use the `Try` data type which can encapsulate a computation that succeeded or returned an exception. ```java Try tryS = Try.tryCall(() -> { @@ -291,7 +292,7 @@ However you can use the `Try` data type which can encapsulate a computation that } ``` -DataLoader supports this type and you can use this form to create a batch loader that returns a list of `Try` objects, some of which may have succeeded +DataLoader supports this type, and you can use this form to create a batch loader that returns a list of `Try` objects, some of which may have succeeded, and some of which may have failed. From that data loader can infer the right behavior in terms of the `load(x)` promise. ```java @@ -331,7 +332,7 @@ The value cache uses an async API pattern to encapsulate the idea that the value The default future cache behind `DataLoader` is an in memory `HashMap`. There is no expiry on this, and it lives for as long as the data loader lives. -However, you can create your own custom cache and supply it to the data loader on construction via the `org.dataloader.CacheMap` interface. +However, you can create your own custom future cache and supply it to the data loader on construction via the `org.dataloader.CacheMap` interface. ```java MyCustomCache customCache = new MyCustomCache(); @@ -342,21 +343,27 @@ However, you can create your own custom cache and supply it to the data loader o You could choose to use one of the fancy cache implementations from Guava or Caffeine and wrap it in a `CacheMap` wrapper ready for data loader. They can do fancy things like time eviction and efficient LRU caching. -As stated above, a custom `org.dataloader.CacheMap` is a local cache of futures with values, not values per se. +As stated above, a custom `org.dataloader.CacheMap` is a local cache of `CompleteFuture`s to values, not values per se. + +If you want to externally cache values then you need to use the `org.dataloader.ValueCache` interface. ## Custom value caches -You will need to create your own implementations of the `org.dataloader.ValueCache` if your want to use an external cache. +The `org.dataloader.ValueCache` allows you to use an external cache. + +The API of `ValueCache` has been designed to be asynchronous because it is expected that the value cache could be outside +your JVM. It uses `CompleteableFuture`s to get and set values into cache, which may involve a network call and hence exceptional failures to get +or set values. + +The `ValueCache` API is batch oriented, if you have a backing cache that can do batch cache fetches (such a REDIS) then you can use the `ValueCache.getValues*(` +call directly. However, if you don't have such a backing cache, then the default implementation will break apart the batch of cache value into individual requests +to `ValueCache.getValue()` for you. This library does not ship with any implementations of `ValueCache` because it does not want to have production dependencies on external cache libraries, but you can easily write your own. The tests have an example based on [Caffeine](https://github.com/ben-manes/caffeine). -The API of `ValueCache` has been designed to be asynchronous because it is expected that the value cache could be outside -your JVM. It uses `CompleteableFuture`s to get and set values into cache, which may involve a network call and hence exceptional failures to get -or set values. - ## Disabling caching @@ -369,7 +376,7 @@ In certain uncommon cases, a DataLoader which does not cache may be desirable. Calling the above will ensure that every call to `.load()` will produce a new promise, and requested keys will not be saved in memory. However, when the memoization cache is disabled, your batch function will receive an array of keys which may contain duplicates! Each key will -be associated with each call to `.load()`. Your batch loader should provide a value for each instance of the requested key as per the contract +be associated with each call to `.load()`. Your batch loader MUST provide a value for each instance of the requested key as per the contract ```java userDataLoader.load("A"); @@ -445,7 +452,7 @@ then you will not want to cache data meant for user A to then later give it user The scope of your `DataLoader` instances is important. You will want to create them per web request to ensure data is only cached within that web request and no more. -If your data can be shared across web requests then use a custom cache to keep values in a common place. +If your data can be shared across web requests then use a custom `org.dataloader.ValueCache` to keep values in a common place. Data loaders are stateful components that contain promises (with context) that are likely share the same affinity as the request. diff --git a/build.gradle b/build.gradle index cdbca84..934c51e 100644 --- a/build.gradle +++ b/build.gradle @@ -40,6 +40,19 @@ version = releaseVersion ? releaseVersion : getDevelopmentVersion() group = 'com.graphql-java' description = 'A pure Java 8 port of Facebook Dataloader' +gradle.buildFinished { buildResult -> + println "*******************************" + println "*" + if (buildResult.failure != null) { + println "* FAILURE - ${buildResult.failure}" + } else { + println "* SUCCESS" + } + println "* Version: $version" + println "*" + println "*******************************" +} + repositories { mavenCentral() mavenLocal() diff --git a/src/main/java/org/dataloader/DataLoader.java b/src/main/java/org/dataloader/DataLoader.java index 0aea1c7..fe9c59a 100644 --- a/src/main/java/org/dataloader/DataLoader.java +++ b/src/main/java/org/dataloader/DataLoader.java @@ -67,7 +67,7 @@ public class DataLoader { private final DataLoaderHelper helper; private final StatisticsCollector stats; private final CacheMap futureCache; - private final ValueCache valueCache; + private final ValueCache valueCache; /** * Creates new DataLoader with the specified batch loader function and default options @@ -430,8 +430,8 @@ private CacheMap determineFutureCache(DataLoaderOptions loaderOptions } @SuppressWarnings("unchecked") - private ValueCache determineValueCache(DataLoaderOptions loaderOptions) { - return (ValueCache) loaderOptions.valueCache().orElseGet(ValueCache::defaultValueCache); + private ValueCache determineValueCache(DataLoaderOptions loaderOptions) { + return (ValueCache) loaderOptions.valueCache().orElseGet(ValueCache::defaultValueCache); } /** diff --git a/src/main/java/org/dataloader/DataLoaderHelper.java b/src/main/java/org/dataloader/DataLoaderHelper.java index 8f8f08f..dbd9383 100644 --- a/src/main/java/org/dataloader/DataLoaderHelper.java +++ b/src/main/java/org/dataloader/DataLoaderHelper.java @@ -14,13 +14,15 @@ import java.util.Optional; import java.util.Set; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; import java.util.concurrent.CompletionStage; import java.util.concurrent.atomic.AtomicReference; -import java.util.function.BiConsumer; -import java.util.stream.Collectors; import static java.util.Collections.emptyList; import static java.util.Collections.singletonList; +import static java.util.concurrent.CompletableFuture.allOf; +import static java.util.concurrent.CompletableFuture.completedFuture; +import static java.util.stream.Collectors.toList; import static org.dataloader.impl.Assertions.assertState; import static org.dataloader.impl.Assertions.nonNull; @@ -63,7 +65,7 @@ Object getCallContext() { private final Object batchLoadFunction; private final DataLoaderOptions loaderOptions; private final CacheMap futureCache; - private final ValueCache valueCache; + private final ValueCache valueCache; private final List>> loaderQueue; private final StatisticsCollector stats; private final Clock clock; @@ -73,7 +75,7 @@ Object getCallContext() { Object batchLoadFunction, DataLoaderOptions loaderOptions, CacheMap futureCache, - ValueCache valueCache, + ValueCache valueCache, StatisticsCollector stats, Clock clock) { this.dataLoader = dataLoader; @@ -134,7 +136,7 @@ CompletableFuture load(K key, Object loadContext) { if (cachingEnabled) { return loadFromCache(key, loadContext, batchingEnabled); } else { - return queueOrInvokeLoader(key, loadContext, batchingEnabled); + return queueOrInvokeLoader(key, loadContext, batchingEnabled, false); } } } @@ -168,7 +170,7 @@ DispatchResult dispatch() { lastDispatchTime.set(now()); } if (!batchingEnabled || keys.isEmpty()) { - return new DispatchResult<>(CompletableFuture.completedFuture(emptyList()), 0); + return new DispatchResult<>(completedFuture(emptyList()), 0); } final int totalEntriesHandled = keys.size(); // @@ -211,19 +213,18 @@ private CompletableFuture> sliceIntoBatchesOfBatches(List keys, List< } // // now reassemble all the futures into one that is the complete set of results - return CompletableFuture.allOf(allBatches.toArray(new CompletableFuture[0])) + return allOf(allBatches.toArray(new CompletableFuture[0])) .thenApply(v -> allBatches.stream() .map(CompletableFuture::join) .flatMap(Collection::stream) - .collect(Collectors.toList())); + .collect(toList())); } @SuppressWarnings("unchecked") private CompletableFuture> dispatchQueueBatch(List keys, List callContexts, List> queuedFutures) { stats.incrementBatchLoadCountBy(keys.size()); - CompletionStage> batchLoad = invokeLoader(keys, callContexts); + CompletableFuture> batchLoad = invokeLoader(keys, callContexts, loaderOptions.cachingEnabled()); return batchLoad - .toCompletableFuture() .thenApply(values -> { assertResultSize(keys, values); @@ -254,6 +255,9 @@ private CompletableFuture> dispatchQueueBatch(List keys, List return values; }).exceptionally(ex -> { stats.incrementBatchLoadExceptionCount(); + if (ex instanceof CompletionException) { + ex = ex.getCause(); + } for (int idx = 0; idx < queuedFutures.size(); idx++) { K key = keys.get(idx); CompletableFuture future = queuedFutures.get(idx); @@ -267,7 +271,7 @@ private CompletableFuture> dispatchQueueBatch(List keys, List private void assertResultSize(List keys, List values) { - assertState(keys.size() == values.size(), "The size of the promised values MUST be the same size as the key list"); + assertState(keys.size() == values.size(), () -> "The size of the promised values MUST be the same size as the key list"); } private void possiblyClearCacheEntriesOnExceptions(List keys) { @@ -292,80 +296,92 @@ private CompletableFuture loadFromCache(K key, Object loadContext, boolean ba return futureCache.get(cacheKey); } - /* - We haven't been asked for this key yet. We want to do one of two things: - - 1. Check if our cache store has it. If so: - a. Get the value from the cache store - b. Add a recovery case so we queue the load if fetching from cache store fails - c. Put that future in our futureCache to hit the early return next time - d. Return the resilient future - 2. If not in value cache: - a. queue or invoke the load - b. Add a success handler to store the result in the cache store - c. Return the result - */ - final CompletableFuture future = new CompletableFuture<>(); - - valueCache.get(cacheKey).whenComplete((cachedValue, getCallEx) -> { - if (getCallEx == null) { - future.complete(cachedValue); - } else { - synchronized (dataLoader) { - queueOrInvokeLoader(key, loadContext, batchingEnabled) - .whenComplete(setValueIntoCacheAndCompleteFuture(cacheKey, future)); - } - } - }); - - futureCache.set(cacheKey, future); - - return future; - } + CompletableFuture loadCallFuture; + synchronized (dataLoader) { + loadCallFuture = queueOrInvokeLoader(key, loadContext, batchingEnabled, true); + } - private BiConsumer setValueIntoCacheAndCompleteFuture(Object cacheKey, CompletableFuture future) { - return (result, loadCallEx) -> { - if (loadCallEx == null) { - valueCache.set(cacheKey, result) - .whenComplete((v, setCallExIgnored) -> future.complete(result)); - } else { - future.completeExceptionally(loadCallEx); - } - }; + futureCache.set(cacheKey, loadCallFuture); + return loadCallFuture; } - private CompletableFuture queueOrInvokeLoader(K key, Object loadContext, boolean batchingEnabled) { + private CompletableFuture queueOrInvokeLoader(K key, Object loadContext, boolean batchingEnabled, boolean cachingEnabled) { if (batchingEnabled) { - CompletableFuture future = new CompletableFuture<>(); - loaderQueue.add(new LoaderQueueEntry<>(key, future, loadContext)); - return future; + CompletableFuture loadCallFuture = new CompletableFuture<>(); + loaderQueue.add(new LoaderQueueEntry<>(key, loadCallFuture, loadContext)); + return loadCallFuture; } else { stats.incrementBatchLoadCountBy(1); // immediate execution of batch function - return invokeLoaderImmediately(key, loadContext); + return invokeLoaderImmediately(key, loadContext, cachingEnabled); } } - CompletableFuture invokeLoaderImmediately(K key, Object keyContext) { + CompletableFuture invokeLoaderImmediately(K key, Object keyContext, boolean cachingEnabled) { List keys = singletonList(key); - CompletionStage singleLoadCall; - try { - Object context = loaderOptions.getBatchLoaderContextProvider().getContext(); - BatchLoaderEnvironment environment = BatchLoaderEnvironment.newBatchLoaderEnvironment() - .context(context).keyContexts(keys, singletonList(keyContext)).build(); - if (isMapLoader()) { - singleLoadCall = invokeMapBatchLoader(keys, environment).thenApply(list -> list.get(0)); + List keyContexts = singletonList(keyContext); + return invokeLoader(keys, keyContexts, cachingEnabled) + .thenApply(list -> list.get(0)) + .toCompletableFuture(); + } + + CompletableFuture> invokeLoader(List keys, List keyContexts, boolean cachingEnabled) { + if (!cachingEnabled) { + return invokeLoader(keys, keyContexts); + } + CompletableFuture>> cacheCallCF = getFromValueCache(keys); + return cacheCallCF.thenCompose(cachedValues -> { + + assertState(keys.size() == cachedValues.size(), () -> "The size of the cached values MUST be the same size as the key list"); + + // the following is NOT a Map because keys in data loader can repeat (by design) + // and hence "a","b","c","b" is a valid set of keys + List> valuesInKeyOrder = new ArrayList<>(); + List missedKeyIndexes = new ArrayList<>(); + List missedKeys = new ArrayList<>(); + List missedKeyContexts = new ArrayList<>(); + for (int i = 0; i < keys.size(); i++) { + Try cacheGet = cachedValues.get(i); + valuesInKeyOrder.add(cacheGet); + if (cacheGet.isFailure()) { + missedKeyIndexes.add(i); + missedKeys.add(keys.get(i)); + missedKeyContexts.add(keyContexts.get(i)); + } + } + if (missedKeys.isEmpty()) { + // + // everything was cached + // + List assembledValues = valuesInKeyOrder.stream().map(Try::get).collect(toList()); + return completedFuture(assembledValues); } else { - singleLoadCall = invokeListBatchLoader(keys, environment).thenApply(list -> list.get(0)); + // + // we missed some of the keys from cache, so send them to the batch loader + // and then fill in their values + // + CompletableFuture> batchLoad = invokeLoader(missedKeys, missedKeyContexts); + return batchLoad.thenCompose(missedValues -> { + assertResultSize(missedKeys, missedValues); + + for (int i = 0; i < missedValues.size(); i++) { + V v = missedValues.get(i); + Integer listIndex = missedKeyIndexes.get(i); + valuesInKeyOrder.set(listIndex, Try.succeeded(v)); + } + List assembledValues = valuesInKeyOrder.stream().map(Try::get).collect(toList()); + // + // fire off a call to the ValueCache to allow it to set values into the + // cache now that we have them + return setToValueCache(assembledValues, missedKeys, missedValues); + }); } - return singleLoadCall.toCompletableFuture(); - } catch (Exception e) { - return CompletableFutureKit.failedFuture(e); - } + }); } - CompletionStage> invokeLoader(List keys, List keyContexts) { - CompletionStage> batchLoad; + + CompletableFuture> invokeLoader(List keys, List keyContexts) { + CompletableFuture> batchLoad; try { Object context = loaderOptions.getBatchLoaderContextProvider().getContext(); BatchLoaderEnvironment environment = BatchLoaderEnvironment.newBatchLoaderEnvironment() @@ -382,14 +398,14 @@ CompletionStage> invokeLoader(List keys, List keyContexts) { } @SuppressWarnings("unchecked") - private CompletionStage> invokeListBatchLoader(List keys, BatchLoaderEnvironment environment) { + private CompletableFuture> invokeListBatchLoader(List keys, BatchLoaderEnvironment environment) { CompletionStage> loadResult; if (batchLoadFunction instanceof BatchLoaderWithContext) { loadResult = ((BatchLoaderWithContext) batchLoadFunction).load(keys, environment); } else { loadResult = ((BatchLoader) batchLoadFunction).load(keys); } - return nonNull(loadResult, "Your batch loader function MUST return a non null CompletionStage promise"); + return nonNull(loadResult, () -> "Your batch loader function MUST return a non null CompletionStage").toCompletableFuture(); } @@ -398,7 +414,7 @@ private CompletionStage> invokeListBatchLoader(List keys, BatchLoader * to missing elements. */ @SuppressWarnings("unchecked") - private CompletionStage> invokeMapBatchLoader(List keys, BatchLoaderEnvironment environment) { + private CompletableFuture> invokeMapBatchLoader(List keys, BatchLoaderEnvironment environment) { CompletionStage> loadResult; Set setOfKeys = new LinkedHashSet<>(keys); if (batchLoadFunction instanceof MappedBatchLoaderWithContext) { @@ -406,7 +422,7 @@ private CompletionStage> invokeMapBatchLoader(List keys, BatchLoaderE } else { loadResult = ((MappedBatchLoader) batchLoadFunction).load(setOfKeys); } - CompletionStage> mapBatchLoad = nonNull(loadResult, "Your batch loader function MUST return a non null CompletionStage promise"); + CompletableFuture> mapBatchLoad = nonNull(loadResult, () -> "Your batch loader function MUST return a non null CompletionStage").toCompletableFuture(); return mapBatchLoad.thenApply(map -> { List values = new ArrayList<>(); for (K key : keys) { @@ -426,4 +442,33 @@ int dispatchDepth() { return loaderQueue.size(); } } + + private CompletableFuture>> getFromValueCache(List keys) { + try { + return nonNull(valueCache.getValues(keys), () -> "Your ValueCache.getValues function MUST return a non null CompletableFuture"); + } catch (RuntimeException e) { + return CompletableFutureKit.failedFuture(e); + } + } + + private CompletableFuture> setToValueCache(List assembledValues, List missedKeys, List missedValues) { + try { + boolean completeValueAfterCacheSet = loaderOptions.getValueCacheOptions().isCompleteValueAfterCacheSet(); + if (completeValueAfterCacheSet) { + return nonNull(valueCache + .setValues(missedKeys, missedValues), () -> "Your ValueCache.setValues function MUST return a non null CompletableFuture") + // we dont trust the set cache to give us the values back - we have them - lets use them + // if the cache set fails - then they wont be in cache and maybe next time they will + .handle((ignored, setExIgnored) -> assembledValues); + } else { + // no one is waiting for the set to happen here so if its truly async + // it will happen eventually but no result will be dependant on it + valueCache.setValues(missedKeys, missedValues); + } + } catch (RuntimeException ignored) { + // if we cant set values back into the cache - so be it - this must be a faulty + // ValueCache implementation + } + return CompletableFuture.completedFuture(assembledValues); + } } diff --git a/src/main/java/org/dataloader/DataLoaderOptions.java b/src/main/java/org/dataloader/DataLoaderOptions.java index 89530e1..8cd35ba 100644 --- a/src/main/java/org/dataloader/DataLoaderOptions.java +++ b/src/main/java/org/dataloader/DataLoaderOptions.java @@ -17,6 +17,7 @@ package org.dataloader; import org.dataloader.annotations.PublicApi; +import org.dataloader.impl.Assertions; import org.dataloader.stats.SimpleStatisticsCollector; import org.dataloader.stats.StatisticsCollector; @@ -39,11 +40,12 @@ public class DataLoaderOptions { private boolean cachingEnabled; private boolean cachingExceptionsEnabled; private CacheKey cacheKeyFunction; - private CacheMap cacheMap; - private ValueCache valueCache; + private CacheMap cacheMap; + private ValueCache valueCache; private int maxBatchSize; private Supplier statisticsCollector; private BatchLoaderContextProvider environmentProvider; + private ValueCacheOptions valueCacheOptions; /** * Creates a new data loader options with default settings. @@ -55,6 +57,7 @@ public DataLoaderOptions() { maxBatchSize = -1; statisticsCollector = SimpleStatisticsCollector::new; environmentProvider = NULL_PROVIDER; + valueCacheOptions = ValueCacheOptions.newOptions(); } /** @@ -72,6 +75,7 @@ public DataLoaderOptions(DataLoaderOptions other) { this.maxBatchSize = other.maxBatchSize; this.statisticsCollector = other.statisticsCollector; this.environmentProvider = other.environmentProvider; + this.valueCacheOptions = other.valueCacheOptions; } /** @@ -179,7 +183,7 @@ public DataLoaderOptions setCacheKeyFunction(CacheKey cacheKeyFunction) { * * @return an optional with the cache map instance, or empty */ - public Optional> cacheMap() { + public Optional> cacheMap() { return Optional.ofNullable(cacheMap); } @@ -190,7 +194,7 @@ public Optional> cacheMap() { * * @return the data loader options for fluent coding */ - public DataLoaderOptions setCacheMap(CacheMap cacheMap) { + public DataLoaderOptions setCacheMap(CacheMap cacheMap) { this.cacheMap = cacheMap; return this; } @@ -265,7 +269,7 @@ public DataLoaderOptions setBatchLoaderContextProvider(BatchLoaderContextProvide * * @return an optional with the cache store instance, or empty */ - public Optional> valueCache() { + public Optional> valueCache() { return Optional.ofNullable(valueCache); } @@ -276,8 +280,27 @@ public Optional> valueCache() { * * @return the data loader options for fluent coding */ - public DataLoaderOptions setValueCache(ValueCache valueCache) { + public DataLoaderOptions setValueCache(ValueCache valueCache) { this.valueCache = valueCache; return this; } + + /** + * @return the {@link ValueCacheOptions} that control how the {@link ValueCache} will be used + */ + public ValueCacheOptions getValueCacheOptions() { + return valueCacheOptions; + } + + /** + * Sets the {@link ValueCacheOptions} that control how the {@link ValueCache} will be used + * + * @param valueCacheOptions the value cache options + * + * @return the data loader options for fluent coding + */ + public DataLoaderOptions setValueCacheOptions(ValueCacheOptions valueCacheOptions) { + this.valueCacheOptions = Assertions.nonNull(valueCacheOptions); + return this; + } } diff --git a/src/main/java/org/dataloader/Try.java b/src/main/java/org/dataloader/Try.java index e273155..3f9a129 100644 --- a/src/main/java/org/dataloader/Try.java +++ b/src/main/java/org/dataloader/Try.java @@ -4,6 +4,7 @@ import java.util.Optional; import java.util.concurrent.Callable; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; import java.util.function.Consumer; import java.util.function.Function; @@ -26,13 +27,22 @@ */ @PublicApi public class Try { - private static Throwable NIL = new Throwable() { + private final static Object NIL = new Object() { + }; + + private final static Throwable NIL_THROWABLE = new RuntimeException() { + @Override + public String getMessage() { + return "failure"; + } + @Override public synchronized Throwable fillInStackTrace() { return this; } }; + private final Throwable throwable; private final V value; @@ -48,6 +58,12 @@ private Try(V value) { this.throwable = null; } + + @Override + public String toString() { + return isSuccess() ? "success" : "failure"; + } + /** * Creates a Try that has succeeded with the provided value * @@ -72,6 +88,18 @@ public static Try failed(Throwable throwable) { return new Try<>(throwable); } + /** + * This returns a Try that has always failed with an consistent exception. Use this when + * yiu dont care about the exception but only that the Try failed. + * + * @param the type of value + * + * @return a Try that has failed + */ + public static Try alwaysFailed() { + return Try.failed(NIL_THROWABLE); + } + /** * Calls the callable and if it returns a value, the Try is successful with that value or if throws * and exception the Try captures that @@ -96,7 +124,7 @@ public static Try tryCall(Callable callable) { * @param completionStage the completion stage that will complete * @param the value type * - * @return a Try which is the result of the call + * @return a CompletionStage Try which is the result of the call */ public static CompletionStage> tryStage(CompletionStage completionStage) { return completionStage.handle((value, throwable) -> { @@ -107,6 +135,19 @@ public static CompletionStage> tryStage(CompletionStage completion }); } + /** + * Creates a CompletableFuture that, when it completes, will capture into a Try whether the given completionStage + * was successful or not + * + * @param completionStage the completion stage that will complete + * @param the value type + * + * @return a CompletableFuture Try which is the result of the call + */ + public static CompletableFuture> tryFuture(CompletionStage completionStage) { + return tryStage(completionStage).toCompletableFuture(); + } + /** * @return the successful value of this try * diff --git a/src/main/java/org/dataloader/ValueCache.java b/src/main/java/org/dataloader/ValueCache.java index 31042c6..44071bc 100644 --- a/src/main/java/org/dataloader/ValueCache.java +++ b/src/main/java/org/dataloader/ValueCache.java @@ -1,20 +1,26 @@ package org.dataloader; import org.dataloader.annotations.PublicSpi; +import org.dataloader.impl.CompletableFutureKit; import org.dataloader.impl.NoOpValueCache; +import java.util.ArrayList; +import java.util.List; import java.util.concurrent.CompletableFuture; /** * The {@link ValueCache} is used by data loaders that use caching and want a long-lived or external cache - * of values. The {@link ValueCache} is used as a place to cache values when they come back from + * of values. The {@link ValueCache} is used as a place to cache values when they come back from an async + * cache store. *

- * It differs from {@link CacheMap} which is in fact a cache of promises to values aka {@link CompletableFuture}<V> and it rather suited - * to be a wrapper of a long lived or external value cache. {@link CompletableFuture}s cant be easily placed in an external cache - * outside the JVM say, hence the need for the {@link ValueCache}. + * It differs from {@link CacheMap} which is in fact a cache of promised values aka {@link CompletableFuture}<V>'s. + *

+ * {@link ValueCache} is more suited to be a wrapper of a long-lived or externallly cached values. {@link CompletableFuture}s cant + * be easily placed in an external cache outside the JVM say, hence the need for the {@link ValueCache}. *

* {@link DataLoader}s use a two stage cache strategy if caching is enabled. If the {@link CacheMap} already has the promise to a value * that is used. If not then the {@link ValueCache} is asked for a value, if it has one then that is returned (and cached as a promise in the {@link CacheMap}. + *

* If there is no value then the key is queued and loaded via the {@link BatchLoader} calls. The returned values will then be stored in * the {@link ValueCache} and the promises to those values are also stored in the {@link CacheMap}. *

@@ -22,18 +28,18 @@ * store any actual results. This is to avoid duplicating the stored data between the {@link CacheMap} * out of the box. *

- * The API signature uses completable futures because the backing implementation MAY be a remote external cache - * and hence exceptions may happen in retrieving values. + * The API signature uses {@link CompletableFuture}s because the backing implementation MAY be a remote external cache + * and hence exceptions may happen in retrieving values and they may take time to complete. * * @param the type of cache keys * @param the type of cache values * * @author Craig Day + * @author Brad Baker */ @PublicSpi public interface ValueCache { - /** * Creates a new value cache, using the default no-op implementation. * @@ -48,9 +54,10 @@ static ValueCache defaultValueCache() { } /** - * Gets the specified key from the store. if the key si not present, then the implementation MUST return an exceptionally completed future - * and not null because null is a valid cacheable value. Any exception is will cause {@link DataLoader} to load the key via batch loading + * Gets the specified key from the value cache. If the key is not present, then the implementation MUST return an exceptionally completed future + * and not null because null is a valid cacheable value. An exceptionally completed future will cause {@link DataLoader} to load the key via batch loading * instead. + *

* * @param key the key to retrieve * @@ -59,6 +66,28 @@ static ValueCache defaultValueCache() { */ CompletableFuture get(K key); + /** + * Gets the specified keys from the value cache, in a batch call. If your underlying cache cant do batch caching retrieval + * then do not implement this method and it will delegate back to {@link #get(Object)} for you + *

+ * Each item in the returned list of values is a {@link Try}. If the key could not be found then a failed Try just be returned otherwise + * a successful Try contain the cached value is returned. + *

+ * You MUST return a List that is the same size as the keys passed in. The code will assert if you do not. + * + * @param keys the list of keys to get cached values for. + * + * @return a future containing a list of {@link Try} cached values for each key passed in. + */ + default CompletableFuture>> getValues(List keys) { + List>> cacheLookups = new ArrayList<>(); + for (K key : keys) { + CompletableFuture> cacheTry = Try.tryFuture(get(key)); + cacheLookups.add(cacheTry); + } + return CompletableFutureKit.allOf(cacheLookups); + } + /** * Stores the value with the specified key, or updates it if the key already exists. * @@ -70,7 +99,31 @@ static ValueCache defaultValueCache() { CompletableFuture set(K key, V value); /** - * Deletes the entry with the specified key from the store, if it exists. + * Stores the value with the specified keys, or updates it if the keys if they already exist. If your underlying cache cant do batch caching setting + * then do not implement this method and it will delegate back to {@link #set(Object, Object)} for you + * + * @param keys the keys to store + * @param values the values to store + * + * @return a future containing the stored values for fluent composition + */ + default CompletableFuture> setValues(List keys, List values) { + List> cacheSets = new ArrayList<>(); + for (int i = 0; i < keys.size(); i++) { + K k = keys.get(i); + V v = values.get(i); + CompletableFuture setCall = set(k, v); + CompletableFuture set = Try.tryFuture(setCall).thenApply(ignored -> v); + cacheSets.add(set); + } + return CompletableFutureKit.allOf(cacheSets); + } + + /** + * Deletes the entry with the specified key from the value cache, if it exists. + *

+ * NOTE: Your implementation MUST not throw exceptions, rather it should return a CompletableFuture that has completed exceptionally. Failure + * to do this may cause the {@link DataLoader} code to not run properly. * * @param key the key to delete * @@ -79,7 +132,10 @@ static ValueCache defaultValueCache() { CompletableFuture delete(K key); /** - * Clears all entries from the store. + * Clears all entries from the value cache. + *

+ * NOTE: Your implementation MUST not throw exceptions, rather it should return a CompletableFuture that has completed exceptionally. Failure + * to do this may cause the {@link DataLoader} code to not run properly. * * @return a void future for error handling and fluent composition */ diff --git a/src/main/java/org/dataloader/ValueCacheOptions.java b/src/main/java/org/dataloader/ValueCacheOptions.java new file mode 100644 index 0000000..1a0c1a1 --- /dev/null +++ b/src/main/java/org/dataloader/ValueCacheOptions.java @@ -0,0 +1,42 @@ +package org.dataloader; + +/** + * Options that control how the {@link ValueCache} is used by {@link DataLoader} + * + * @author Brad Baker + */ +public class ValueCacheOptions { + private final boolean completeValueAfterCacheSet; + + private ValueCacheOptions() { + this.completeValueAfterCacheSet = false; + } + + private ValueCacheOptions(boolean completeValueAfterCacheSet) { + this.completeValueAfterCacheSet = completeValueAfterCacheSet; + } + + public static ValueCacheOptions newOptions() { + return new ValueCacheOptions(); + } + + /** + * This controls whether the {@link DataLoader} will wait for the {@link ValueCache#set(Object, Object)} call + * to complete before it completes the returned value. By default this is false and hence + * the {@link ValueCache#set(Object, Object)} call may complete some time AFTER the data loader + * value has been returned. + * + * This is false by default, for performance reasons. + * + * @return true the {@link DataLoader} will wait for the {@link ValueCache#set(Object, Object)} call to complete before + * it completes the returned value. + */ + public boolean isCompleteValueAfterCacheSet() { + return completeValueAfterCacheSet; + } + + public ValueCacheOptions setCompleteValueAfterCacheSet(boolean flag) { + return new ValueCacheOptions(flag); + } + +} diff --git a/src/main/java/org/dataloader/impl/Assertions.java b/src/main/java/org/dataloader/impl/Assertions.java index 60b605b..e3eac4d 100644 --- a/src/main/java/org/dataloader/impl/Assertions.java +++ b/src/main/java/org/dataloader/impl/Assertions.java @@ -2,28 +2,26 @@ import org.dataloader.annotations.Internal; -import java.util.Objects; +import java.util.function.Supplier; @Internal public class Assertions { - public static void assertState(boolean state, String message) { + public static void assertState(boolean state, Supplier message) { if (!state) { - throw new AssertionException(message); + throw new DataLoaderAssertionException(message.get()); } } public static T nonNull(T t) { - return Objects.requireNonNull(t, "nonNull object required"); + return nonNull(t, () -> "nonNull object required"); } - public static T nonNull(T t, String message) { - return Objects.requireNonNull(t, message); - } - - private static class AssertionException extends IllegalStateException { - public AssertionException(String message) { - super(message); + public static T nonNull(T t, Supplier message) { + if (t == null) { + throw new NullPointerException(message.get()); } + return t; } + } diff --git a/src/main/java/org/dataloader/impl/DataLoaderAssertionException.java b/src/main/java/org/dataloader/impl/DataLoaderAssertionException.java new file mode 100644 index 0000000..4631387 --- /dev/null +++ b/src/main/java/org/dataloader/impl/DataLoaderAssertionException.java @@ -0,0 +1,7 @@ +package org.dataloader.impl; + +public class DataLoaderAssertionException extends IllegalStateException { + public DataLoaderAssertionException(String message) { + super(message); + } +} diff --git a/src/main/java/org/dataloader/impl/PromisedValuesImpl.java b/src/main/java/org/dataloader/impl/PromisedValuesImpl.java index 4cf3ea8..2ba592b 100644 --- a/src/main/java/org/dataloader/impl/PromisedValuesImpl.java +++ b/src/main/java/org/dataloader/impl/PromisedValuesImpl.java @@ -104,7 +104,7 @@ public Throwable cause(int index) { @Override public T get(int index) { - assertState(isDone(), "The PromisedValues MUST be complete before calling the get() method"); + assertState(isDone(), () -> "The PromisedValues MUST be complete before calling the get() method"); try { CompletionStage future = futures.get(index); return future.toCompletableFuture().get(); @@ -115,7 +115,7 @@ public T get(int index) { @Override public List toList() { - assertState(isDone(), "The PromisedValues MUST be complete before calling the toList() method"); + assertState(isDone(), () -> "The PromisedValues MUST be complete before calling the toList() method"); int size = size(); List list = new ArrayList<>(size); for (int index = 0; index < size; index++) { diff --git a/src/test/java/org/dataloader/DataLoaderValueCacheTest.java b/src/test/java/org/dataloader/DataLoaderValueCacheTest.java index 1c54e91..38cfe77 100644 --- a/src/test/java/org/dataloader/DataLoaderValueCacheTest.java +++ b/src/test/java/org/dataloader/DataLoaderValueCacheTest.java @@ -4,12 +4,14 @@ import com.github.benmanes.caffeine.cache.Caffeine; import org.dataloader.fixtures.CaffeineValueCache; import org.dataloader.fixtures.CustomValueCache; +import org.dataloader.impl.DataLoaderAssertionException; import org.junit.Test; import java.util.ArrayList; import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import static java.util.Arrays.asList; import static java.util.Collections.emptyList; @@ -17,6 +19,8 @@ import static org.awaitility.Awaitility.await; import static org.dataloader.DataLoaderOptions.newOptions; import static org.dataloader.fixtures.TestKit.idLoader; +import static org.dataloader.fixtures.TestKit.snooze; +import static org.dataloader.fixtures.TestKit.sort; import static org.dataloader.impl.CompletableFutureKit.failedFuture; import static org.hamcrest.Matchers.equalTo; import static org.junit.Assert.assertArrayEquals; @@ -45,6 +49,7 @@ public void test_by_default_we_have_no_value_caching() { assertThat(loadCalls, equalTo(singletonList(asList("a", "b")))); // futures are still cached but not values + loadCalls.clear(); fA = identityLoader.load("a"); fB = identityLoader.load("b"); @@ -56,15 +61,14 @@ public void test_by_default_we_have_no_value_caching() { assertThat(fA.join(), equalTo("a")); assertThat(fB.join(), equalTo("b")); - assertThat(loadCalls, equalTo(singletonList(asList("a", "b")))); - + assertThat(loadCalls, equalTo(emptyList())); } @Test public void should_accept_a_remote_value_store_for_caching() { - CustomValueCache customStore = new CustomValueCache(); + CustomValueCache customValueCache = new CustomValueCache(); List> loadCalls = new ArrayList<>(); - DataLoaderOptions options = newOptions().setValueCache(customStore); + DataLoaderOptions options = newOptions().setValueCache(customValueCache); DataLoader identityLoader = idLoader(options, loadCalls); // Fetches as expected @@ -77,7 +81,7 @@ public void should_accept_a_remote_value_store_for_caching() { assertThat(fB.join(), equalTo("b")); assertThat(loadCalls, equalTo(singletonList(asList("a", "b")))); - assertArrayEquals(customStore.store.keySet().toArray(), asList("a", "b").toArray()); + assertArrayEquals(customValueCache.store.keySet().toArray(), asList("a", "b").toArray()); CompletableFuture future3 = identityLoader.load("c"); CompletableFuture future2a = identityLoader.load("b"); @@ -87,21 +91,21 @@ public void should_accept_a_remote_value_store_for_caching() { assertThat(future2a.join(), equalTo("b")); assertThat(loadCalls, equalTo(asList(asList("a", "b"), singletonList("c")))); - assertArrayEquals(customStore.store.keySet().toArray(), asList("a", "b", "c").toArray()); + assertArrayEquals(customValueCache.store.keySet().toArray(), asList("a", "b", "c").toArray()); // Supports clear CompletableFuture fC = new CompletableFuture<>(); identityLoader.clear("b", (v, e) -> fC.complete(v)); await().until(fC::isDone); - assertArrayEquals(customStore.store.keySet().toArray(), asList("a", "c").toArray()); + assertArrayEquals(customValueCache.store.keySet().toArray(), asList("a", "c").toArray()); // Supports clear all CompletableFuture fCa = new CompletableFuture<>(); identityLoader.clearAll((v, e) -> fCa.complete(v)); await().until(fCa::isDone); - assertArrayEquals(customStore.store.keySet().toArray(), emptyList().toArray()); + assertArrayEquals(customValueCache.store.keySet().toArray(), emptyList().toArray()); } @Test @@ -115,10 +119,10 @@ public void can_use_caffeine_for_caching() { .maximumSize(100) .build(); - ValueCache customStore = new CaffeineValueCache(caffeineCache); + ValueCache caffeineValueCache = new CaffeineValueCache(caffeineCache); List> loadCalls = new ArrayList<>(); - DataLoaderOptions options = newOptions().setValueCache(customStore); + DataLoaderOptions options = newOptions().setValueCache(caffeineValueCache); DataLoader identityLoader = idLoader(options, loadCalls); // Fetches as expected @@ -146,7 +150,7 @@ public void can_use_caffeine_for_caching() { @Test public void will_invoke_loader_if_CACHE_GET_call_throws_exception() { - CustomValueCache customStore = new CustomValueCache() { + CustomValueCache customValueCache = new CustomValueCache() { @Override public CompletableFuture get(String key) { @@ -156,11 +160,11 @@ public CompletableFuture get(String key) { return super.get(key); } }; - customStore.set("a", "Not From Cache"); - customStore.set("b", "From Cache"); + customValueCache.set("a", "Not From Cache"); + customValueCache.set("b", "From Cache"); List> loadCalls = new ArrayList<>(); - DataLoaderOptions options = newOptions().setValueCache(customStore); + DataLoaderOptions options = newOptions().setValueCache(customValueCache); DataLoader identityLoader = idLoader(options, loadCalls); CompletableFuture fA = identityLoader.load("a"); @@ -176,7 +180,7 @@ public CompletableFuture get(String key) { @Test public void will_still_work_if_CACHE_SET_call_throws_exception() { - CustomValueCache customStore = new CustomValueCache() { + CustomValueCache customValueCache = new CustomValueCache() { @Override public CompletableFuture set(String key, Object value) { if (key.equals("a")) { @@ -187,7 +191,7 @@ public CompletableFuture set(String key, Object value) { }; List> loadCalls = new ArrayList<>(); - DataLoaderOptions options = newOptions().setValueCache(customStore); + DataLoaderOptions options = newOptions().setValueCache(customValueCache); DataLoader identityLoader = idLoader(options, loadCalls); CompletableFuture fA = identityLoader.load("a"); @@ -199,6 +203,255 @@ public CompletableFuture set(String key, Object value) { // a was not in cache (according to get) and hence needed to be loaded assertThat(loadCalls, equalTo(singletonList(asList("a", "b")))); - assertArrayEquals(customStore.store.keySet().toArray(), singletonList("b").toArray()); + assertArrayEquals(customValueCache.store.keySet().toArray(), singletonList("b").toArray()); + } + + @Test + public void caching_can_take_some_time_complete() { + CustomValueCache customValueCache = new CustomValueCache() { + + @Override + public CompletableFuture get(String key) { + if (key.startsWith("miss")) { + return CompletableFuture.supplyAsync(() -> { + snooze(1000); + throw new IllegalStateException("no a in cache"); + }); + } else { + return CompletableFuture.supplyAsync(() -> { + snooze(1000); + return key; + }); + } + } + + }; + + + List> loadCalls = new ArrayList<>(); + DataLoaderOptions options = newOptions().setValueCache(customValueCache); + DataLoader identityLoader = idLoader(options, loadCalls); + + CompletableFuture fA = identityLoader.load("a"); + CompletableFuture fB = identityLoader.load("b"); + CompletableFuture fC = identityLoader.load("missC"); + CompletableFuture fD = identityLoader.load("missD"); + + await().until(identityLoader.dispatch()::isDone); + + assertThat(fA.join(), equalTo("a")); + assertThat(fB.join(), equalTo("b")); + assertThat(fC.join(), equalTo("missC")); + assertThat(fD.join(), equalTo("missD")); + + assertThat(loadCalls, equalTo(singletonList(asList("missC", "missD")))); + } + + @Test + public void batch_caching_works_as_expected() { + CustomValueCache customValueCache = new CustomValueCache() { + + @Override + public CompletableFuture>> getValues(List keys) { + List> cacheCalls = new ArrayList<>(); + for (String key : keys) { + if (key.startsWith("miss")) { + cacheCalls.add(Try.alwaysFailed()); + } else { + cacheCalls.add(Try.succeeded(key)); + } + } + return CompletableFuture.supplyAsync(() -> { + snooze(1000); + return cacheCalls; + }); + } + }; + + + List> loadCalls = new ArrayList<>(); + DataLoaderOptions options = newOptions().setValueCache(customValueCache); + DataLoader identityLoader = idLoader(options, loadCalls); + + CompletableFuture fA = identityLoader.load("a"); + CompletableFuture fB = identityLoader.load("b"); + CompletableFuture fC = identityLoader.load("missC"); + CompletableFuture fD = identityLoader.load("missD"); + + await().until(identityLoader.dispatch()::isDone); + + assertThat(fA.join(), equalTo("a")); + assertThat(fB.join(), equalTo("b")); + assertThat(fC.join(), equalTo("missC")); + assertThat(fD.join(), equalTo("missD")); + + assertThat(loadCalls, equalTo(singletonList(asList("missC", "missD")))); + + List values = new ArrayList<>(customValueCache.asMap().values()); + // it will only set back in values that are missed - it wont set in values that successfully + // came out of the cache + assertThat(values, equalTo(asList("missC", "missD"))); + } + + @Test + public void assertions_will_be_thrown_if_the_cache_does_not_follow_contract() { + CustomValueCache customValueCache = new CustomValueCache() { + + @Override + public CompletableFuture>> getValues(List keys) { + List> cacheCalls = new ArrayList<>(); + for (String key : keys) { + if (key.startsWith("miss")) { + cacheCalls.add(Try.alwaysFailed()); + } else { + cacheCalls.add(Try.succeeded(key)); + } + } + List> renegOnContract = cacheCalls.subList(1, cacheCalls.size() - 1); + return CompletableFuture.completedFuture(renegOnContract); + } + }; + + List> loadCalls = new ArrayList<>(); + DataLoaderOptions options = newOptions().setValueCache(customValueCache); + DataLoader identityLoader = idLoader(options, loadCalls); + + CompletableFuture fA = identityLoader.load("a"); + CompletableFuture fB = identityLoader.load("b"); + CompletableFuture fC = identityLoader.load("missC"); + CompletableFuture fD = identityLoader.load("missD"); + + await().until(identityLoader.dispatch()::isDone); + + assertTrue(isAssertionException(fA)); + assertTrue(isAssertionException(fB)); + assertTrue(isAssertionException(fC)); + assertTrue(isAssertionException(fD)); + } + + private boolean isAssertionException(CompletableFuture fA) { + Throwable throwable = Try.tryFuture(fA).join().getThrowable(); + return throwable instanceof DataLoaderAssertionException; + } + + + @Test + public void if_caching_is_off_its_never_hit() { + AtomicInteger getCalls = new AtomicInteger(); + CustomValueCache customValueCache = new CustomValueCache() { + + @Override + public CompletableFuture get(String key) { + getCalls.incrementAndGet(); + return super.get(key); + } + }; + + List> loadCalls = new ArrayList<>(); + DataLoaderOptions options = newOptions().setValueCache(customValueCache).setCachingEnabled(false); + DataLoader identityLoader = idLoader(options, loadCalls); + + CompletableFuture fA = identityLoader.load("a"); + CompletableFuture fB = identityLoader.load("b"); + CompletableFuture fC = identityLoader.load("missC"); + CompletableFuture fD = identityLoader.load("missD"); + + await().until(identityLoader.dispatch()::isDone); + + assertThat(fA.join(), equalTo("a")); + assertThat(fB.join(), equalTo("b")); + assertThat(fC.join(), equalTo("missC")); + assertThat(fD.join(), equalTo("missD")); + + assertThat(loadCalls, equalTo(singletonList(asList("a", "b", "missC", "missD")))); + assertThat(getCalls.get(), equalTo(0)); + assertTrue(customValueCache.asMap().isEmpty()); + } + + @Test + public void if_everything_is_cached_no_batching_happens() { + AtomicInteger getCalls = new AtomicInteger(); + AtomicInteger setCalls = new AtomicInteger(); + CustomValueCache customValueCache = new CustomValueCache() { + + @Override + public CompletableFuture get(String key) { + getCalls.incrementAndGet(); + return super.get(key); + } + + @Override + public CompletableFuture> setValues(List keys, List values) { + setCalls.incrementAndGet(); + return super.setValues(keys, values); + } + }; + customValueCache.asMap().put("a", "cachedA"); + customValueCache.asMap().put("b", "cachedB"); + customValueCache.asMap().put("c", "cachedC"); + + List> loadCalls = new ArrayList<>(); + DataLoaderOptions options = newOptions().setValueCache(customValueCache).setCachingEnabled(true); + DataLoader identityLoader = idLoader(options, loadCalls); + + CompletableFuture fA = identityLoader.load("a"); + CompletableFuture fB = identityLoader.load("b"); + CompletableFuture fC = identityLoader.load("c"); + + await().until(identityLoader.dispatch()::isDone); + + assertThat(fA.join(), equalTo("cachedA")); + assertThat(fB.join(), equalTo("cachedB")); + assertThat(fC.join(), equalTo("cachedC")); + + assertThat(loadCalls, equalTo(emptyList())); + assertThat(getCalls.get(), equalTo(3)); + assertThat(setCalls.get(), equalTo(0)); + } + + + @Test + public void if_batching_is_off_it_still_can_cache() { + AtomicInteger getCalls = new AtomicInteger(); + AtomicInteger setCalls = new AtomicInteger(); + CustomValueCache customValueCache = new CustomValueCache() { + + @Override + public CompletableFuture get(String key) { + getCalls.incrementAndGet(); + return super.get(key); + } + + @Override + public CompletableFuture> setValues(List keys, List values) { + setCalls.incrementAndGet(); + return super.setValues(keys, values); + } + }; + customValueCache.asMap().put("a", "cachedA"); + + List> loadCalls = new ArrayList<>(); + DataLoaderOptions options = newOptions().setValueCache(customValueCache).setCachingEnabled(true).setBatchingEnabled(false); + DataLoader identityLoader = idLoader(options, loadCalls); + + CompletableFuture fA = identityLoader.load("a"); + CompletableFuture fB = identityLoader.load("b"); + CompletableFuture fC = identityLoader.load("c"); + + assertTrue(fA.isDone()); // with batching off they are dispatched immediately + assertTrue(fB.isDone()); + assertTrue(fC.isDone()); + + await().until(identityLoader.dispatch()::isDone); + + assertThat(fA.join(), equalTo("cachedA")); + assertThat(fB.join(), equalTo("b")); + assertThat(fC.join(), equalTo("c")); + + assertThat(loadCalls, equalTo(asList(singletonList("b"), singletonList("c")))); + assertThat(getCalls.get(), equalTo(3)); + assertThat(setCalls.get(), equalTo(2)); + + assertThat(sort(customValueCache.asMap().values()), equalTo(sort(asList("b", "c", "cachedA")))); } } diff --git a/src/test/java/org/dataloader/TryTest.java b/src/test/java/org/dataloader/TryTest.java index 46514ad..1fdd286 100644 --- a/src/test/java/org/dataloader/TryTest.java +++ b/src/test/java/org/dataloader/TryTest.java @@ -11,7 +11,9 @@ import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.nullValue; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; @SuppressWarnings("ConstantConditions") public class TryTest { @@ -193,4 +195,12 @@ public void recover() throws Exception { assertSuccess(sTry, "Hello Again"); } + + @Test + public void canAlwaysFail() { + Try failedTry = Try.alwaysFailed(); + + assertTrue(failedTry.isFailure()); + assertFalse(failedTry.isSuccess()); + } } \ No newline at end of file diff --git a/src/test/java/org/dataloader/fixtures/CustomValueCache.java b/src/test/java/org/dataloader/fixtures/CustomValueCache.java index d707175..316016e 100644 --- a/src/test/java/org/dataloader/fixtures/CustomValueCache.java +++ b/src/test/java/org/dataloader/fixtures/CustomValueCache.java @@ -37,4 +37,8 @@ public CompletableFuture clear() { store.clear(); return CompletableFuture.completedFuture(null); } + + public Map asMap() { + return store; + } } \ No newline at end of file diff --git a/src/test/java/org/dataloader/fixtures/TestKit.java b/src/test/java/org/dataloader/fixtures/TestKit.java index 2ea23a8..5c87148 100644 --- a/src/test/java/org/dataloader/fixtures/TestKit.java +++ b/src/test/java/org/dataloader/fixtures/TestKit.java @@ -9,8 +9,8 @@ import java.util.Collection; import java.util.List; import java.util.concurrent.CompletableFuture; -import java.util.stream.Collectors; +import static java.util.stream.Collectors.toList; import static org.dataloader.impl.CompletableFutureKit.failedFuture; public class TestKit { @@ -26,7 +26,7 @@ public static BatchLoader keysAsValues(List> loadCalls) { @SuppressWarnings("unchecked") List values = keys.stream() .map(k -> (V) k) - .collect(Collectors.toList()); + .collect(toList()); return CompletableFuture.completedFuture(values); }; } @@ -62,4 +62,9 @@ public static void snooze(int millis) { throw new RuntimeException(e); } } + + + public static List sort(Collection collection) { + return collection.stream().sorted().collect(toList()); + } }