diff --git a/.gitignore b/.gitignore
index ea323992..82f04479 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,4 +1,5 @@
 # Created by https://www.gitignore.io
+analytics/pending
 
 ### Maven ###
 target/
@@ -128,4 +129,4 @@ atlassian-ide-plugin.xml
 .classpath
 .project
 .settings/
-.factorypath
\ No newline at end of file
+.factorypath
diff --git a/ARQUITECTURE.md b/ARQUITECTURE.md
new file mode 100644
index 00000000..e06d772f
--- /dev/null
+++ b/ARQUITECTURE.md
@@ -0,0 +1,60 @@
+```mermaid
+
+sequenceDiagram
+    box Data Consolidation
+    participant ConsolidationService
+    end
+
+    participant SegmentClient
+    box HTTP
+    participant QueueHttp
+    participant LooperHttp
+    participant SegmentAPI
+    end
+    box File
+    participant QueueFile
+    participant WriteFile
+    participant File
+    participant WatchFile
+    end
+
+    activate ConsolidationService
+    ConsolidationService->>+SegmentClient: enqueue
+    SegmentClient<<->>QueueHttp: offer
+    alt QueueHttp overflow
+        SegmentClient<<->>QueueFile: put
+    end
+    SegmentClient->>-ConsolidationService: 
+    deactivate ConsolidationService
+
+    loop consume QueueHttp
+        LooperHttp->>QueueHttp: take
+        activate LooperHttp
+    end
+    LooperHttp->>SegmentAPI: batchUpload
+    note over LooperHttp,SegmentAPI: Batch
+    note over LooperHttp,SegmentAPI: CircuitBreaker and Retry
+    note over LooperHttp,SegmentAPI: HTTP requests submitted to a pool
+    deactivate LooperHttp
+
+    alt retry exhausted or circuit open
+        note over LooperHttp: pool threads
+        LooperHttp->>QueueFile: put
+    end
+
+    loop consume QueueFile
+        WriteFile->>QueueFile: take
+        activate WriteFile
+    end
+    WriteFile->>File: write
+    note over WriteFile: Batch and save file
+    deactivate WriteFile
+
+    note over WatchFile: check last written
+    activate WatchFile
+    loop watch QueueFile
+        WatchFile->>File: read and remove
+        WatchFile->>QueueHttp: offer
+    end
+    deactivate WatchFile
+```
diff --git a/analytics-core/src/main/java/com/segment/analytics/http/SegmentService.java b/analytics-core/src/main/java/com/segment/analytics/http/SegmentService.java
index c96cbef2..95389759 100644
--- a/analytics-core/src/main/java/com/segment/analytics/http/SegmentService.java
+++ b/analytics-core/src/main/java/com/segment/analytics/http/SegmentService.java
@@ -2,6 +2,7 @@
 import com.segment.analytics.messages.Batch;
 import okhttp3.HttpUrl;
+import okhttp3.RequestBody;
 import retrofit2.Call;
 import retrofit2.http.Body;
 import retrofit2.http.POST;
@@ -11,4 +12,7 @@ public interface SegmentService {
   @POST
   Call<UploadResponse> upload(@Url HttpUrl uploadUrl, @Body Batch batch);
+
+  @POST
+  Call<UploadResponse> upload(@Url HttpUrl uploadUrl, @Body RequestBody batch);
 }
diff --git a/analytics/pom.xml b/analytics/pom.xml
index 8c80856c..25d9fe31 100644
--- a/analytics/pom.xml
+++ b/analytics/pom.xml
@@ -39,9 +39,21 @@
       <artifactId>findbugs</artifactId>
       <scope>provided</scope>
     </dependency>
+
     <dependency>
-      <groupId>com.segment.backo</groupId>
-      <artifactId>backo</artifactId>
+      <groupId>dev.failsafe</groupId>
+      <artifactId>failsafe</artifactId>
+      <version>3.3.2</version>
+    </dependency>
+    <dependency>
+      <groupId>dev.failsafe</groupId>
+      <artifactId>failsafe-retrofit</artifactId>
+      <version>3.3.2</version>
+    </dependency>
+    <dependency>
+      <groupId>commons-io</groupId>
+      <artifactId>commons-io</artifactId>
+      <version>2.18.0</version>
     </dependency>
     <dependency>
       <groupId>junit</groupId>
@@ -67,7 +79,19 @@
       <groupId>org.mockito</groupId>
       <artifactId>mockito-core</artifactId>
       <scope>test</scope>
-    </dependency>
+    </dependency>
+    <dependency>
+      <groupId>org.wiremock</groupId>
+      <artifactId>wiremock-standalone</artifactId>
+      <version>3.2.0</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.awaitility</groupId>
+      <artifactId>awaitility</artifactId>
+      <version>4.2.2</version>
+      <scope>test</scope>
+    </dependency>
@@ -84,6 +108,18 @@
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-surefire-plugin</artifactId>
+        <version>3.5.3</version>
+        <configuration>
+          <systemPropertyVariables>
+            <java.util.logging.config.file>${project.basedir}/src/test/resources/logging.properties</java.util.logging.config.file>
+          </systemPropertyVariables>
+        </configuration>
+      </plugin>
diff --git a/analytics/src/main/java/com/segment/analytics/Analytics.java b/analytics/src/main/java/com/segment/analytics/Analytics.java
index 81af36c7..5b3f2b15 100644
--- a/analytics/src/main/java/com/segment/analytics/Analytics.java
+++ b/analytics/src/main/java/com/segment/analytics/Analytics.java
@@ -7,8
+7,13 @@ import com.segment.analytics.http.SegmentService; import com.segment.analytics.internal.AnalyticsClient; import com.segment.analytics.internal.AnalyticsVersion; +import com.segment.analytics.internal.Config; +import com.segment.analytics.internal.Config.FileConfig; +import com.segment.analytics.internal.Config.HttpConfig; import com.segment.analytics.messages.Message; import com.segment.analytics.messages.MessageBuilder; +import java.io.Closeable; +import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -16,7 +21,6 @@ import java.util.List; import java.util.concurrent.ExecutorService; import java.util.concurrent.ThreadFactory; -import java.util.concurrent.TimeUnit; import okhttp3.ConnectionSpec; import okhttp3.HttpUrl; import okhttp3.OkHttpClient; @@ -40,7 +44,7 @@ * * @see Segment */ -public class Analytics { +public class Analytics implements Closeable { private final AnalyticsClient client; private final List messageTransformers; private final List messageInterceptors; @@ -90,14 +94,9 @@ public boolean offer(MessageBuilder builder) { return client.offer(message); } - /** Flush events in the message queue. */ - public void flush() { - client.flush(); - } - - /** Stops this instance from processing further requests. */ - public void shutdown() { - client.shutdown(); + /** Stops this instance from processing further requests. */ + public void close() { + client.close(); } /** @@ -130,26 +129,19 @@ public static class Builder { private static final String DEFAULT_ENDPOINT = "https://api.segment.io"; private static final String DEFAULT_PATH = "/v1/import/"; private static final String DEFAULT_USER_AGENT = "analytics-java/" + AnalyticsVersion.get(); - private static final int MESSAGE_QUEUE_MAX_BYTE_SIZE = 1024 * 500; private final String writeKey; - private OkHttpClient client; private Log log; public HttpUrl endpoint; public HttpUrl uploadURL; private String userAgent = DEFAULT_USER_AGENT; private List messageTransformers; private List messageInterceptors; - private ExecutorService networkExecutor; private ThreadFactory threadFactory; - private int flushQueueSize; - private int maximumFlushAttempts; - private int maximumQueueSizeInBytes; - private long flushIntervalInMillis; - private List callbacks; - private int queueCapacity; private boolean forceTlsV1 = false; private GsonBuilder gsonBuilder; + private HttpConfig httpConfig; + private FileConfig fileConfig; Builder(String writeKey) { if (writeKey == null || writeKey.trim().length() == 0) { @@ -158,15 +150,6 @@ public static class Builder { this.writeKey = writeKey; } - /** Set a custom networking client. */ - public Builder client(OkHttpClient client) { - if (client == null) { - throw new NullPointerException("Null client"); - } - this.client = client; - return this; - } - /** Configure debug logging mechanism. By default, nothing is logged. 
*/ public Builder log(Log log) { if (log == null) { @@ -241,15 +224,6 @@ public Builder messageInterceptor(MessageInterceptor interceptor) { return this; } - /** Set queue capacity */ - public Builder queueCapacity(int capacity) { - if (capacity <= 0) { - throw new IllegalArgumentException("capacity should be positive."); - } - this.queueCapacity = capacity; - return this; - } - public Builder gsonBuilder(GsonBuilder gsonBuilder) { if (gsonBuilder == null) { throw new NullPointerException("Null gsonBuilder"); @@ -263,56 +237,6 @@ public Builder gsonBuilder(GsonBuilder gsonBuilder) { return this; } - /** Set the queueSize at which flushes should be triggered. */ - @Beta - public Builder flushQueueSize(int flushQueueSize) { - if (flushQueueSize < 1) { - throw new IllegalArgumentException("flushQueueSize must not be less than 1."); - } - this.flushQueueSize = flushQueueSize; - return this; - } - - /** Set the queueSize at which flushes should be triggered. */ - @Beta - public Builder maximumQueueSizeInBytes(int bytes) { - if (bytes < 1) { - throw new IllegalArgumentException("maximumQueueSizeInBytes must not be less than 1."); - } - - this.maximumQueueSizeInBytes = bytes; - return this; - } - - /** Set the interval at which the queue should be flushed. */ - @Beta - public Builder flushInterval(long flushInterval, TimeUnit unit) { - long flushIntervalInMillis = unit.toMillis(flushInterval); - if (flushIntervalInMillis < 1000) { - throw new IllegalArgumentException("flushInterval must not be less than 1 second."); - } - this.flushIntervalInMillis = flushIntervalInMillis; - return this; - } - - /** Set how many retries should happen before getting exhausted */ - public Builder retries(int maximumRetries) { - if (maximumRetries < 1) { - throw new IllegalArgumentException("retries must be at least 1"); - } - this.maximumFlushAttempts = maximumRetries; - return this; - } - - /** Set the {@link ExecutorService} on which all HTTP requests will be made. */ - public Builder networkExecutor(ExecutorService networkExecutor) { - if (networkExecutor == null) { - throw new NullPointerException("Null networkExecutor"); - } - this.networkExecutor = networkExecutor; - return this; - } - /** Set the {@link ThreadFactory} used to create threads. */ @Beta public Builder threadFactory(ThreadFactory threadFactory) { @@ -323,21 +247,6 @@ public Builder threadFactory(ThreadFactory threadFactory) { return this; } - /** Add a {@link Callback} to be notified when an event is processed. */ - public Builder callback(Callback callback) { - if (callback == null) { - throw new NullPointerException("Null callback"); - } - if (callbacks == null) { - callbacks = new ArrayList<>(); - } - if (callbacks.contains(callback)) { - throw new IllegalStateException("Callback is already registered."); - } - callbacks.add(callback); - return this; - } - /** Use a {@link Plugin} to configure the builder. */ @Beta public Builder plugin(Plugin plugin) { @@ -353,9 +262,22 @@ public Builder forceTlsVersion1() { forceTlsV1 = true; return this; } + + public Builder httpConfig(HttpConfig httpConfig) { + this.httpConfig = httpConfig; + return this; + } + public Builder fileConfig(FileConfig fileConfig) { + this.fileConfig = fileConfig; + return this; + } - /** Create a {@link Analytics} client. */ - public Analytics build() { + /** + * Create a {@link Analytics} client. 
+ * + * @throws IOException if cannot create the configured filePath directory + */ + public Analytics build() throws IOException { if (gsonBuilder == null) { gsonBuilder = new GsonBuilder(); } @@ -374,28 +296,9 @@ public Analytics build() { } } - if (client == null) { - client = Platform.get().defaultClient(); - } - if (log == null) { log = Log.NONE; } - if (flushIntervalInMillis == 0) { - flushIntervalInMillis = Platform.get().defaultFlushIntervalInMillis(); - } - if (queueCapacity == 0) { - queueCapacity = Integer.MAX_VALUE; - } - if (flushQueueSize == 0) { - flushQueueSize = Platform.get().defaultFlushQueueSize(); - } - if (maximumQueueSizeInBytes == 0) { - maximumQueueSizeInBytes = MESSAGE_QUEUE_MAX_BYTE_SIZE; - } - if (maximumFlushAttempts == 0) { - maximumFlushAttempts = 3; - } if (messageTransformers == null) { messageTransformers = Collections.emptyList(); } else { @@ -406,16 +309,14 @@ public Analytics build() { } else { messageInterceptors = Collections.unmodifiableList(messageInterceptors); } - if (networkExecutor == null) { - networkExecutor = Platform.get().defaultNetworkExecutor(); - } if (threadFactory == null) { - threadFactory = Platform.get().defaultThreadFactory(); + threadFactory = Config.defaultThreadFactory(); } - if (callbacks == null) { - callbacks = Collections.emptyList(); - } else { - callbacks = Collections.unmodifiableList(callbacks); + if(httpConfig == null) { + httpConfig = HttpConfig.builder().build(); + } + if(fileConfig == null) { + fileConfig = FileConfig.builder().build(); } HttpLoggingInterceptor interceptor = @@ -430,7 +331,7 @@ public void log(String message) { interceptor.setLevel(HttpLoggingInterceptor.Level.BASIC); OkHttpClient.Builder builder = - client + httpConfig.client .newBuilder() .addInterceptor(new AnalyticsRequestInterceptor(userAgent)) .addInterceptor(interceptor); @@ -445,33 +346,18 @@ public void log(String message) { builder = builder.connectionSpecs(Arrays.asList(connectionSpec)); } - client = builder.build(); + httpConfig.client = builder.build(); Retrofit restAdapter = new Retrofit.Builder() .addConverterFactory(GsonConverterFactory.create(gson)) .baseUrl(DEFAULT_ENDPOINT) - .client(client) + .client(httpConfig.client) .build(); SegmentService segmentService = restAdapter.create(SegmentService.class); - AnalyticsClient analyticsClient = - AnalyticsClient.create( - endpoint, - segmentService, - queueCapacity, - flushQueueSize, - flushIntervalInMillis, - maximumFlushAttempts, - maximumQueueSizeInBytes, - log, - threadFactory, - networkExecutor, - callbacks, - writeKey, - gson); - + AnalyticsClient analyticsClient = new AnalyticsClient(endpoint, segmentService, log, threadFactory, writeKey, gson, httpConfig, fileConfig); return new Analytics(analyticsClient, messageTransformers, messageInterceptors, log); } } diff --git a/analytics/src/main/java/com/segment/analytics/Platform.java b/analytics/src/main/java/com/segment/analytics/Platform.java deleted file mode 100644 index a5de5cb5..00000000 --- a/analytics/src/main/java/com/segment/analytics/Platform.java +++ /dev/null @@ -1,62 +0,0 @@ -package com.segment.analytics; - -import static java.lang.Thread.MIN_PRIORITY; - -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.ThreadFactory; -import java.util.concurrent.TimeUnit; -import okhttp3.OkHttpClient; - -class Platform { - static final String THREAD_NAME = "Analytics"; - - private static final Platform PLATFORM = findPlatform(); - - static Platform get() { - return 
PLATFORM; - } - - private static Platform findPlatform() { - return new Platform(); - } - - OkHttpClient defaultClient() { - OkHttpClient client = - new OkHttpClient.Builder() - .connectTimeout(15, TimeUnit.SECONDS) - .readTimeout(15, TimeUnit.SECONDS) - .writeTimeout(15, TimeUnit.SECONDS) - .build(); - return client; - } - - ExecutorService defaultNetworkExecutor() { - return Executors.newSingleThreadExecutor(defaultThreadFactory()); - } - - ThreadFactory defaultThreadFactory() { - return new ThreadFactory() { - @Override - public Thread newThread(final Runnable r) { - return new Thread( - new Runnable() { - @Override - public void run() { - Thread.currentThread().setPriority(MIN_PRIORITY); - r.run(); - } - }, - THREAD_NAME); - } - }; - } - - public long defaultFlushIntervalInMillis() { - return 10 * 1000; // 10s - } - - public int defaultFlushQueueSize() { - return 250; - } -} diff --git a/analytics/src/main/java/com/segment/analytics/internal/AnalyticsClient.java b/analytics/src/main/java/com/segment/analytics/internal/AnalyticsClient.java index f7560004..2dc872c2 100644 --- a/analytics/src/main/java/com/segment/analytics/internal/AnalyticsClient.java +++ b/analytics/src/main/java/com/segment/analytics/internal/AnalyticsClient.java @@ -5,43 +5,50 @@ import static com.segment.analytics.Log.Level.VERBOSE; import com.google.gson.Gson; -import com.segment.analytics.Callback; import com.segment.analytics.Log; import com.segment.analytics.http.SegmentService; import com.segment.analytics.http.UploadResponse; +import com.segment.analytics.internal.Config.FileConfig; +import com.segment.analytics.internal.Config.HttpConfig; import com.segment.analytics.messages.Batch; import com.segment.analytics.messages.Message; -import com.segment.backo.Backo; +import dev.failsafe.CircuitBreaker; +import java.io.Closeable; import java.io.IOException; import java.nio.charset.Charset; import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Path; +import java.time.Duration; import java.util.ArrayList; import java.util.Collections; import java.util.LinkedHashMap; import java.util.LinkedList; -import java.util.List; import java.util.Map; import java.util.UUID; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; +import java.util.logging.Level; +import java.util.logging.Logger; import okhttp3.HttpUrl; -import retrofit2.Call; +import okhttp3.MediaType; +import okhttp3.RequestBody; import retrofit2.Response; -public class AnalyticsClient { +public class AnalyticsClient implements Closeable { + private static final Logger LOGGER = Logger.getLogger(AnalyticsClient.class.getName()); + private static final Map CONTEXT; private static final int BATCH_MAX_SIZE = 1024 * 500; private static final int MSG_MAX_SIZE = 1024 * 32; private static final Charset ENCODING = StandardCharsets.UTF_8; private Gson gsonInstance; - private static final String instanceId = UUID.randomUUID().toString(); + private static final String instanceId = UUID.randomUUID().toString(); // TODO configurable ? 
 static {
   Map<String, String> library = new LinkedHashMap<>();
@@ -53,189 +60,99 @@ public class AnalyticsClient {
     CONTEXT = Collections.unmodifiableMap(context);
   }
 
+  private final HttpConfig config;
   private final BlockingQueue<Message> messageQueue;
   private final HttpUrl uploadUrl;
   private final SegmentService service;
-  private final int size;
-  private final int maximumRetries;
-  private final int maximumQueueByteSize;
-  private int currentQueueSizeInBytes;
   private final Log log;
-  private final List<Callback> callbacks;
   private final ExecutorService networkExecutor;
-  private final ExecutorService looperExecutor;
-  private final ScheduledExecutorService flushScheduler;
-  private final AtomicBoolean isShutDown;
   private final String writeKey;
-
-  public static AnalyticsClient create(
-      HttpUrl uploadUrl,
-      SegmentService segmentService,
-      int queueCapacity,
-      int flushQueueSize,
-      long flushIntervalInMillis,
-      int maximumRetries,
-      int maximumQueueSizeInBytes,
-      Log log,
-      ThreadFactory threadFactory,
-      ExecutorService networkExecutor,
-      List<Callback> callbacks,
-      String writeKey,
-      Gson gsonInstance) {
-    return new AnalyticsClient(
-        new LinkedBlockingQueue<Message>(queueCapacity),
-        uploadUrl,
-        segmentService,
-        flushQueueSize,
-        flushIntervalInMillis,
-        maximumRetries,
-        maximumQueueSizeInBytes,
-        log,
-        threadFactory,
-        networkExecutor,
-        callbacks,
-        new AtomicBoolean(false),
-        writeKey,
-        gsonInstance);
-  }
-
-  public AnalyticsClient(
-      BlockingQueue<Message> messageQueue,
-      HttpUrl uploadUrl,
-      SegmentService service,
-      int maxQueueSize,
-      long flushIntervalInMillis,
-      int maximumRetries,
-      int maximumQueueSizeInBytes,
-      Log log,
-      ThreadFactory threadFactory,
-      ExecutorService networkExecutor,
-      List<Callback> callbacks,
-      AtomicBoolean isShutDown,
-      String writeKey,
-      Gson gsonInstance) {
-    this.messageQueue = messageQueue;
+  private final Thread looperThread;
+  private final AtomicBoolean isShutDown = new AtomicBoolean(false);
+  private final CircuitBreaker<Response<UploadResponse>> breaker;
+  private final FallbackAppender fallback;
+  private final ResubmitCheck resubmit;
+
+  public AnalyticsClient(HttpUrl uploadUrl, SegmentService service, Log log, ThreadFactory threadFactory,
+      String writeKey, Gson gsonInstance, HttpConfig config, FileConfig fileConfig)
+      throws IOException {
+    this.config = config;
+    this.messageQueue = new LinkedBlockingQueue<>(config.queueSize);
     this.uploadUrl = uploadUrl;
     this.service = service;
-    this.size = maxQueueSize;
-    this.maximumRetries = maximumRetries;
-    this.maximumQueueByteSize = maximumQueueSizeInBytes;
     this.log = log;
-    this.callbacks = callbacks;
-    this.looperExecutor = Executors.newSingleThreadExecutor(threadFactory);
-    this.networkExecutor = networkExecutor;
-    this.isShutDown = isShutDown;
+    this.looperThread = threadFactory.newThread(new Looper());
+    this.looperThread.setName(AnalyticsClient.class.getSimpleName() + "-Looper");
+    this.networkExecutor = config.executor;
     this.writeKey = writeKey;
     this.gsonInstance = gsonInstance;
-    this.currentQueueSizeInBytes = 0;
-
-    if (!isShutDown.get()) looperExecutor.submit(new Looper());
-
-    flushScheduler = Executors.newScheduledThreadPool(1, threadFactory);
-    flushScheduler.scheduleAtFixedRate(
-        new Runnable() {
-          @Override
-          public void run() {
-            flush();
-          }
-        },
-        flushIntervalInMillis,
-        flushIntervalInMillis,
-        TimeUnit.MILLISECONDS);
+    this.breaker = CircuitBreaker.<Response<UploadResponse>>builder()
+        // circuitErrorsInAMinute failures in one minute open the circuit
+        .withFailureThreshold(config.circuitErrorsInAMinute, Duration.ofMinutes(1))
+        // once open, wait circuitSecondsInOpen seconds before going half-open
+        .withDelay(Duration.ofSeconds(config.circuitSecondsInOpen))
+        // after circuitRequestToClose successes the circuit closes again
+        .withSuccessThreshold(config.circuitRequestToClose)
+        // a 5xx or a rate-limit response counts as a failure
+        .handleResultIf(response -> is5xx(response.code()) || response.code() == 429)
+        .onOpen(el -> LOGGER.log(Level.INFO, "OPEN: failing requests"))
+        .onHalfOpen(el -> LOGGER.log(Level.INFO, "HALF OPEN: checking status"))
+        .onClose(el -> LOGGER.log(Level.INFO, "CLOSED: attending requests normally")).build();
+
+    this.fallback = new FallbackAppender(gsonInstance, threadFactory, fileConfig);
+    this.resubmit = new ResubmitCheck(threadFactory, fileConfig, this);
+
+    looperThread.start();
   }
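Note: for readers new to dev.failsafe, here is a compact, hypothetical sketch of the standalone-breaker pattern this client relies on — the same `tryAcquirePermit`/`recordSuccess`/`recordFailure` calls `UploadTask` makes further down. The thresholds mirror the defaults in `Config`; `doUpload()` is a stand-in, not part of this PR:

```java
import dev.failsafe.CircuitBreaker;
import java.time.Duration;

public class BreakerSketch {
  public static void main(String[] args) {
    CircuitBreaker<Integer> breaker = CircuitBreaker.<Integer>builder()
        .withFailureThreshold(10, Duration.ofMinutes(1)) // 10 failures in a minute -> open
        .withDelay(Duration.ofSeconds(30))               // stay open 30s, then half-open
        .withSuccessThreshold(1)                         // one success closes it again
        .build();

    // Standalone (non-executor) usage, as in UploadTask.upload():
    if (breaker.tryAcquirePermit()) {
      int status = doUpload(); // stand-in for the real HTTP call
      if (status / 100 == 2) {
        breaker.recordSuccess();
      } else {
        breaker.recordFailure();
      }
    } // else: circuit is open, skip the call and fall back (here: to the file queue)
  }

  static int doUpload() {
    return 200; // pretend the upload succeeded
  }
}
```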

   public int messageSizeInBytes(Message message) {
     String stringifiedMessage = gsonInstance.toJson(message);
-
     return stringifiedMessage.getBytes(ENCODING).length;
   }

-  private Boolean isBackPressuredAfterSize(int incomingSize) {
-    int POISON_BYTE_SIZE = messageSizeInBytes(FlushMessage.POISON);
-    int sizeAfterAdd = this.currentQueueSizeInBytes + incomingSize + POISON_BYTE_SIZE;
-    // Leave a 10% buffer since the unsynchronized enqueue could add multiple at a time
-    return sizeAfterAdd >= Math.min(this.maximumQueueByteSize, BATCH_MAX_SIZE) * 0.9;
-  }
+  public boolean offer(Message message) throws IllegalArgumentException {
+    if (messageSizeInBytes(message) > MSG_MAX_SIZE) {
+      throw new IllegalArgumentException("Message was above individual limit. MessageId: " + message.messageId());
+    }

-  public boolean offer(Message message) {
     return messageQueue.offer(message);
   }

-  public void enqueue(Message message) {
-    if (message != StopMessage.STOP && isShutDown.get()) {
+  public void enqueue(Message message) throws IllegalArgumentException {
+    if (isShutDown.get()) {
       log.print(ERROR, "Attempt to enqueue a message when shutdown has been called %s.", message);
       return;
     }
-
-    try {
-      // @jorgen25 message here could be regular msg, POISON or STOP. Only do regular logic if its
-      // valid message
-      if (message != StopMessage.STOP && message != FlushMessage.POISON) {
-        int messageByteSize = messageSizeInBytes(message);
-
-        // @jorgen25 check if message is below 32kb limit for individual messages, no need to check
-        // for extra characters
-        if (messageByteSize <= MSG_MAX_SIZE) {
-          if (isBackPressuredAfterSize(messageByteSize)) {
-            this.currentQueueSizeInBytes = messageByteSize;
-            messageQueue.put(FlushMessage.POISON);
-            messageQueue.put(message);
-
-            log.print(VERBOSE, "Maximum storage size has been hit Flushing...");
-          } else {
-            messageQueue.put(message);
-            this.currentQueueSizeInBytes += messageByteSize;
-          }
-        } else {
-          log.print(
-              ERROR, "Message was above individual limit. MessageId: %s", message.messageId());
-          throw new IllegalArgumentException(
-              "Message was above individual limit.
MessageId: " + message.messageId()); - } - } else { - messageQueue.put(message); - } - } catch (InterruptedException e) { - log.print(ERROR, e, "Interrupted while adding message %s.", message); - Thread.currentThread().interrupt(); - } - } - - public void flush() { - if (!isShutDown.get()) { - enqueue(FlushMessage.POISON); + if (!offer(message)) { + fallback.add(message); + } else { + LOGGER.log(Level.FINE, "enqueued {0}", message.messageId()); } } - public void shutdown() { + @Override + public void close() { if (isShutDown.compareAndSet(false, true)) { final long start = System.currentTimeMillis(); // first let's tell the system to stop - enqueue(StopMessage.STOP); + looperThread.interrupt(); + fallback.close(); + resubmit.close(); - // we can shutdown the flush scheduler without worrying - flushScheduler.shutdownNow(); - - shutdownAndWait(looperExecutor, "looper"); shutdownAndWait(networkExecutor, "network"); - log.print( - VERBOSE, "Analytics client shut down in %s ms", (System.currentTimeMillis() - start)); + log.print(VERBOSE, "Analytics client shut down in %s ms", (System.currentTimeMillis() - start)); } } - public void shutdownAndWait(ExecutorService executor, String name) { + private void shutdownAndWait(ExecutorService executor, String name) { try { executor.shutdown(); final boolean executorTerminated = executor.awaitTermination(1, TimeUnit.SECONDS); - log.print( - VERBOSE, - "%s executor %s.", - name, - executorTerminated ? "terminated normally" : "timed out"); + log.print(VERBOSE, "%s executor %s.", name, executorTerminated ? "terminated normally" : "timed out"); } catch (InterruptedException e) { log.print(ERROR, e, "Interrupted while stopping %s executor.", name); Thread.currentThread().interrupt(); @@ -243,239 +160,199 @@ public void shutdownAndWait(ExecutorService executor, String name) { } /** - * Looper runs on a background thread and takes messages from the queue. Once it collects enough - * messages, it triggers a flush. + * Looper runs on a background thread and takes messages from the queue. Once it + * collects enough messages, it triggers a flush. 
*/ class Looper implements Runnable { - private boolean stop; public Looper() { - this.stop = false; } @Override public void run() { LinkedList messages = new LinkedList<>(); - AtomicInteger currentBatchSize = new AtomicInteger(); + int currentBatchSize = 0; boolean batchSizeLimitReached = false; int contextSize = gsonInstance.toJson(CONTEXT).getBytes(ENCODING).length; + + long reportedAt = System.currentTimeMillis(); try { - while (!stop) { - Message message = messageQueue.take(); - - if (message == StopMessage.STOP) { - log.print(VERBOSE, "Stopping the Looper"); - stop = true; - } else if (message == FlushMessage.POISON) { - if (!messages.isEmpty()) { - log.print(VERBOSE, "Flushing messages."); - } - } else { - // we do +1 because we are accounting for this new message we just took from the queue - // which is not in list yet - // need to check if this message is going to make us go over the limit considering - // default batch size as well - int defaultBatchSize = - BatchUtility.getBatchDefaultSize(contextSize, messages.size() + 1); - int msgSize = messageSizeInBytes(message); - if (currentBatchSize.get() + msgSize + defaultBatchSize <= BATCH_MAX_SIZE) { - messages.add(message); - currentBatchSize.addAndGet(msgSize); - } else { - // put message that did not make the cut this time back on the queue, we already took - // this message if we dont put it back its lost - // we take care of that after submitting the batch - batchSizeLimitReached = true; - } - } - - Boolean isBlockingSignal = message == FlushMessage.POISON || message == StopMessage.STOP; - Boolean isOverflow = messages.size() >= size; - - if (!messages.isEmpty() && (isOverflow || isBlockingSignal || batchSizeLimitReached)) { - Batch batch = Batch.create(CONTEXT, new ArrayList<>(messages), writeKey); - log.print( - VERBOSE, - "Batching %s message(s) into batch %s.", - batch.batch().size(), - batch.sequence()); - networkExecutor.submit( - BatchUploadTask.create(AnalyticsClient.this, batch, maximumRetries)); - - currentBatchSize.set(0); - messages.clear(); - if (batchSizeLimitReached) { - // If this is true that means the last message that would make us go over the limit - // was not added, - // add it to the now cleared messages list so its not lost - messages.add(message); - } - batchSizeLimitReached = false; - } - } + while (!Thread.currentThread().isInterrupted()) { + Message message = messageQueue.poll(config.flushIntervalInMillis, TimeUnit.MILLISECONDS); + + if (message != null) { + // we do +1 because we are accounting for this new message we just took from the + // queue + // which is not in list yet + // need to check if this message is going to make us go over the limit + // considering + // default batch size as well + int defaultBatchSize = BatchUtility.getBatchDefaultSize(contextSize, messages.size() + 1); + int msgSize = messageSizeInBytes(message); + if (currentBatchSize + msgSize + defaultBatchSize <= BATCH_MAX_SIZE) { + messages.add(message); + currentBatchSize += msgSize; + } else { + // put message that did not make the cut this time back on the queue, we already + // took + // this message if we dont put it back its lost + // we take care of that after submitting the batch + batchSizeLimitReached = true; + } + } + + if (messages.isEmpty()) { + continue; + } + + Boolean isBlockingSignal = message == null; + Boolean isOverflow = messages.size() >= config.flushQueueSize; + + if (!messages.isEmpty() && (isOverflow || isBlockingSignal || batchSizeLimitReached)) { + Batch batch = Batch.create(CONTEXT, new 
ArrayList<>(messages), writeKey); + log.print(VERBOSE, "Batching %s message(s) into batch %s.", batch.batch().size(), batch.sequence()); + + networkExecutor.submit(new UploadBatchTask(breaker, service, uploadUrl, batch, fallback)); + + currentBatchSize = 0; + messages.clear(); + if (batchSizeLimitReached) { + // If this is true that means the last message that would make us go over the + // limit + // was not added, + // add it to the now cleared messages list so its not lost + messages.add(message); + } + batchSizeLimitReached = false; + } + + long now = System.currentTimeMillis(); + if (now - reportedAt > 2_000) { + LOGGER.log(Level.FINE, "HTTPQueue: {0}", messageQueue.size()); + if (networkExecutor instanceof ThreadPoolExecutor) { + ThreadPoolExecutor tpe = (ThreadPoolExecutor) networkExecutor; + LOGGER.log(Level.FINE, "HTTPPool active:{0}", tpe.getActiveCount()); + } + reportedAt = now; + } + } } catch (InterruptedException e) { - log.print(DEBUG, "Looper interrupted while polling for messages."); - Thread.currentThread().interrupt(); + log.print(DEBUG, "Looper interrupted while polling for messages."); + Thread.currentThread().interrupt(); } - log.print(VERBOSE, "Looper stopped"); - } - } - static class BatchUploadTask implements Runnable { - private static final Backo BACKO = - Backo.builder() // - .base(TimeUnit.SECONDS, 15) // - .cap(TimeUnit.HOURS, 1) // - .jitter(1) // - .build(); + isShutDown.compareAndSet(false, true); - private final AnalyticsClient client; - private final Backo backo; - final Batch batch; - private final int maxRetries; - - static BatchUploadTask create(AnalyticsClient client, Batch batch, int maxRetries) { - return new BatchUploadTask(client, BACKO, batch, maxRetries); - } + Message msg = messageQueue.poll(); + while (msg != null) { + fallback.add(msg); + msg = messageQueue.poll(); + } - BatchUploadTask(AnalyticsClient client, Backo backo, Batch batch, int maxRetries) { - this.client = client; - this.batch = batch; - this.backo = backo; - this.maxRetries = maxRetries; + log.print(VERBOSE, "Looper stopped"); } + } - private void notifyCallbacksWithException(Batch batch, Exception exception) { - for (Message message : batch.batch()) { - for (Callback callback : client.callbacks) { - callback.failure(message, exception); - } - } - } + private static boolean is5xx(int status) { + return status >= 500 && status < 600; + } - /** Returns {@code true} to indicate a batch should be retried. {@code false} otherwise. */ - boolean upload() { - client.log.print(VERBOSE, "Uploading batch %s.", batch.sequence()); + static interface SupplierWithException { + T get() throws Exception; + } - try { - Call call = client.service.upload(client.uploadUrl, batch); - Response response = call.execute(); - if (response.isSuccessful()) { - client.log.print(VERBOSE, "Uploaded batch %s.", batch.sequence()); + + static abstract class UploadTask implements Runnable{ + final CircuitBreaker breaker; + final SegmentService service; + final HttpUrl uploadUrl; + public UploadTask(CircuitBreaker breaker, SegmentService service, HttpUrl uploadUrl) { + this.breaker = breaker; + this.service = service; + this.uploadUrl = uploadUrl; + } - for (Message message : batch.batch()) { - for (Callback callback : client.callbacks) { - callback.success(message); + boolean upload(SupplierWithException> uploadRequest) { + if (breaker.tryAcquirePermit()) { + try { + Response upload = uploadRequest.get(); + if (upload.isSuccessful()) { + breaker.recordSuccess(); + // FIXME handle response ? do not retry those ? 
+ // upload.body().success()) + return true; + } else if (upload.code() == 429) { + breaker.open(); + } else { + breaker.recordFailure(); + } + } catch (Exception e) { + breaker.recordException(e); } - } - - return false; } - - int status = response.code(); - if (is5xx(status)) { - client.log.print( - DEBUG, "Could not upload batch %s due to server error. Retrying.", batch.sequence()); - return true; - } else if (status == 429) { - client.log.print( - DEBUG, "Could not upload batch %s due to rate limiting. Retrying.", batch.sequence()); - return true; + return false; } + } + + static class UploadBatchTask extends UploadTask { - client.log.print(DEBUG, "Could not upload batch %s. Giving up.", batch.sequence()); - notifyCallbacksWithException(batch, new IOException(response.errorBody().string())); + final Batch batch; + final FallbackAppender fallback; + + UploadBatchTask(final CircuitBreaker breaker, final SegmentService service, final HttpUrl uploadUrl, final Batch batch + , FallbackAppender fallback) { + super(breaker, service, uploadUrl); + this.batch = batch; + this.fallback = fallback; + } - return false; - } catch (IOException error) { - client.log.print(DEBUG, error, "Could not upload batch %s. Retrying.", batch.sequence()); + @Override + public void run() { + if (!upload(() -> service.upload(uploadUrl, batch).execute())) { + fallback.add(batch); + } + } + } - return true; - } catch (Exception exception) { - client.log.print(DEBUG, "Could not upload batch %s. Giving up.", batch.sequence()); + static class UploadFileTask extends UploadTask { + final Path path; - notifyCallbacksWithException(batch, exception); + static final MediaType JSON = MediaType.get("application/json"); - return false; - } + UploadFileTask(final CircuitBreaker breaker, final SegmentService service, final HttpUrl uploadUrl, final Path path) { + super(breaker, service, uploadUrl); + this.path = path; } @Override public void run() { - int attempt = 0; - for (; attempt <= maxRetries; attempt++) { - boolean retry = upload(); - if (!retry) return; - try { - backo.sleep(attempt); - } catch (InterruptedException e) { - client.log.print( - DEBUG, "Thread interrupted while backing off for batch %s.", batch.sequence()); - return; - } + if (upload(() -> service.upload(uploadUrl, RequestBody.create(path.toFile(), JSON)).execute())) { + try { + Files.delete(path); + } catch (IOException e) { + // will attempt to submit again (rename file?) + LOGGER.log(Level.WARNING, "Cannot delete file " + path, e); + } } - - client.log.print(ERROR, "Could not upload batch %s. Retries exhausted.", batch.sequence()); - notifyCallbacksWithException( - batch, new IOException(Integer.toString(attempt) + " retries exhausted")); } + } - private static boolean is5xx(int status) { - return status >= 500 && status < 600; - } + public void resubmit(Path path) { + networkExecutor.submit(new UploadFileTask(breaker, service, uploadUrl, path)); } public static class BatchUtility { - /** - * Method to determine what is the expected default size of the batch regardless of messages - * - *
Sample batch: - * {"batch":[{"type":"alias","messageId":"fc9198f9-d827-47fb-96c8-095bd3405d93","timestamp":"Nov - * 18, 2021, 2:45:07 - * PM","userId":"jorgen25","integrations":{"someKey":{"data":"aaaaa"}},"previousId":"foo"},{"type":"alias", - * "messageId":"3ce6f88c-36cb-4991-83f8-157e10261a89","timestamp":"Nov 18, 2021, 2:45:07 - * PM","userId":"jorgen25", - * "integrations":{"someKey":{"data":"aaaaa"}},"previousId":"foo"},{"type":"alias", - * "messageId":"a328d339-899a-4a14-9835-ec91e303ac4d","timestamp":"Nov 18, 2021, 2:45:07 PM", - * "userId":"jorgen25","integrations":{"someKey":{"data":"aaaaa"}},"previousId":"foo"},{"type":"alias", - * "messageId":"57b0ceb4-a1cf-4599-9fba-0a44c7041004","timestamp":"Nov 18, 2021, 2:45:07 PM", - * "userId":"jorgen25","integrations":{"someKey":{"data":"aaaaa"}},"previousId":"foo"}], - * "sentAt":"Nov 18, 2021, 2:45:07 PM","context":{"library":{"name":"analytics-java", - * "version":"3.1.3"}},"sequence":1,"writeKey":"XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX"} - * - *
total size of batch : 932 - * - *
BREAKDOWN: {"batch":[MESSAGE1,MESSAGE2,MESSAGE3,MESSAGE4],"sentAt":"MMM dd, yyyy, HH:mm:ss - * tt","context":CONTEXT,"sequence":1,"writeKey":"XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX"} - * - *
so we need to account for: 1 -message size: 189 * 4 = 756 2 -context object size = 55 in - * this sample -> 756 + 55 = 811 3 -Metadata (This has the sent data/sequence characters) + - * extra chars (these are chars like "batch":[] or "context": etc and will be pretty much the - * same length in every batch -> size is 73 --> 811 + 73 = 884 (well 72 actually, char 73 is the - * sequence digit which we account for in point 5) 4 -Commas between each message, the total - * number of commas is number_of_msgs - 1 = 3 -> 884 + 3 = 887 (sample is 886 because the hour - * in sentData this time happens to be 2:45 but it could be 12:45 5 -Sequence Number increments - * with every batch created - * - *
so formulae to determine the expected default size of the batch is - * - * @return: defaultSize = messages size + context size + metadata size + comma number + sequence - * digits + writekey + buffer - * @return - */ private static int getBatchDefaultSize(int contextSize, int currentMessageNumber) { - // sample data: {"batch":[],"sentAt":"MMM dd, yyyy, HH:mm:ss tt","context":,"sequence":1, - // "writeKey":"XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX"} - 119 + // sample data: {"batch":[],"sentAt":"MMM dd, yyyy, HH:mm:ss + // tt","context":,"sequence":1, + // "writeKey":"XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX"} - 119 // Don't need to squeeze everything possible into a batch, adding a buffer int metadataExtraCharsSize = 119 + 1024; int commaNumber = currentMessageNumber - 1; - return contextSize - + metadataExtraCharsSize - + commaNumber - + String.valueOf(Integer.MAX_VALUE).length(); + return contextSize + metadataExtraCharsSize + commaNumber + String.valueOf(Integer.MAX_VALUE).length(); } } } diff --git a/analytics/src/main/java/com/segment/analytics/internal/Config.java b/analytics/src/main/java/com/segment/analytics/internal/Config.java new file mode 100644 index 00000000..5f0a5ad0 --- /dev/null +++ b/analytics/src/main/java/com/segment/analytics/internal/Config.java @@ -0,0 +1,237 @@ +package com.segment.analytics.internal; + +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy; +import java.util.concurrent.TimeUnit; +import okhttp3.Dispatcher; +import okhttp3.OkHttpClient; + +public class Config { + + // from SegmentQueue + public static final int DEFAULT_HTTP_QUEUE_SIZE = 250; // analytics-java Integer.MAX_VALUE; + public static final int DEFAULT_HTTP_QUEUE_FLUSH = 50; // analytics-java 250; + public static final int DEFAULT_HTTP_QUEUE_FLUSH_MS = 10 * 1000; + + public static final int DEFAULT_HTTP_EXECUTOR_SIZE = 1; + public static final int DEFAULT_HTTP_EXECUTOR_QUEUE_SIZE = 0; // SegmentQueue 5; + + public static final int DEFAULT_HTTP_TIMEOUT_SECONDS = 15; + + public static final int DEFAULT_HTTP_CIRCUIT_ERRORS_IN_A_MINUTE = 10; + public static final int DEFAULT_HTTP_CIRCUIT_SECONDS_IN_OPEN = 30; + public static final int DEFAULT_HTTP_CIRCUIT_REQUESTS_TO_CLOSE = 1; + + // FALLBACK + public static final int DEFAULT_FALLBACK_QUEUE_SIZE = 250; + public static final int DEFAULT_FALLBACK_QUEUE_FLUSH_SIZE = 50; + public static final int DEFAULT_FALLBACK_QUEUE_FLUSH_MS = 2_000; + public static final int DEFAULT_FALLBACK_ROLLOVER_TIMEOUT_SECONDS = 60; + public static final String DEFAULT_FALLBACK_FILE = "pending"; + + public static ThreadFactory defaultThreadFactory() { + return new ThreadFactory() { + @Override + public Thread newThread(final Runnable r) { + return new Thread(new Runnable() { + @Override + public void run() { + Thread.currentThread().setPriority(Thread.MIN_PRIORITY); + r.run(); + } + }); + } + }; + } + + public static class HttpConfig { + final int queueSize; + final int flushQueueSize; + final long flushIntervalInMillis; + + final int circuitErrorsInAMinute; + final int circuitSecondsInOpen; + final int circuitRequestToClose; + + final ExecutorService executor; + public OkHttpClient client; // Analytics touch the instance + + private HttpConfig(Builder builder) { + this.queueSize = builder.queueSize; + this.flushQueueSize = 
builder.flushQueueSize; + this.flushIntervalInMillis = builder.flushIntervalInMillis; + this.circuitErrorsInAMinute = builder.circuitErrorsInAMinute; + this.circuitSecondsInOpen = builder.circuitSecondsInOpen; + this.circuitRequestToClose = builder.circuitRequestToClose; + + this.executor = new ThreadPoolExecutor( + builder.executorSize, + builder.executorSize, + 15, + TimeUnit.SECONDS, + builder.executorQueueSize == 0 + ? new SynchronousQueue<>(true) + : new ArrayBlockingQueue<>(builder.executorQueueSize, true), + // this will cause the HTTP requests to be handled on AnalyticsClient.Looper + // SegmentQueue was discarding oldest tasks // e.getQueue().poll(); e.execute(r); + new CallerRunsPolicy() { + public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { + // System.err.println("==== NetowrkPool exhausted, running in %s + // ====".formatted(Thread.currentThread().getName())); + super.rejectedExecution(r, e); + } + }); + + this.client = new OkHttpClient.Builder() + .connectTimeout(builder.timeoutSeconds, TimeUnit.SECONDS) + .readTimeout(builder.timeoutSeconds, TimeUnit.SECONDS) + .writeTimeout(builder.timeoutSeconds, TimeUnit.SECONDS) + // use same executor + .dispatcher(new Dispatcher(this.executor)) + .build(); + } + + public static Builder builder() { + return new Builder(); + } + + public static class Builder { + private int queueSize = DEFAULT_HTTP_QUEUE_SIZE; + private int flushQueueSize = DEFAULT_HTTP_QUEUE_FLUSH; + private long flushIntervalInMillis = DEFAULT_HTTP_QUEUE_FLUSH_MS; + + private int circuitErrorsInAMinute = DEFAULT_HTTP_CIRCUIT_ERRORS_IN_A_MINUTE; + private int circuitSecondsInOpen = DEFAULT_HTTP_CIRCUIT_SECONDS_IN_OPEN; + private int circuitRequestToClose = DEFAULT_HTTP_CIRCUIT_REQUESTS_TO_CLOSE; + + private int executorSize = DEFAULT_HTTP_EXECUTOR_SIZE; + private int executorQueueSize = DEFAULT_HTTP_EXECUTOR_QUEUE_SIZE; + private int timeoutSeconds = DEFAULT_HTTP_TIMEOUT_SECONDS; + + public Builder queueSize(int value) { + if (value <= 0) { + throw new IllegalArgumentException("queueSize should be positive."); + } + this.queueSize = value; + return this; + } + + public Builder flushQueueSize(int value) { + if (value < 1) { + throw new IllegalArgumentException("flushQueueSize must not be less than 1."); + } + this.flushQueueSize = value; + return this; + } + + public Builder flushIntervalInMillis(long value) { + if (value < 1000) { + throw new IllegalArgumentException("flushIntervalInMillis must not be less than 1 second."); + } + + this.flushIntervalInMillis = value; + return this; + } + + public Builder circuitErrorsInAMinute(int value) { + this.circuitErrorsInAMinute = value; + return this; + } + + public Builder circuitSecondsInOpen(int value) { + this.circuitSecondsInOpen = value; + return this; + } + + public Builder circuitRequestToClose(int value) { + this.circuitRequestToClose = value; + return this; + } + + public Builder executorSize(int value) { + this.executorSize = value; + return this; + } + + public Builder executorQueueSize(int value) { + this.executorQueueSize = value; + return this; + } + + public Builder timeoutSeconds(int value) { + this.timeoutSeconds = value; + return this; + } + + public HttpConfig build() { + return new HttpConfig(this); + } + } + } + + public static class FileConfig { + /** size of the queue waiting to be written to file */ + final int size; + /** batch size to flush messages to file*/ + final int flushSize; + /** max milliseconds without a flush messages to file*/ + final int flushMs; + /** path to save pending 
messages */
+    final String filePath;
+    /** max time to keep an open overflow file before finishing the batch */
+    final int rolloverTimeoutSeconds;
+
+    private FileConfig(Builder builder) {
+      this.size = builder.size;
+      this.flushSize = builder.flushSize;
+      this.flushMs = builder.flushMs;
+      this.filePath = builder.filePath;
+      this.rolloverTimeoutSeconds = builder.rolloverTimeoutSeconds;
+    }
+
+    public static Builder builder() {
+      return new Builder();
+    }
+
+    public static class Builder {
+      private int size = DEFAULT_FALLBACK_QUEUE_SIZE;
+      private int flushSize = DEFAULT_FALLBACK_QUEUE_FLUSH_SIZE;
+      private int flushMs = DEFAULT_FALLBACK_QUEUE_FLUSH_MS;
+      private String filePath = DEFAULT_FALLBACK_FILE;
+      private int rolloverTimeoutSeconds = DEFAULT_FALLBACK_ROLLOVER_TIMEOUT_SECONDS;
+
+      public Builder size(int value) {
+        this.size = value;
+        return this;
+      }
+
+      public Builder flushSize(int value) {
+        this.flushSize = value;
+        return this;
+      }
+
+      public Builder flushMs(int value) {
+        this.flushMs = value;
+        return this;
+      }
+
+      public Builder filePath(String value) {
+        this.filePath = value;
+        return this;
+      }
+
+      public Builder rolloverTimeoutSeconds(int value) {
+        this.rolloverTimeoutSeconds = value;
+        return this;
+      }
+
+      public FileConfig build() {
+        return new FileConfig(this);
+      }
+    }
+  }
+}
diff --git a/analytics/src/main/java/com/segment/analytics/internal/FallbackAppender.java b/analytics/src/main/java/com/segment/analytics/internal/FallbackAppender.java
new file mode 100644
index 00000000..202d8fe4
--- /dev/null
+++ b/analytics/src/main/java/com/segment/analytics/internal/FallbackAppender.java
@@ -0,0 +1,246 @@
+package com.segment.analytics.internal;
+
+import com.google.gson.Gson;
+import com.segment.analytics.internal.Config.FileConfig;
+import com.segment.analytics.messages.Batch;
+import com.segment.analytics.messages.Message;
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.Writer;
+import java.nio.channels.Channels;
+import java.nio.channels.FileChannel;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.StandardCopyOption;
+import java.nio.file.StandardOpenOption;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+public class FallbackAppender implements Closeable {
+
+  private static final Logger LOGGER = Logger.getLogger(FallbackAppender.class.getName());
+
+  public static final String TMP_EXTENSION = ".tmp";
+
+  /**
+   * Our servers only accept batches < 500KB. This limit is 475KB to account for
+   * extra data that is not present in payloads themselves, but is added later,
+   * such as `sentAt`, `integrations` and other JSON tokens.
+   */
+  private static final long MAX_BATCH_SIZE = 475_000; // 475KB
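Note: to make the 475KB budget concrete, here is a paraphrased, illustrative sketch of the envelope this appender writes — each fallback file is one complete Segment batch payload, which is why `UploadFileTask` can later POST the raw file as a `RequestBody`. The real code streams through a `FileChannel` using the `BATCH_BEGIN`/`COMMA`/`BATCH_END` constants further down; the hard-coded `sentAt`/`writeKey` values mirror the PR's own FIXMEs and are not real:

```java
import java.nio.charset.StandardCharsets;
import java.util.List;

public class EnvelopeSketch {
  // 475KB cap, matching MAX_BATCH_SIZE above
  static final long MAX_BATCH_SIZE = 475_000;

  /** Builds one fallback file's content; messages that don't fit must start a new file. */
  static byte[] envelope(List<String> messageJson) {
    StringBuilder sb = new StringBuilder("{\"batch\":[");
    for (int i = 0; i < messageJson.size(); i++) {
      if (i > 0) sb.append(','); // commas only between messages
      sb.append(messageJson.get(i));
    }
    // placeholder suffix, mirroring the FIXMEs in FallbackAppender below
    sb.append("],\"sentAt\":\"2023-04-19T04:03:46.880Z\",\"writeKey\":\"mywrite\"}");
    byte[] bytes = sb.toString().getBytes(StandardCharsets.UTF_8);
    assert bytes.length <= MAX_BATCH_SIZE : "caller must roll over to a new file first";
    return bytes;
  }
}
```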
+
+  private Path currentFile;
+  private Instant currentStart;
+  private final Path directory;
+  private final Gson gson;
+  private final FileConfig config;
+  private final BlockingQueue<Message> queue;
+  private final Thread writer;
+
+  /**
+   * @throws IOException if the configured filePath directory cannot be created
+   */
+  public FallbackAppender(Gson gson, ThreadFactory threadFactory, FileConfig config) throws IOException {
+    this.gson = gson;
+    this.config = config;
+    this.directory = Files.createDirectories(Path.of(config.filePath));
+
+    rollover();
+
+    this.queue = new ArrayBlockingQueue<>(config.size);
+    this.writer = threadFactory.newThread(new FileWriter());
+    this.writer.setName(FallbackAppender.class.getSimpleName());
+    this.writer.start();
+  }
+
+  /** Ends the currentFile and starts a new one */
+  private void rollover() {
+    String fileName;
+    if (currentFile != null) {
+      fileName = currentFile.getFileName().toString();
+      try {
+        Files.move(
+            currentFile,
+            currentFile.resolveSibling(fileName.substring(0, fileName.length() - TMP_EXTENSION.length())),
+            StandardCopyOption.ATOMIC_MOVE);
+      } catch (IOException e) {
+        LOGGER.log(Level.WARNING, "Cannot rollover " + fileName, e);
+      }
+    }
+
+    currentStart = Instant.now();
+    fileName = String.format("%s-%s%s", currentStart.toEpochMilli(), UUID.randomUUID(), TMP_EXTENSION);
+    this.currentFile = directory.resolve(fileName);
+    LOGGER.log(Level.FINE, "currentFile : {0}", fileName);
+  }
+
+  @Override
+  public void close() {
+    writer.interrupt();
+  }
+
+  /** Write a new file with the content of a batch */
+  public void add(Batch batch) {
+    String fileName = String.format("%s-%s", batch.sentAt().getTime(), UUID.randomUUID());
+    Path path = directory.resolve(fileName + TMP_EXTENSION);
+
+    try (FileChannel fileChannel = FileChannel.open(
+            path, StandardOpenOption.WRITE, StandardOpenOption.APPEND, StandardOpenOption.CREATE_NEW);
+        Writer w = Channels.newWriter(fileChannel, StandardCharsets.UTF_8)) {
+
+      saveBatch(batch, w);
+
+      // TODO fileChannel.force(true);
+    } catch (IOException e) {
+      LOGGER.log(Level.WARNING, "Cannot write batch file " + fileName, e);
+    }
+
+    try {
+      Files.move(path, path.resolveSibling(fileName), StandardCopyOption.ATOMIC_MOVE);
+    } catch (IOException e) {
+      LOGGER.log(Level.WARNING, "Cannot move batch file " + fileName, e);
+    }
+  }
+
+  /**
+   * Add elements to be persisted. Called on httpQueue overflow.
+   *
+   * This operation may block the calling thread
+   *
+ */ + public void add(Message msg) { + try { + LOGGER.log(Level.FINEST, "adding to fallback {0}", msg.messageId()); + queue.put(msg); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + + class FileWriter implements Runnable { + @Override + public void run() { + final List batch = new ArrayList<>(config.flushSize); + while (!Thread.currentThread().isInterrupted()) { + try { + + if (Duration.between(currentStart, Instant.now()).getSeconds() > config.rolloverTimeoutSeconds + && currentFile.toFile().exists()) { + endCurrentFile(); + rollover(); + } + + final Message msg = queue.poll(config.flushMs, TimeUnit.MILLISECONDS); + if (msg == null) { + if (!batch.isEmpty()) { + write(batch); + } + } else { + batch.add(msg); + if (batch.size() >= config.flushSize) { + write(batch); + } + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + if (!batch.isEmpty()) { + write(batch); + } + } + } + + private static final byte[] BATCH_BEGIN = "{\"batch\":[".getBytes(StandardCharsets.UTF_8); + private static final byte[] COMMA = ",".getBytes(StandardCharsets.UTF_8); + private static final byte[] BATCH_END = + "],\"sentAt\":\"2023-04-19T04:03:46.880Z\",\"writeKey\":\"mywrite\"}".getBytes(StandardCharsets.UTF_8); + // FIXME DateTimeUtils + // FIXME mywrite + + private void write(List batch) { + List remaining = writeInternal(batch); + while (!remaining.isEmpty()) { + rollover(); + remaining = writeInternal(remaining); + } + + batch.clear(); + } + + /** @return messages that do not fit in the current file */ + private List writeInternal(List batch) { + try (FileChannel fileChannel = FileChannel.open( + currentFile, StandardOpenOption.WRITE, StandardOpenOption.APPEND, StandardOpenOption.CREATE); + OutputStream os = Channels.newOutputStream(fileChannel)) { + + long currentFileSize = fileChannel.size(); + boolean first = currentFileSize == 0; + if (first) { + os.write(BATCH_BEGIN); + } + + for (int i = 0; i < batch.size(); i++) { + Message msg = batch.get(i); + byte[] msgBytes = toJson(msg).getBytes(StandardCharsets.UTF_8); + if (msgBytes.length + currentFileSize + COMMA.length + BATCH_END.length > MAX_BATCH_SIZE) { + os.write(BATCH_END); + // TODO fileChannel.force(true); + + return batch.subList(i, batch.size()); + } + + if (first) { + first = false; + } else { + os.write(COMMA); + } + os.write(msgBytes); + } + // TODO fileChannel.force(true); + return Collections.emptyList(); + } catch (IOException e) { + LOGGER.log(Level.WARNING, "write file " + currentFile, e); + return Collections.emptyList(); + } + } + + private void endCurrentFile() { + try (FileChannel fileChannel = FileChannel.open( + currentFile, StandardOpenOption.WRITE, StandardOpenOption.APPEND, StandardOpenOption.CREATE); + OutputStream os = Channels.newOutputStream(fileChannel)) { + os.write(BATCH_END); + // TODO fileChannel.force(true); + } catch (IOException e) { + LOGGER.log(Level.WARNING, "write file " + currentFile, e); + } + } + + private String toJson(final Object msg) { + try { + return gson.toJson(msg); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + private void saveBatch(final Batch batch, Writer file) { + try { + gson.toJson(batch, file); + } catch (Exception e) { + throw new RuntimeException(e); + } + } +} diff --git a/analytics/src/main/java/com/segment/analytics/internal/FlushMessage.java b/analytics/src/main/java/com/segment/analytics/internal/FlushMessage.java deleted file mode 100644 index b3ee9dc2..00000000 --- 
a/analytics/src/main/java/com/segment/analytics/internal/FlushMessage.java +++ /dev/null @@ -1,66 +0,0 @@ -package com.segment.analytics.internal; - -import com.segment.analytics.messages.Message; -import java.util.Date; -import java.util.Map; -import javax.annotation.Nonnull; -import javax.annotation.Nullable; - -class FlushMessage implements Message { - static final FlushMessage POISON = new FlushMessage(); - - private FlushMessage() {} - - @Nonnull - @Override - public Type type() { - throw new UnsupportedOperationException(); - } - - @Nonnull - @Override - public String messageId() { - throw new UnsupportedOperationException(); - } - - @Nullable - @Override - public Date sentAt() { - throw new UnsupportedOperationException(); - } - - @Nonnull - @Override - public Date timestamp() { - throw new UnsupportedOperationException(); - } - - @Nullable - @Override - public Map context() { - throw new UnsupportedOperationException(); - } - - @Nullable - @Override - public String anonymousId() { - throw new UnsupportedOperationException(); - } - - @Nullable - @Override - public String userId() { - throw new UnsupportedOperationException(); - } - - @Nullable - @Override - public Map integrations() { - throw new UnsupportedOperationException(); - } - - @Override - public String toString() { - return "FlushMessage{}"; - } -} diff --git a/analytics/src/main/java/com/segment/analytics/internal/ResubmitCheck.java b/analytics/src/main/java/com/segment/analytics/internal/ResubmitCheck.java new file mode 100644 index 00000000..f1fe6485 --- /dev/null +++ b/analytics/src/main/java/com/segment/analytics/internal/ResubmitCheck.java @@ -0,0 +1,60 @@ +package com.segment.analytics.internal; + +import com.segment.analytics.internal.Config.FileConfig; +import java.io.Closeable; +import java.io.IOException; +import java.nio.file.DirectoryStream; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.Iterator; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; +import java.util.logging.Level; +import java.util.logging.Logger; + +// TODO this class, somehow, should keep track of the number of retries. 
All the filenames start with the creation time +public class ResubmitCheck implements Closeable, Runnable { + private static final Logger LOGGER = Logger.getLogger(ResubmitCheck.class.getName()); + + private final Path directory; + private final AnalyticsClient client; + private final ScheduledExecutorService executor; + + public ResubmitCheck(ThreadFactory threadFactory, FileConfig config, AnalyticsClient client) { + this.directory = Path.of(config.filePath); + this.client = client; + this.executor = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() { + @Override + public Thread newThread(Runnable r) { + Thread t = threadFactory.newThread(r); + t.setName(ResubmitCheck.class.getSimpleName()); + return t; + } + }); + this.executor.scheduleWithFixedDelay(this, 0, 1, TimeUnit.MINUTES); + } + + @Override + public void close() { + executor.shutdownNow(); + } + + @Override + public void run() { + try (DirectoryStream files = Files.newDirectoryStream(directory, entry -> Files.isRegularFile(entry) + && !entry.getFileName().toString().endsWith(FallbackAppender.TMP_EXTENSION))) { + Iterator fileIterator = files.iterator(); + while (fileIterator.hasNext()) { + Path file = fileIterator.next(); + + // FIXME filter by instance + client.resubmit(file); + // FIXME use calling thread policy executor + } + } catch (IOException e) { + LOGGER.log(Level.WARNING, "Cannot list directory " + directory, e); + } + } +} diff --git a/analytics/src/main/java/com/segment/analytics/internal/StopMessage.java b/analytics/src/main/java/com/segment/analytics/internal/StopMessage.java deleted file mode 100644 index eccd278c..00000000 --- a/analytics/src/main/java/com/segment/analytics/internal/StopMessage.java +++ /dev/null @@ -1,66 +0,0 @@ -package com.segment.analytics.internal; - -import com.segment.analytics.messages.Message; -import java.util.Date; -import java.util.Map; -import javax.annotation.Nonnull; -import javax.annotation.Nullable; - -class StopMessage implements Message { - static final StopMessage STOP = new StopMessage(); - - private StopMessage() {} - - @Nonnull - @Override - public Type type() { - throw new UnsupportedOperationException(); - } - - @Nonnull - @Override - public String messageId() { - throw new UnsupportedOperationException(); - } - - @Nullable - @Override - public Date sentAt() { - throw new UnsupportedOperationException(); - } - - @Nonnull - @Override - public Date timestamp() { - throw new UnsupportedOperationException(); - } - - @Nullable - @Override - public Map context() { - throw new UnsupportedOperationException(); - } - - @Nullable - @Override - public String anonymousId() { - throw new UnsupportedOperationException(); - } - - @Nullable - @Override - public String userId() { - throw new UnsupportedOperationException(); - } - - @Nullable - @Override - public Map integrations() { - throw new UnsupportedOperationException(); - } - - @Override - public String toString() { - return "StopMessage{}"; - } -} diff --git a/analytics/src/test/java/com/segment/analytics/AnalyticsBuilderTest.java b/analytics/src/test/java/com/segment/analytics/AnalyticsBuilderTest.java deleted file mode 100644 index 31596e90..00000000 --- a/analytics/src/test/java/com/segment/analytics/AnalyticsBuilderTest.java +++ /dev/null @@ -1,463 +0,0 @@ -package com.segment.analytics; - -import static org.assertj.core.api.Assertions.assertThat; -import static org.assertj.core.api.Assertions.fail; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotEquals; -import static 
org.mockito.Mockito.mock; -import static org.mockito.Mockito.verify; - -import com.google.gson.GsonBuilder; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.ThreadFactory; -import java.util.concurrent.TimeUnit; -import org.junit.Before; -import org.junit.Test; -import org.mockito.Mockito; - -public class AnalyticsBuilderTest { - Analytics.Builder builder; - - @Before - public void setUp() { - builder = Analytics.builder("foo"); - } - - @Test - public void nullWriteKey() { - try { - builder = Analytics.builder(null); - fail("Should fail for null writeKey"); - } catch (NullPointerException e) { - assertThat(e).hasMessage("writeKey cannot be null or empty."); - } - } - - @Test - public void emptyWriteKey() { - try { - builder = Analytics.builder(""); - fail("Should fail for empty writeKey"); - } catch (NullPointerException e) { - assertThat(e).hasMessage("writeKey cannot be null or empty."); - } - - try { - builder = Analytics.builder(" "); - fail("Should fail for empty writeKey"); - } catch (NullPointerException e) { - assertThat(e).hasMessage("writeKey cannot be null or empty."); - } - } - - @Test - public void nullUserAgent() { - try { - builder.userAgent(null); - fail("Should fail for null userAgent"); - } catch (NullPointerException e) { - assertThat(e).hasMessage("userAgent cannot be null or empty."); - } - } - - @Test - public void emptyUserAgent() { - try { - builder.userAgent(""); - fail("Should fail for empty userAgent"); - } catch (NullPointerException e) { - assertThat(e).hasMessage("userAgent cannot be null or empty."); - } - - try { - builder.userAgent(" "); - fail("Should fail for empty userAgent"); - } catch (NullPointerException e) { - assertThat(e).hasMessage("userAgent cannot be null or empty."); - } - } - - @Test - public void nullClient() { - try { - builder.client(null); - fail("Should fail for null client"); - } catch (NullPointerException e) { - assertThat(e).hasMessage("Null client"); - } - } - - @Test - public void nullLog() { - try { - builder.log(null); - fail("Should fail for null log"); - } catch (NullPointerException e) { - assertThat(e).hasMessage("Null log"); - } - } - - @Test - public void invalidRetryCount() { - try { - builder.retries(0); - fail("Should fail for retries less than 1"); - } catch (IllegalArgumentException e) { - assertThat(e).hasMessage("retries must be at least 1"); - } - - try { - builder.retries(-1); - fail("Should fail for retries less than 1"); - } catch (IllegalArgumentException e) { - assertThat(e).hasMessage("retries must be at least 1"); - } - } - - @Test - public void nullTransformer() { - try { - builder.messageTransformer(null); - fail("Should fail for null transformer"); - } catch (NullPointerException e) { - assertThat(e).hasMessage("Null transformer"); - } - } - - @Test - public void duplicateTransformer() { - MessageTransformer transformer = mock(MessageTransformer.class); - try { - builder.messageTransformer(transformer).messageTransformer(transformer); - fail("Should fail for duplicate transformer"); - } catch (IllegalStateException e) { - assertThat(e).hasMessage("MessageTransformer is already registered."); - } - } - - @Test - public void buildsWithValidTransformer() { - Analytics analytics = builder.messageTransformer(mock(MessageTransformer.class)).build(); - assertThat(analytics).isNotNull(); - } - - @Test - public void nullInterceptor() { - try { - builder.messageInterceptor(null); - fail("Should fail for null interceptor"); - } catch (NullPointerException e) { - assertThat(e).hasMessage("Null 
interceptor"); - } - } - - @Test - public void duplicateInterceptor() { - MessageInterceptor interceptor = mock(MessageInterceptor.class); - try { - builder.messageInterceptor(interceptor).messageInterceptor(interceptor); - fail("Should fail for duplicate interceptor"); - } catch (IllegalStateException e) { - assertThat(e).hasMessage("MessageInterceptor is already registered."); - } - } - - @Test - public void buildsWithValidInterceptor() { - Analytics analytics = builder.messageInterceptor(mock(MessageInterceptor.class)).build(); - assertThat(analytics).isNotNull(); - } - - @Test - public void nullGsonBuilder() { - try { - builder.gsonBuilder(null); - fail("Should fail for null gsonBuilder"); - } catch (NullPointerException e) { - assertThat(e).hasMessage("Null gsonBuilder"); - } - } - - @Test - public void duplicateGsonBuilder() { - GsonBuilder gsonBuilder = new GsonBuilder(); - try { - builder.gsonBuilder(gsonBuilder).gsonBuilder(gsonBuilder); - fail("Should fail for duplicate gsonBuilder"); - } catch (IllegalStateException e) { - assertThat(e).hasMessage("gsonBuilder is already registered."); - } - } - - @Test - public void buildsWithValidGsonBuilder() { - GsonBuilder gsonBuilder = new GsonBuilder(); - Analytics analytics = builder.gsonBuilder(gsonBuilder).build(); - assertThat(analytics).isNotNull(); - } - - @Test - public void invalidFlushQueueSize() { - try { - builder.flushQueueSize(0); - fail("Should fail for non positive flushQueueSize"); - } catch (IllegalArgumentException e) { - assertThat(e).hasMessage("flushQueueSize must not be less than 1."); - } - - try { - builder.flushQueueSize(-1); - fail("Should fail for non positive flushQueueSize"); - } catch (IllegalArgumentException e) { - assertThat(e).hasMessage("flushQueueSize must not be less than 1."); - } - } - - @Test - public void buildsWithValidFlushQueueSize() { - Analytics analytics = builder.flushQueueSize(1).build(); - assertThat(analytics).isNotNull(); - } - - @Test - public void invalidFlushInterval() { - try { - builder.flushInterval(-1, TimeUnit.SECONDS); - fail("Should fail for negative flushInterval"); - } catch (IllegalArgumentException e) { - assertThat(e).hasMessage("flushInterval must not be less than 1 second."); - } - - try { - builder.flushInterval(500, TimeUnit.MILLISECONDS); - fail("Should fail for flushInterval < 1 second"); - } catch (IllegalArgumentException e) { - assertThat(e).hasMessage("flushInterval must not be less than 1 second."); - } - - // Exercise a bug where we only checked the number passed without converting to milliseconds - try { - builder.flushInterval(2000, TimeUnit.NANOSECONDS); - fail("Should fail for flushInterval < 1 second"); - } catch (IllegalArgumentException e) { - assertThat(e).hasMessage("flushInterval must not be less than 1 second."); - } - } - - @Test - public void buildsWithValidFlushInterval() { - Analytics analytics = builder.flushInterval(2, TimeUnit.SECONDS).build(); - assertThat(analytics).isNotNull(); - } - - @Test - public void nullNetworkExecutor() { - try { - builder.networkExecutor(null); - fail("Should fail for null network executor"); - } catch (NullPointerException e) { - assertThat(e).hasMessage("Null networkExecutor"); - } - } - - @Test - public void buildsWithValidNetworkExecutor() { - Analytics analytics = builder.networkExecutor(mock(ExecutorService.class)).build(); - assertThat(analytics).isNotNull(); - } - - @Test - public void nullEndpoint() { - try { - builder.endpoint(null); - fail("Should fail for null endpoint"); - } catch (NullPointerException 
e) { - assertThat(e).hasMessage("endpoint cannot be null or empty."); - } - } - - @Test - public void emptyEndpoint() { - try { - builder.endpoint(""); - fail("Should fail for empty endpoint"); - } catch (NullPointerException e) { - assertThat(e).hasMessage("endpoint cannot be null or empty."); - } - - try { - builder.endpoint(" "); - fail("Should fail for empty endpoint"); - } catch (NullPointerException e) { - assertThat(e).hasMessage("endpoint cannot be null or empty."); - } - } - - @Test - public void buildsWithValidEndpoint() { - Analytics analytics = builder.endpoint("https://api.segment.io").build(); - assertThat(analytics).isNotNull(); - } - - @Test - public void buildsCorrectEndpoint() { - builder.endpoint("https://api.segment.io"); - String expectedURL = "https://api.segment.io/v1/import/"; - assertEquals(expectedURL, builder.endpoint.toString()); - } - - @Test - public void buildsWithValidUploadURL() { - Analytics analytics = builder.setUploadURL("https://example.com/v2/batch/").build(); - assertThat(analytics).isNotNull(); - } - - @Test - public void buildsCorrectEndpointWithUploadURL() { - builder.setUploadURL("https://dummy.url/api/v1/segment/").build(); - String expectedURL = "https://dummy.url/api/v1/segment/"; - assertEquals(expectedURL, builder.endpoint.toString()); - } - - @Test - public void shouldPrioritizeUploadURLOverEndpoint() { - builder - .endpoint("this wont be set anyway") - .setUploadURL("https://dummy.url/api/v1/segment/") - .build(); - String expectedURL = "https://dummy.url/api/v1/segment/"; - - assertEquals(expectedURL, builder.uploadURL.toString()); - assertNotEquals("this wont be set anyway", builder.endpoint.toString()); - } - - @Test - public void buildsCorrectURLWithUploadURL() { - builder.setUploadURL("https://example.com/v2/batch/").build(); - String expectedURL = "https://example.com/v2/batch/"; - assertEquals(expectedURL, builder.uploadURL.toString()); - } - - @Test - public void nullHostAndPrefixEndpoint() { - try { - builder.setUploadURL(null); - fail("Should fail for null endpoint"); - } catch (NullPointerException e) { - assertThat(e).hasMessage("Upload URL cannot be null or empty."); - } - } - - @Test - public void emptyHostAndPrefixEndpoint() { - try { - builder.setUploadURL(""); - fail("Should fail for empty endpoint"); - } catch (NullPointerException e) { - assertThat(e).hasMessage("Upload URL cannot be null or empty."); - } - - try { - builder.setUploadURL(" "); - fail("Should fail for empty endpoint"); - } catch (NullPointerException e) { - assertThat(e).hasMessage("Upload URL cannot be null or empty."); - } - } - - @Test - public void nullThreadFactory() { - try { - builder.threadFactory(null); - fail("Should fail for null thread factory"); - } catch (NullPointerException e) { - assertThat(e).hasMessage("Null threadFactory"); - } - } - - @Test - public void buildsWithThreadFactory() { - Analytics analytics = builder.threadFactory(mock(ThreadFactory.class)).build(); - assertThat(analytics).isNotNull(); - } - - @Test - public void nullCallback() { - try { - builder.callback(null); - fail("Should fail for null callback"); - } catch (NullPointerException e) { - assertThat(e).hasMessage("Null callback"); - } - } - - @Test - public void duplicateCallback() { - Callback callback = mock(Callback.class); - try { - builder.callback(callback).callback(callback); - } catch (IllegalStateException e) { - assertThat(e).hasMessage("Callback is already registered."); - } - } - - @Test - public void buildsWithValidCallback() { - Analytics analytics = 
builder.callback(mock(Callback.class)).build(); - assertThat(analytics).isNotNull(); - } - - @Test - public void buildsWithForceTlsV1() { - Analytics analytics = builder.forceTlsVersion1().build(); - assertThat(analytics).isNotNull(); - } - - @Test - public void multipleCallbacks() { - Analytics analytics = - builder.callback(mock(Callback.class)).callback(mock(Callback.class)).build(); - - assertThat(analytics).isNotNull(); - } - - @Test - public void nullPlugin() { - try { - builder.plugin(null); - fail("Should fail for null plugin"); - } catch (NullPointerException e) { - assertThat(e).hasMessage("Null plugin"); - } - } - - @Test - public void pluginCanConfigure() { - Plugin plugin = Mockito.mock(Plugin.class); - builder.plugin(plugin); - verify(plugin).configure(builder); - } - - @Test - public void invalidQueueCapacity() { - try { - builder.queueCapacity(0); - fail("Should fail when queue capacity is 0"); - } catch (IllegalArgumentException e) { - assertThat(e).hasMessage("capacity should be positive."); - } - - try { - builder.queueCapacity(-1); - fail("Should fail when queue capacity is -1"); - } catch (IllegalArgumentException e) { - assertThat(e).hasMessage("capacity should be positive."); - } - } - - @Test - public void buildWithQueueCapacity() { - Analytics analytics = builder.queueCapacity(10).build(); - assertThat(analytics).isNotNull(); - } -} diff --git a/analytics/src/test/java/com/segment/analytics/AnalyticsTest.java b/analytics/src/test/java/com/segment/analytics/AnalyticsTest.java deleted file mode 100644 index 8be3012e..00000000 --- a/analytics/src/test/java/com/segment/analytics/AnalyticsTest.java +++ /dev/null @@ -1,150 +0,0 @@ -package com.segment.analytics; - -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.Mockito.never; -import static org.mockito.Mockito.spy; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; -import static org.mockito.MockitoAnnotations.initMocks; - -import com.segment.analytics.TestUtils.MessageBuilderTest; -import com.segment.analytics.internal.AnalyticsClient; -import com.segment.analytics.messages.Message; -import com.segment.analytics.messages.MessageBuilder; -import com.squareup.burst.BurstJUnit4; -import java.lang.reflect.Field; -import java.time.LocalDateTime; -import java.time.temporal.ChronoUnit; -import java.util.Collections; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.atomic.AtomicInteger; -import org.junit.Before; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.mockito.Mock; - -@RunWith(BurstJUnit4.class) -public class AnalyticsTest { - @Mock AnalyticsClient client; - @Mock Log log; - @Mock MessageTransformer messageTransformer; - @Mock MessageInterceptor messageInterceptor; - Analytics analytics; - - @Before - public void setUp() { - initMocks(this); - - analytics = - new Analytics( - client, - Collections.singletonList(messageTransformer), - Collections.singletonList(messageInterceptor), - log); - } - - @Test - public void enqueueIsDispatched(MessageBuilderTest builder) { - MessageBuilder messageBuilder = builder.get().userId("prateek"); - Message message = messageBuilder.build(); - when(messageTransformer.transform(messageBuilder)).thenReturn(true); - when(messageInterceptor.intercept(any(Message.class))).thenReturn(message); - - analytics.enqueue(messageBuilder); - - verify(messageTransformer).transform(messageBuilder); - 
verify(messageInterceptor).intercept(any(Message.class)); - verify(client).enqueue(message); - } - - @Test - public void doesNotEnqueueWhenTransformerReturnsFalse(MessageBuilderTest builder) { - MessageBuilder messageBuilder = builder.get().userId("prateek"); - when(messageTransformer.transform(messageBuilder)).thenReturn(false); - - analytics.enqueue(messageBuilder); - - verify(messageTransformer).transform(messageBuilder); - verify(messageInterceptor, never()).intercept(any(Message.class)); - verify(client, never()).enqueue(any(Message.class)); - } - - @Test - public void doesNotEnqueueWhenInterceptorReturnsNull(MessageBuilderTest builder) { - MessageBuilder messageBuilder = builder.get().userId("prateek"); - when(messageTransformer.transform(messageBuilder)).thenReturn(true); - - analytics.enqueue(messageBuilder); - - verify(messageTransformer).transform(messageBuilder); - verify(messageInterceptor).intercept(any(Message.class)); - verify(client, never()).enqueue(any(Message.class)); - } - - @Test - public void shutdownIsDispatched() { - analytics.shutdown(); - - verify(client).shutdown(); - } - - @Test - public void flushIsDispatched() { - analytics.flush(); - - verify(client).flush(); - } - - @Test - public void offerIsDispatched(MessageBuilderTest builder) { - MessageBuilder messageBuilder = builder.get().userId("dummy"); - Message message = messageBuilder.build(); - when(messageTransformer.transform(messageBuilder)).thenReturn(true); - when(messageInterceptor.intercept(any(Message.class))).thenReturn(message); - - analytics.offer(messageBuilder); - - verify(messageTransformer).transform(messageBuilder); - verify(messageInterceptor).intercept(any(Message.class)); - verify(client).offer(message); - } - - @Test - public void threadSafeTest(MessageBuilderTest builder) - throws NoSuchFieldException, IllegalAccessException, InterruptedException { - // we want to test if msgs get lost during a multithreaded env - Analytics analytics = Analytics.builder("testWriteKeyForIssue321").build(); - // So we just want to spy on the client of an Analytics object created normally - Field clientField = analytics.getClass().getDeclaredField("client"); - clientField.setAccessible(true); - AnalyticsClient spy = spy((AnalyticsClient) clientField.get(analytics)); - clientField.set(analytics, spy); - - // we are going to run this test for a specific amount of seconds - int millisRunning = 200; - LocalDateTime initialTime = LocalDateTime.now(); - LocalDateTime now; - - // and a set number of threads will be using the library - ExecutorService service = Executors.newFixedThreadPool(20); - AtomicInteger counter = new AtomicInteger(); - - MessageBuilder messageBuilder = builder.get().userId("jorgen25"); - - do { - service.submit( - () -> { - analytics.enqueue(messageBuilder); - counter.incrementAndGet(); - }); - now = LocalDateTime.now(); - } while (initialTime.until(now, ChronoUnit.MILLIS) < millisRunning); - - service.shutdown(); - while (!service.isShutdown() || !service.isTerminated()) {} - - verify(spy, times(counter.get())).enqueue(any(Message.class)); - } -} diff --git a/analytics/src/test/java/com/segment/analytics/SegmentTest.java b/analytics/src/test/java/com/segment/analytics/SegmentTest.java new file mode 100644 index 00000000..768e6331 --- /dev/null +++ b/analytics/src/test/java/com/segment/analytics/SegmentTest.java @@ -0,0 +1,214 @@ +package com.segment.analytics; + +import static com.github.tomakehurst.wiremock.client.WireMock.okJson; +import static 
com.github.tomakehurst.wiremock.client.WireMock.post; +import static com.github.tomakehurst.wiremock.client.WireMock.stubFor; +import static com.github.tomakehurst.wiremock.client.WireMock.urlEqualTo; +import static com.github.tomakehurst.wiremock.core.WireMockConfiguration.wireMockConfig; + +import com.github.tomakehurst.wiremock.client.WireMock; +import com.github.tomakehurst.wiremock.junit.WireMockRule; +import com.github.tomakehurst.wiremock.stubbing.ServeEvent; +import com.segment.analytics.internal.Config; +import com.segment.analytics.internal.Config.FileConfig; +import com.segment.analytics.internal.Config.HttpConfig; +import com.segment.analytics.messages.TrackMessage; +import java.io.IOException; +import java.nio.file.Path; +import java.time.Duration; +import java.util.HashSet; +import java.util.Iterator; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.LinkedBlockingDeque; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.commons.io.FileUtils; +import org.apache.commons.lang.RandomStringUtils; +import org.awaitility.Awaitility; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import wiremock.com.fasterxml.jackson.core.JsonProcessingException; +import wiremock.com.fasterxml.jackson.databind.JsonNode; +import wiremock.com.fasterxml.jackson.databind.ObjectMapper; +import wiremock.com.google.common.util.concurrent.RateLimiter; + +public class SegmentTest { + + public int requestsPerSecond = 1_000; + public int numClients = 10; + public int messageContentChars = 100; + public int responseDelay = 300; + + @Rule + public WireMockRule wireMockRule = + new WireMockRule(wireMockConfig().port(8088).gzipDisabled(true), false); + + Analytics analytics; + + @Before + public void setup() throws IOException { + FileUtils.deleteDirectory(Path.of(Config.DEFAULT_FALLBACK_FILE).toFile()); + + analytics = Analytics.builder("write-key") + .endpoint(wireMockRule.baseUrl()) + .httpConfig(HttpConfig.builder() + // .queueSize(250) + // .flushQueueSize(50) + // .flushIntervalInMillis(10 * 1_000) + // .executorSize(1) + // .executorQueueSize(0) + // .timeoutSeconds(15) + .build()) + .fileConfig(FileConfig.builder() + // .size(250) + // .flushSize(50) + .build()) + .build(); + } + + + @Test + public void testOk() throws Throwable { + run(Duration.ofSeconds(30), new TimedAction(Duration.ZERO, () -> segmentHttpUp())); + } + + @Test + public void testFailRestoreAtEnd() throws Throwable { + Duration duration = Duration.ofSeconds(30); + run( + duration, + new TimedAction(Duration.ZERO, () -> segmentHttpDown()), + new TimedAction(duration, () -> segmentHttpUp())); + } + + @Test + public void testFailThenRestore() throws Throwable { + run( + Duration.ofMinutes(2), + new TimedAction(Duration.ZERO, () -> segmentHttpDown()), + new TimedAction(Duration.ofMinutes(1), () -> segmentHttpUp())); + } + + private void run(Duration durationToRun, TimedAction... 
actions) throws Throwable { + long timeToRun = durationToRun.toMillis(); + long start = System.currentTimeMillis(); + + final String content = RandomStringUtils.randomAlphanumeric(messageContentChars); + final AtomicInteger id = new AtomicInteger(0); + + ExecutorService exec = new ThreadPoolExecutor( + numClients, + numClients, + 15L, + TimeUnit.SECONDS, + new LinkedBlockingDeque<>(10_000), + new CallerRunsPolicy()); + + RateLimiter rate = RateLimiter.create(requestsPerSecond); + int actionIndex = 0; + while (true) { + long elapsed = System.currentTimeMillis() - start; + + if (actionIndex < actions.length && actions[actionIndex].at <= elapsed) { + actions[actionIndex].action.run(); + actionIndex++; + } + + if (elapsed > timeToRun) { + break; + } + + if (rate.tryAcquire()) { + exec.submit(() -> { + String msgid = String.valueOf(id.getAndIncrement()); + analytics.enqueue(TrackMessage.builder("my-track") + .messageId(msgid) + .userId("userId") + .context(Map.of("content", content))); + }); + } + Thread.yield(); + } + + exec.shutdown(); + exec.awaitTermination(10, TimeUnit.MINUTES); + + Awaitility.await() + .atMost(10, TimeUnit.MINUTES) + .pollInterval(1, TimeUnit.SECONDS) + .until(() -> checkSentMessages(id.get())); + } + + void segmentHttpUp() { + stubFor(post(urlEqualTo("/v1/import/")) + .willReturn(okJson("{\"success\": \"true\"}").withFixedDelay(responseDelay))); + System.err.println("HTTP server UP"); + } + + void segmentHttpDown() { + stubFor(post(urlEqualTo("/v1/import/")) + .willReturn( + WireMock.aResponse().withStatus(503).withBody("fail").withFixedDelay(responseDelay))); + System.err.println("HTTP server DOWN"); + } + + private static final ObjectMapper OM = new ObjectMapper(); + + private boolean checkSentMessages(int expected) { + int sentMessages = countSentMessages(); + System.err.println("Confirmed msgs %d / %d".formatted(sentMessages, expected)); + return sentMessages >= expected; + } + + private int countSentMessages() { + int count = 0; + Set<Integer> messageIds = new HashSet<>(); + for (ServeEvent event : wireMockRule.getAllServeEvents()) { + if (event.getResponse().getStatus() != 200) { + continue; + } + + JsonNode batch; + try { + JsonNode json = OM.readTree(event.getRequest().getBodyAsString()); + batch = json.get("batch"); + if (batch == null) { + continue; + } + } catch (JsonProcessingException e) { + continue; + } + Iterator<JsonNode> msgs = batch.elements(); + while (msgs.hasNext()) { + count++; + messageIds.add(msgs.next().get("messageId").asInt()); + } + } + if (count != messageIds.size()) { + System.err.println(String.format("Duplicates! count: %d, unique messageIds: %d", count, messageIds.size())); + } + return messageIds.size(); + } + + @After + public void tearDown() { + analytics.close(); + } + + static class TimedAction { + final long at; + final Runnable action; + + public TimedAction(Duration at, Runnable run) { + super(); + this.at = at.toMillis(); + this.action = run; + } + } +} diff --git a/analytics/src/test/java/com/segment/analytics/internal/AnalyticsClientTest.java b/analytics/src/test/java/com/segment/analytics/internal/AnalyticsClientTest.java deleted file mode 100644 index 74f04e13..00000000 --- a/analytics/src/test/java/com/segment/analytics/internal/AnalyticsClientTest.java +++ /dev/null @@ -1,969 +0,0 @@ -package com.segment.analytics.internal; - -import static com.segment.analytics.internal.FlushMessage.POISON; -import static com.segment.analytics.internal.StopMessage.STOP; -import static org.assertj.core.api.Assertions.assertThat; -import static
org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.argThat; -import static org.mockito.ArgumentMatchers.eq; -import static org.mockito.Mockito.never; -import static org.mockito.Mockito.spy; -import static org.mockito.Mockito.timeout; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.verifyNoInteractions; -import static org.mockito.Mockito.verifyNoMoreInteractions; -import static org.mockito.Mockito.when; -import static org.mockito.MockitoAnnotations.openMocks; - -import com.google.gson.Gson; -import com.segment.analytics.Callback; -import com.segment.analytics.Log; -import com.segment.analytics.TestUtils.MessageBuilderTest; -import com.segment.analytics.http.SegmentService; -import com.segment.analytics.http.UploadResponse; -import com.segment.analytics.internal.AnalyticsClient.BatchUploadTask; -import com.segment.analytics.messages.Batch; -import com.segment.analytics.messages.Message; -import com.segment.analytics.messages.TrackMessage; -import com.segment.backo.Backo; -import com.squareup.burst.BurstJUnit4; -import java.io.IOException; -import java.nio.charset.StandardCharsets; -import java.util.Arrays; -import java.util.Collections; -import java.util.HashMap; -import java.util.Map; -import java.util.Queue; -import java.util.Random; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.ThreadFactory; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import okhttp3.ResponseBody; -import org.junit.Before; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.mockito.ArgumentCaptor; -import org.mockito.ArgumentMatcher; -import org.mockito.Mock; -import org.mockito.Spy; -import org.mockito.invocation.InvocationOnMock; -import org.mockito.stubbing.Answer; -import retrofit2.Call; -import retrofit2.Response; -import retrofit2.mock.Calls; - -@RunWith(BurstJUnit4.class) // -public class AnalyticsClientTest { - // Backo instance for testing which trims down the wait times. - private static final Backo BACKO = - Backo.builder().base(TimeUnit.NANOSECONDS, 1).factor(1).build(); - - private int DEFAULT_RETRIES = 10; - private int MAX_BATCH_SIZE = 1024 * 500; // 500kb - private int MAX_MSG_SIZE = 1024 * 32; // 32kb //This is the limit for a message object - private int MSG_MAX_CREATE_SIZE = - MAX_MSG_SIZE - - 200; // Once we create msg object with this size it barely below 32 threshold so good - // for tests - private static String writeKey = "writeKey"; - - Log log = Log.NONE; - - ThreadFactory threadFactory; - @Spy LinkedBlockingQueue messageQueue; - @Mock SegmentService segmentService; - @Mock ExecutorService networkExecutor; - @Mock Callback callback; - @Mock UploadResponse response; - - AtomicBoolean isShutDown; - - @Before - public void setUp() { - openMocks(this); - - isShutDown = new AtomicBoolean(false); - threadFactory = Executors.defaultThreadFactory(); - } - - // Defers loading the client until tests can initialize all required - // dependencies. 
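The deleted tests above drive retries through a Backo instance tuned to nanosecond waits. As a point of comparison only, a test policy in the same spirit under Failsafe 3.x (the dev.failsafe artifact this changeset depends on) might look like the sketch below; the class and constant names are illustrative, not code from this repository.

```java
import dev.failsafe.RetryPolicy;
import java.time.Duration;

final class TestPolicies {
  // Near-zero backoff for tests, mirroring Backo.builder().base(NANOSECONDS, 1).factor(1);
  // 10 mirrors DEFAULT_RETRIES in the deleted test class.
  static final RetryPolicy<Object> NEAR_ZERO_BACKOFF =
      RetryPolicy.builder()
          .withDelay(Duration.ofNanos(1))
          .withMaxRetries(10)
          .build();
}
```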
- AnalyticsClient newClient() { - return new AnalyticsClient( - messageQueue, - null, - segmentService, - 50, - TimeUnit.HOURS.toMillis(1), - 0, - MAX_BATCH_SIZE, - log, - threadFactory, - networkExecutor, - Collections.singletonList(callback), - isShutDown, - writeKey, - new Gson()); - } - - @Test - public void enqueueAddsToQueue(MessageBuilderTest builder) throws InterruptedException { - AnalyticsClient client = newClient(); - - Message message = builder.get().userId("prateek").build(); - client.enqueue(message); - - verify(messageQueue).put(message); - } - - @Test - public void shutdown() throws InterruptedException { - messageQueue = new LinkedBlockingQueue<>(); - AnalyticsClient client = newClient(); - - client.shutdown(); - - verify(networkExecutor).shutdown(); - verify(networkExecutor).awaitTermination(1, TimeUnit.SECONDS); - } - - @Test - public void flushInsertsPoison() throws InterruptedException { - AnalyticsClient client = newClient(); - - client.flush(); - - verify(messageQueue).put(FlushMessage.POISON); - } - - /** Wait until the queue is drained. */ - static void wait(Queue queue) { - // noinspection StatementWithEmptyBody - while (queue.size() > 0) {} - } - - /** - * Verify that a {@link BatchUploadTask} was submitted to the executor, and return the {@link - * BatchUploadTask#batch} it was uploading.. - */ - static Batch captureBatch(ExecutorService executor) { - final ArgumentCaptor runnableArgumentCaptor = ArgumentCaptor.forClass(Runnable.class); - verify(executor, timeout(1000)).submit(runnableArgumentCaptor.capture()); - final BatchUploadTask task = (BatchUploadTask) runnableArgumentCaptor.getValue(); - return task.batch; - } - - private static String generateDataOfSize(int msgSize) { - char[] chars = new char[msgSize]; - Arrays.fill(chars, 'a'); - - return new String(chars); - } - - private static String generateDataOfSizeSpecialChars( - int sizeInBytes, boolean slightlyBelowLimit) { - StringBuilder builder = new StringBuilder(); - Character[] specialChars = new Character[] {'$', '¢', 'ह', '€', '한', '©', '¶'}; - int currentSize = 0; - String smileyFace = "\uD83D\uDE01"; - // 😁 = '\uD83D\uDE01'; - Random rand = new Random(); - int loopCount = 1; - while (currentSize < sizeInBytes) { - int randomNum; - // decide if regular/special character - if (loopCount > 3 && loopCount % 4 == 0) { - randomNum = rand.nextInt(((specialChars.length - 1) - 0) + 1) + 0; - builder.append(specialChars[randomNum]); - } else if (loopCount > 9 && loopCount % 10 == 0) { - builder.append(smileyFace); - } else { - // random letter from a - z - randomNum = rand.nextInt(('z' - 'a') + 1) + 'a'; - builder.append((char) randomNum); - } - - // check size so far - String temp = builder.toString(); - currentSize = temp.getBytes(StandardCharsets.UTF_8).length; - if (slightlyBelowLimit && ((sizeInBytes - currentSize) < 500)) { - break; - } - loopCount++; - } - return builder.toString(); - } - - @Test - public void flushSubmitsToExecutor() { - messageQueue = new LinkedBlockingQueue<>(); - AnalyticsClient client = newClient(); - - TrackMessage first = TrackMessage.builder("foo").userId("bar").build(); - TrackMessage second = TrackMessage.builder("qaz").userId("qux").build(); - client.enqueue(first); - client.enqueue(second); - client.flush(); - wait(messageQueue); - - assertThat(captureBatch(networkExecutor).batch()).containsExactly(first, second); - } - - @Test - public void enqueueMaxTriggersFlush() { - messageQueue = new LinkedBlockingQueue<>(); - AnalyticsClient client = newClient(); - - // Enqueuing 51 
messages (> 50) should trigger flush. - for (int i = 0; i < 51; i++) { - client.enqueue(TrackMessage.builder("Event " + i).userId("bar").build()); - } - wait(messageQueue); - - // Verify that the executor saw the batch. - assertThat(captureBatch(networkExecutor).batch()).hasSize(50); - } - - @Test - public void shouldBeAbleToCalculateMessageSize() { - AnalyticsClient client = newClient(); - Map properties = new HashMap(); - - properties.put("property1", generateDataOfSize(1024 * 33)); - - TrackMessage bigMessage = - TrackMessage.builder("Big Event").userId("bar").properties(properties).build(); - try { - client.enqueue(bigMessage); - } catch (IllegalArgumentException e) { - assertThat(e).isExactlyInstanceOf(e.getClass()); - } - - // can't test for exact size cause other attributes come in play - assertThat(client.messageSizeInBytes(bigMessage)).isGreaterThan(1024 * 33); - } - - @Test - public void dontFlushUntilReachesMaxSize() throws InterruptedException { - AnalyticsClient client = newClient(); - Map properties = new HashMap(); - - properties.put("property2", generateDataOfSize(MAX_BATCH_SIZE - 200)); - - TrackMessage bigMessage = - TrackMessage.builder("Big Event").userId("bar").properties(properties).build(); - try { - client.enqueue(bigMessage); - } catch (IllegalArgumentException e) { - // throw new InterruptedException(e.getMessage()); - } - - wait(messageQueue); - - verify(networkExecutor, never()).submit(any(Runnable.class)); - } - - /** - * Modified this test case since we are changing logic to NOT allow messages bigger than 32 kbs - * individually to be enqueued, hence had to lower the size of the generated msg here. chose - * MSG_MAX_CREATE_SIZE because it will generate a message just below the limit of 32 kb after it - * creates a Message object modified the number of events that will be created since the batch - * creation logic was also changed to not allow batches larger than 500 kb meaning every 15/16 - * events the queue will be backPressured and poisoned/flushed (3 times) (purpose of test) AND - * there will be 4 batches submitted (15 msgs, 1 msg, 15 msg, 15 msg) so purpose of test case - * stands - * - * @throws InterruptedException - */ - @Test - public void flushHowManyTimesNecessaryToStayWithinLimit() throws InterruptedException { - AnalyticsClient client = - new AnalyticsClient( - messageQueue, - null, - segmentService, - 50, - TimeUnit.HOURS.toMillis(1), - 0, - MAX_BATCH_SIZE * 4, - log, - threadFactory, - networkExecutor, - Collections.singletonList(callback), - isShutDown, - writeKey, - new Gson()); - - Map properties = new HashMap(); - - properties.put("property3", generateDataOfSize(MSG_MAX_CREATE_SIZE)); - - for (int i = 0; i < 46; i++) { - TrackMessage bigMessage = - TrackMessage.builder("Big Event").userId("bar").properties(properties).build(); - client.enqueue(bigMessage); - verify(messageQueue).put(bigMessage); - } - - wait(messageQueue); - /** - * modified from expected 4 to expected 3 times, since we removed the inner loop. 
The inner loop - * forced the message list built from the queue to keep producing batches, even - * single-message ones, until the list was empty; that made the code emit one final - * batch containing a single message, bumping the number of submitted batches from 3 to 4. - */ - verify(networkExecutor, times(3)).submit(any(Runnable.class)); - } - - /** - * Slightly reworked: the logic now rejects individual messages above 32 KB, so the generated - * message size was lowered to MSG_MAX_CREATE_SIZE to keep the purpose of the test intact, - * namely that several messages eventually fill up the queue and trigger a flush. The expected - * number of submitted batches changes from 1 to 2: the queue is backpressured at 16 messages, - * which together exceed the 500 KB batch limit, so the client creates one batch of 15 messages - * and a second batch for the remaining message, keeping every batch under the 500 KB limit. - * - * @throws InterruptedException - */ - @Test - public void flushWhenMultipleMessagesReachesMaxSize() throws InterruptedException { - AnalyticsClient client = newClient(); - Map properties = new HashMap(); - properties.put("property3", generateDataOfSize(MSG_MAX_CREATE_SIZE)); - - for (int i = 0; i < 16; i++) { - TrackMessage bigMessage = - TrackMessage.builder("Big Event").userId("bar").properties(properties).build(); - client.enqueue(bigMessage); - } - wait(messageQueue); - client.shutdown(); - while (!isShutDown.get()) {} - verify(networkExecutor, times(2)).submit(any(Runnable.class)); - } - - @Test - public void enqueueBeforeMaxDoesNotTriggerFlush() { - messageQueue = new LinkedBlockingQueue<>(); - AnalyticsClient client = newClient(); - - // Enqueuing 5 messages (< 50) should not trigger flush. - for (int i = 0; i < 5; i++) { - client.enqueue(TrackMessage.builder("Event " + i).userId("bar").build()); - } - wait(messageQueue); - - // Verify that the executor didn't see anything. - verify(networkExecutor, never()).submit(any(Runnable.class)); - } - - static Batch batchFor(Message message) { - return Batch.create( - Collections.emptyMap(), Collections.singletonList(message), writeKey); - } - - @Test - public void batchRetriesForNetworkErrors() { - AnalyticsClient client = newClient(); - TrackMessage trackMessage = TrackMessage.builder("foo").userId("bar").build(); - Batch batch = batchFor(trackMessage); - - Response successResponse = Response.success(200, response); - Response failureResponse = Response.error(429, ResponseBody.create(null, "")); - - // Return a retryable failure response 3 times. - when(segmentService.upload(null, batch)) - .thenReturn(Calls.response(failureResponse)) - .thenReturn(Calls.response(failureResponse)) - .thenReturn(Calls.response(failureResponse)) - .thenReturn(Calls.response(successResponse)); - - BatchUploadTask batchUploadTask = new BatchUploadTask(client, BACKO, batch, DEFAULT_RETRIES); - batchUploadTask.run(); - - // Verify that we tried to upload 4 times, 3 failed and 1 succeeded. - verify(segmentService, times(4)).upload(null, batch); - verify(callback).success(trackMessage); - } - - @Test - public void batchRetriesForHTTP5xxErrors() { - AnalyticsClient client = newClient(); - TrackMessage trackMessage = TrackMessage.builder("foo").userId("bar").build(); - Batch batch = batchFor(trackMessage); - - // Throw an HTTP error 3 times.
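Between them, the retry tests in this deleted file pin down one status rule; as a compact sketch (a hypothetical helper, not part of the client's API):

```java
// Retry only on HTTP 429 (rate limited) and 5xx (server errors);
// any other non-2xx status fails immediately without a retry.
static boolean isRetryableStatus(int code) {
  return code == 429 || (code >= 500 && code < 600);
}
```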
- - Response successResponse = Response.success(200, response); - Response failResponse = - Response.error(500, ResponseBody.create(null, "Server Error")); - when(segmentService.upload(null, batch)) - .thenReturn(Calls.response(failResponse)) - .thenReturn(Calls.response(failResponse)) - .thenReturn(Calls.response(failResponse)) - .thenReturn(Calls.response(successResponse)); - - BatchUploadTask batchUploadTask = new BatchUploadTask(client, BACKO, batch, DEFAULT_RETRIES); - batchUploadTask.run(); - - // Verify that we tried to upload 4 times, 3 failed and 1 succeeded. - verify(segmentService, times(4)).upload(null, batch); - verify(callback).success(trackMessage); - } - - @Test - public void batchRetriesForHTTP429Errors() { - AnalyticsClient client = newClient(); - TrackMessage trackMessage = TrackMessage.builder("foo").userId("bar").build(); - Batch batch = batchFor(trackMessage); - - // Throw a HTTP error 3 times. - Response successResponse = Response.success(200, response); - Response failResponse = - Response.error(429, ResponseBody.create(null, "Rate Limited")); - when(segmentService.upload(null, batch)) - .thenReturn(Calls.response(failResponse)) - .thenReturn(Calls.response(failResponse)) - .thenReturn(Calls.response(failResponse)) - .thenReturn(Calls.response(successResponse)); - - BatchUploadTask batchUploadTask = new BatchUploadTask(client, BACKO, batch, DEFAULT_RETRIES); - batchUploadTask.run(); - - // Verify that we tried to upload 4 times, 3 failed and 1 succeeded. - verify(segmentService, times(4)).upload(null, batch); - verify(callback).success(trackMessage); - } - - @Test - public void batchDoesNotRetryForNon5xxAndNon429HTTPErrors() { - AnalyticsClient client = newClient(); - TrackMessage trackMessage = TrackMessage.builder("foo").userId("bar").build(); - Batch batch = batchFor(trackMessage); - - // Throw a HTTP error that should not be retried. - Response failResponse = - Response.error(404, ResponseBody.create(null, "Not Found")); - when(segmentService.upload(null, batch)).thenReturn(Calls.response(failResponse)); - - BatchUploadTask batchUploadTask = new BatchUploadTask(client, BACKO, batch, DEFAULT_RETRIES); - batchUploadTask.run(); - - // Verify we only tried to upload once. - verify(segmentService).upload(null, batch); - verify(callback).failure(eq(trackMessage), any(IOException.class)); - } - - @Test - public void batchDoesNotRetryForNonNetworkErrors() { - AnalyticsClient client = newClient(); - TrackMessage trackMessage = TrackMessage.builder("foo").userId("bar").build(); - Batch batch = batchFor(trackMessage); - - Call networkFailure = Calls.failure(new RuntimeException()); - when(segmentService.upload(null, batch)).thenReturn(networkFailure); - - BatchUploadTask batchUploadTask = new BatchUploadTask(client, BACKO, batch, DEFAULT_RETRIES); - batchUploadTask.run(); - - // Verify we only tried to upload once. 
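Assuming the Failsafe 3.x APIs this changeset pulls in, the retry-then-give-up loop these tests exercise could be written roughly as below; `POLICY` and `uploadWithRetry` are illustrative names, and the wiring is a sketch rather than the repository's actual implementation.

```java
import com.segment.analytics.http.SegmentService;
import com.segment.analytics.http.UploadResponse;
import com.segment.analytics.messages.Batch;
import dev.failsafe.Failsafe;
import dev.failsafe.RetryPolicy;
import okhttp3.HttpUrl;
import retrofit2.Response;

final class RetryingUploadSketch {
  // Retry on 429 and 5xx results, up to a fixed number of attempts.
  static final RetryPolicy<Response<UploadResponse>> POLICY =
      RetryPolicy.<Response<UploadResponse>>builder()
          .handleResultIf(r -> r.code() == 429 || r.code() >= 500)
          .withMaxRetries(3)
          .build();

  static Response<UploadResponse> uploadWithRetry(
      SegmentService service, HttpUrl uploadUrl, Batch batch) {
    // Each attempt issues a fresh call; Failsafe stops at the first success
    // or once retries are exhausted (the "N retries exhausted" failures above).
    return Failsafe.with(POLICY).get(() -> service.upload(uploadUrl, batch).execute());
  }
}
```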
- verify(segmentService).upload(null, batch); - verify(callback).failure(eq(trackMessage), any(RuntimeException.class)); - } - - @Test - public void givesUpAfterMaxRetries() { - AnalyticsClient client = newClient(); - TrackMessage trackMessage = TrackMessage.builder("foo").userId("bar").build(); - Batch batch = batchFor(trackMessage); - - when(segmentService.upload(null, batch)) - .thenAnswer( - new Answer<Call<UploadResponse>>() { - public Call<UploadResponse> answer(InvocationOnMock invocation) { - Response<UploadResponse> failResponse = - Response.error(429, ResponseBody.create(null, "Not Found")); - return Calls.response(failResponse); - } - }); - - BatchUploadTask batchUploadTask = new BatchUploadTask(client, BACKO, batch, 10); - batchUploadTask.run(); - - // DEFAULT_RETRIES == maxRetries - // tries 11 (one normal run + 10 retries) even though the default is 50 in AnalyticsClient.java - verify(segmentService, times(11)).upload(null, batch); - verify(callback) - .failure( - eq(trackMessage), - argThat( - new ArgumentMatcher<IOException>() { - @Override - public boolean matches(IOException exception) { - return exception.getMessage().equals("11 retries exhausted"); - } - })); - } - - @Test - public void hasDefaultRetriesSetTo3() { - AnalyticsClient client = newClient(); - TrackMessage trackMessage = TrackMessage.builder("foo").userId("bar").build(); - Batch batch = batchFor(trackMessage); - - when(segmentService.upload(null, batch)) - .thenAnswer( - new Answer<Call<UploadResponse>>() { - public Call<UploadResponse> answer(InvocationOnMock invocation) { - Response<UploadResponse> failResponse = - Response.error(429, ResponseBody.create(null, "Not Found")); - return Calls.response(failResponse); - } - }); - - BatchUploadTask batchUploadTask = new BatchUploadTask(client, BACKO, batch, 3); - batchUploadTask.run(); - - // maxRetries == 3 here - // tries 4 (one normal run + 3 retries) - verify(segmentService, times(4)).upload(null, batch); - verify(callback) - .failure( - eq(trackMessage), - argThat( - new ArgumentMatcher<IOException>() { - @Override - public boolean matches(IOException exception) { - return exception.getMessage().equals("4 retries exhausted"); - } - })); - } - - @Test - public void flushWhenNotShutDown() throws InterruptedException { - AnalyticsClient client = newClient(); - - client.flush(); - verify(messageQueue).put(POISON); - } - - @Test - public void flushWhenShutDown() throws InterruptedException { - AnalyticsClient client = newClient(); - isShutDown.set(true); - - client.flush(); - - verify(messageQueue, times(0)).put(any(Message.class)); - } - - @Test - public void enqueueWithRegularMessageWhenNotShutdown(MessageBuilderTest builder) - throws InterruptedException { - AnalyticsClient client = newClient(); - - final Message message = builder.get().userId("foo").build(); - client.enqueue(message); - - verify(messageQueue).put(message); - } - - @Test - public void enqueueWithRegularMessageWhenShutdown(MessageBuilderTest builder) - throws InterruptedException { - AnalyticsClient client = newClient(); - isShutDown.set(true); - - client.enqueue(builder.get().userId("foo").build()); - - verify(messageQueue, times(0)).put(any(Message.class)); - } - - @Test - public void enqueueWithStopMessageWhenShutdown() throws InterruptedException { - AnalyticsClient client = newClient(); - isShutDown.set(true); - - client.enqueue(STOP); - - verify(messageQueue).put(STOP); - } - - @Test - public void shutdownWhenAlreadyShutDown() throws InterruptedException { - AnalyticsClient client = newClient(); - isShutDown.set(true); - - client.shutdown(); - - verify(messageQueue, times(0)).put(any(Message.class)); -
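The shutdown cases above map onto the Closeable lifecycle this changeset adopts; a minimal usage sketch, assuming the builder defaults used in SegmentTest (the write key is a placeholder):

```java
import com.segment.analytics.Analytics;
import com.segment.analytics.messages.TrackMessage;

class LifecycleSketch {
  public static void main(String[] args) {
    // try-with-resources: close() stops the instance from processing further
    // requests, taking over from the deleted flush()/shutdown() pair.
    try (Analytics analytics = Analytics.builder("write-key").build()) {
      analytics.enqueue(TrackMessage.builder("my-track").userId("userId"));
    }
  }
}
```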
verifyNoInteractions(networkExecutor, callback, segmentService); - } - - @Test - public void shutdownWithNoMessageInTheQueue() throws InterruptedException { - AnalyticsClient client = newClient(); - client.shutdown(); - - verify(messageQueue).put(STOP); - verify(networkExecutor).shutdown(); - verify(networkExecutor).awaitTermination(1, TimeUnit.SECONDS); - verifyNoMoreInteractions(networkExecutor); - } - - @Test - public void shutdownWithMessagesInTheQueue(MessageBuilderTest builder) - throws InterruptedException { - AnalyticsClient client = newClient(); - - client.enqueue(builder.get().userId("foo").build()); - client.shutdown(); - - verify(messageQueue).put(STOP); - verify(networkExecutor).shutdown(); - verify(networkExecutor).awaitTermination(1, TimeUnit.SECONDS); - verify(networkExecutor).submit(any(AnalyticsClient.BatchUploadTask.class)); - } - - @Test - public void neverRetries() { - AnalyticsClient client = newClient(); - TrackMessage trackMessage = TrackMessage.builder("foo").userId("bar").build(); - Batch batch = batchFor(trackMessage); - - when(segmentService.upload(null, batch)) - .thenAnswer( - new Answer>() { - public Call answer(InvocationOnMock invocation) { - Response failResponse = - Response.error(429, ResponseBody.create(null, "Not Found")); - return Calls.response(failResponse); - } - }); - - BatchUploadTask batchUploadTask = new BatchUploadTask(client, BACKO, batch, 0); - batchUploadTask.run(); - - // runs once but never retries - verify(segmentService, times(1)).upload(null, batch); - verify(callback) - .failure( - eq(trackMessage), - argThat( - new ArgumentMatcher() { - @Override - public boolean matches(IOException exception) { - return exception.getMessage().equals("1 retries exhausted"); - } - })); - } - - /** - * ********************************************************************************************** - * Test cases for Size check - * ********************************************************************************************* - */ - - /** Individual Size check happy path regular chars */ - @Test - public void checkForIndividualMessageSizeLessThanLimit() { - AnalyticsClient client = newClient(); - int msgSize = 1024 * 31; // 31KB - int sizeLimit = MAX_MSG_SIZE; // 32KB = 32768 - Map properties = new HashMap(); - - properties.put("property1", generateDataOfSize(msgSize)); - - TrackMessage bigMessage = - TrackMessage.builder("Event").userId("jorgen25").properties(properties).build(); - client.enqueue(bigMessage); - - int msgActualSize = client.messageSizeInBytes(bigMessage); - assertThat(msgActualSize).isLessThanOrEqualTo(sizeLimit); - } - - /** Individual Size check sad path regular chars (over the limit) */ - @Test - public void checkForIndividualMessageSizeOverLimit() throws IllegalArgumentException { - AnalyticsClient client = newClient(); - int msgSize = MAX_MSG_SIZE + 1; // BARELY over the limit - int sizeLimit = MAX_MSG_SIZE; // 32KB = 32768 - Map properties = new HashMap(); - - properties.put("property1", generateDataOfSize(msgSize)); - - TrackMessage bigMessage = - TrackMessage.builder("Event").userId("jorgen25").properties(properties).build(); - try { - client.enqueue(bigMessage); - } catch (IllegalArgumentException e) { - assertThat(e).isExactlyInstanceOf(e.getClass()); - } - - int msgActualSize = client.messageSizeInBytes(bigMessage); - assertThat(msgActualSize).isGreaterThan(sizeLimit); - } - - /** Individual Size check happy path special chars */ - @Test - public void checkForIndividualMessageSizeSpecialCharsLessThanLimit() { - AnalyticsClient 
client = newClient(); - int msgSize = MAX_MSG_SIZE; // 32KB - int sizeLimit = MAX_MSG_SIZE; // 32KB = 32768 - - Map properties = new HashMap(); - properties.put("property1", generateDataOfSizeSpecialChars(msgSize, true)); - - TrackMessage bigMessage = - TrackMessage.builder("Event").userId("jorgen25").properties(properties).build(); - client.enqueue(bigMessage); - - int msgActualSize = client.messageSizeInBytes(bigMessage); - assertThat(msgActualSize).isLessThanOrEqualTo(sizeLimit); - } - - /** Individual Size check sad path special chars (over the limit) */ - @Test - public void checkForIndividualMessageSizeSpecialCharsAboveLimit() { - AnalyticsClient client = newClient(); - int msgSize = MAX_MSG_SIZE; // 32KB - int sizeLimit = MAX_MSG_SIZE; // 32KB = 32768 - Map properties = new HashMap(); - - properties.put("property1", generateDataOfSizeSpecialChars(msgSize, false)); - - TrackMessage bigMessage = - TrackMessage.builder("Event").userId("jorgen25").properties(properties).build(); - - try { - client.enqueue(bigMessage); - } catch (IllegalArgumentException e) { - assertThat(e).isExactlyInstanceOf(e.getClass()); - } - - int msgActualSize = client.messageSizeInBytes(bigMessage); - assertThat(msgActualSize).isGreaterThan(sizeLimit); - } - - /** - * ***************************************************************************************************************** - * Test cases for enqueue modified logic - * *************************************************************************************************************** - */ - @Test - public void enqueueVerifyPoisonIsNotCheckedForSize() throws InterruptedException { - AnalyticsClient clientSpy = spy(newClient()); - - clientSpy.enqueue(POISON); - verify(messageQueue).put(POISON); - verify(clientSpy, never()).messageSizeInBytes(POISON); - } - - @Test - public void enqueueVerifyStopIsNotCheckedForSize() throws InterruptedException { - AnalyticsClient clientSpy = spy(newClient()); - - clientSpy.enqueue(STOP); - verify(messageQueue).put(STOP); - verify(clientSpy, never()).messageSizeInBytes(STOP); - } - - @Test - public void enqueueVerifyRegularMessageIsEnqueuedAndCheckedForSize(MessageBuilderTest builder) - throws InterruptedException { - AnalyticsClient clientSpy = spy(newClient()); - - Message message = builder.get().userId("jorgen25").build(); - clientSpy.enqueue(message); - verify(messageQueue).put(message); - verify(clientSpy, times(1)).messageSizeInBytes(message); - } - - /** - * This test case was to prove the limit in batch is not being respected so will probably delete - * it later NOTE: Used to be a test case created to prove huge messages above the limit are still - * being submitted in batch modified it to prove they are not anymore after changing logic in - * analyticsClient - * - * @param builder - * @throws InterruptedException - */ - @Test - public void enqueueSingleMessageAboveLimitWhenNotShutdown(MessageBuilderTest builder) - throws InterruptedException, IllegalArgumentException { - AnalyticsClient client = newClient(); - - // Message is above batch limit - final String massData = generateDataOfSizeSpecialChars(MAX_MSG_SIZE, false); - Map integrationOpts = new HashMap<>(); - integrationOpts.put("massData", massData); - Message message = - builder.get().userId("foo").integrationOptions("someKey", integrationOpts).build(); - - try { - client.enqueue(message); - } catch (IllegalArgumentException e) { - assertThat(e).isExactlyInstanceOf(e.getClass()); - } - - wait(messageQueue); - - // Message is above MSG/BATCH size limit so it should 
not be put in queue - verify(messageQueue, never()).put(message); - // And since it was never in the queue, it was never submitted in batch - verify(networkExecutor, never()).submit(any(AnalyticsClient.BatchUploadTask.class)); - } - - @Test - public void enqueueVerifyRegularMessagesSpecialCharactersBelowLimit(MessageBuilderTest builder) - throws InterruptedException, IllegalArgumentException { - AnalyticsClient client = newClient(); - int msgSize = 1024 * 18; // 18KB - - for (int i = 0; i < 2; i++) { - final String data = generateDataOfSizeSpecialChars(msgSize, true); - Map integrationOpts = new HashMap<>(); - integrationOpts.put("data", data); - Message message = - builder.get().userId("jorgen25").integrationOptions("someKey", integrationOpts).build(); - client.enqueue(message); - verify(messageQueue).put(message); - } - client.enqueue(POISON); - verify(messageQueue).put(POISON); - - wait(messageQueue); - client.shutdown(); - while (!isShutDown.get()) {} - - verify(networkExecutor, times(1)).submit(any(AnalyticsClient.BatchUploadTask.class)); - } - - /** - * ****************************************************************************************************************** - * Test cases for Batch creation logic - * **************************************************************************************************************** - */ - - /** - * Several messages are enqueued and then submitted in a batch - * - * @throws InterruptedException - */ - @Test - public void submitBatchBelowThreshold() throws InterruptedException, IllegalArgumentException { - AnalyticsClient client = - new AnalyticsClient( - messageQueue, - null, - segmentService, - 50, - TimeUnit.HOURS.toMillis(1), - 0, - MAX_BATCH_SIZE * 4, - log, - threadFactory, - networkExecutor, - Collections.singletonList(callback), - isShutDown, - writeKey, - new Gson()); - - Map properties = new HashMap(); - properties.put("property3", generateDataOfSizeSpecialChars(((int) (MAX_MSG_SIZE * 0.9)), true)); - - for (int i = 0; i < 15; i++) { - TrackMessage bigMessage = - TrackMessage.builder("Big Event").userId("jorgen25").properties(properties).build(); - client.enqueue(bigMessage); - verify(messageQueue).put(bigMessage); - } - client.enqueue(POISON); - wait(messageQueue); - - client.shutdown(); - while (!isShutDown.get()) {} - verify(networkExecutor, times(1)).submit(any(Runnable.class)); - } - - /** - * Enqueued several messages above threshold of 500Kbs so queue gets backpressured at some point - * and several batches have to be created to not violate threshold - * - * @throws InterruptedException - */ - @Test - public void submitBatchAboveThreshold() throws InterruptedException, IllegalArgumentException { - AnalyticsClient client = - new AnalyticsClient( - messageQueue, - null, - segmentService, - 50, - TimeUnit.HOURS.toMillis(1), - 0, - MAX_BATCH_SIZE * 4, - log, - threadFactory, - networkExecutor, - Collections.singletonList(callback), - isShutDown, - writeKey, - new Gson()); - - Map properties = new HashMap(); - properties.put("property3", generateDataOfSizeSpecialChars(MAX_MSG_SIZE, true)); - - for (int i = 0; i < 100; i++) { - TrackMessage message = - TrackMessage.builder("Big Event").userId("jorgen25").properties(properties).build(); - client.enqueue(message); - verify(messageQueue).put(message); - } - wait(messageQueue); - client.shutdown(); - while (!isShutDown.get()) {} - - verify(networkExecutor, times(8)).submit(any(Runnable.class)); - } - - @Test - public void submitManySmallMessagesBatchAboveThreshold() throws 
InterruptedException { - AnalyticsClient client = - new AnalyticsClient( - messageQueue, - null, - segmentService, - 50, - TimeUnit.HOURS.toMillis(1), - 0, - MAX_BATCH_SIZE * 4, - log, - threadFactory, - networkExecutor, - Collections.singletonList(callback), - isShutDown, - writeKey, - new Gson()); - - Map properties = new HashMap(); - properties.put("property3", generateDataOfSizeSpecialChars(1024 * 8, true)); - - for (int i = 0; i < 600; i++) { - TrackMessage message = - TrackMessage.builder("Event").userId("jorgen25").properties(properties).build(); - client.enqueue(message); - verify(messageQueue).put(message); - } - wait(messageQueue); - client.shutdown(); - while (!isShutDown.get()) {} - - verify(networkExecutor, times(21)).submit(any(Runnable.class)); - } -} diff --git a/analytics/src/test/resources/logging.properties b/analytics/src/test/resources/logging.properties new file mode 100644 index 00000000..1c7c7d3a --- /dev/null +++ b/analytics/src/test/resources/logging.properties @@ -0,0 +1,7 @@ +.level=ALL + +handlers=java.util.logging.ConsoleHandler +java.util.logging.ConsoleHandler.level=ALL +java.util.logging.ConsoleHandler.filter= +java.util.logging.ConsoleHandler.formatter=java.util.logging.SimpleFormatter +java.util.logging.SimpleFormatter.format=%1$tH:%1$tM:%1$tS.%1$tL - %5$s%6$s%n \ No newline at end of file
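For reference, the java.util.logging configuration in this properties file is equivalent to the following programmatic setup (a sketch; the class name is illustrative). The last property additionally sets the SimpleFormatter pattern to a millisecond timestamp followed by the log message.

```java
import java.util.logging.ConsoleHandler;
import java.util.logging.Level;
import java.util.logging.Logger;

class JulSetupSketch {
  static void configure() {
    // Log everything, at every level, to the console (ConsoleHandler
    // uses SimpleFormatter by default, as the properties file does).
    Logger root = Logger.getLogger("");
    root.setLevel(Level.ALL);
    ConsoleHandler handler = new ConsoleHandler();
    handler.setLevel(Level.ALL);
    root.addHandler(handler);
  }
}
```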