From 24fcc00214bd43e07f7c4cc76362c1669f82ddc4 Mon Sep 17 00:00:00 2001 From: Albert Puig Date: Wed, 19 Mar 2025 15:57:34 +0100 Subject: [PATCH 01/13] fix OOM error in test --- .../src/test/java/com/segment/analytics/AnalyticsTest.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/analytics/src/test/java/com/segment/analytics/AnalyticsTest.java b/analytics/src/test/java/com/segment/analytics/AnalyticsTest.java index 8be3012e..ca3a8099 100644 --- a/analytics/src/test/java/com/segment/analytics/AnalyticsTest.java +++ b/analytics/src/test/java/com/segment/analytics/AnalyticsTest.java @@ -19,6 +19,7 @@ import java.util.Collections; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import org.junit.Before; import org.junit.Test; @@ -143,7 +144,7 @@ public void threadSafeTest(MessageBuilderTest builder) } while (initialTime.until(now, ChronoUnit.MILLIS) < millisRunning); service.shutdown(); - while (!service.isShutdown() || !service.isTerminated()) {} + service.awaitTermination(5, TimeUnit.SECONDS); verify(spy, times(counter.get())).enqueue(any(Message.class)); } From 8d814200660a2dd8ead6005749fc79e988c86045 Mon Sep 17 00:00:00 2001 From: Albert Puig Date: Thu, 20 Mar 2025 16:12:59 +0100 Subject: [PATCH 02/13] setup wiremock test --- analytics/pom.xml | 7 +++- .../com/segment/analytics/SegmentTest.java | 40 +++++++++++++++++++ 2 files changed, 46 insertions(+), 1 deletion(-) create mode 100644 analytics/src/test/java/com/segment/analytics/SegmentTest.java diff --git a/analytics/pom.xml b/analytics/pom.xml index 8c80856c..bf2e9ae1 100644 --- a/analytics/pom.xml +++ b/analytics/pom.xml @@ -67,7 +67,12 @@ org.mockito mockito-core test - + + + org.wiremock + wiremock-standalone + 3.2.0 + diff --git a/analytics/src/test/java/com/segment/analytics/SegmentTest.java b/analytics/src/test/java/com/segment/analytics/SegmentTest.java new file mode 100644 index 00000000..531229d3 --- /dev/null +++ b/analytics/src/test/java/com/segment/analytics/SegmentTest.java @@ -0,0 +1,40 @@ +package com.segment.analytics; + +import static com.github.tomakehurst.wiremock.client.WireMock.okJson; +import static com.github.tomakehurst.wiremock.client.WireMock.post; +import static com.github.tomakehurst.wiremock.client.WireMock.stubFor; +import static com.github.tomakehurst.wiremock.client.WireMock.urlEqualTo; +import static com.github.tomakehurst.wiremock.core.WireMockConfiguration.wireMockConfig; + +import com.github.tomakehurst.wiremock.junit.WireMockRule; +import com.segment.analytics.messages.TrackMessage; +import java.util.UUID; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; + +public class SegmentTest { + + @Rule + public WireMockRule wireMockRule = new WireMockRule(wireMockConfig().dynamicPort(), false); + + Analytics analytics; + + @Before + public void confWireMock() { + stubFor(post(urlEqualTo("/v1/import/")).willReturn(okJson("{\"success\": \"true\"}"))); + + analytics = Analytics.builder("write-key") + .endpoint(wireMockRule.baseUrl()) + // callback + // http client + .build(); + } + + @Test + public void test() { + analytics.enqueue(TrackMessage.builder("my-track") + .messageId(UUID.randomUUID().toString()) + .userId("userId")); + } +} From 3d232fadb3f9d87446c2ef3c5ef0335100f4422b Mon Sep 17 00:00:00 2001 From: Albert Puig Date: Thu, 20 Mar 2025 18:16:22 +0100 Subject: [PATCH 03/13] remove --- .../java/com/segment/analytics/Analytics.java | 5 - .../analytics/internal/AnalyticsClient.java | 117 +++++------------- .../com/segment/analytics/AnalyticsTest.java | 7 -- .../internal/AnalyticsClientTest.java | 48 +------ 4 files changed, 35 insertions(+), 142 deletions(-) diff --git a/analytics/src/main/java/com/segment/analytics/Analytics.java b/analytics/src/main/java/com/segment/analytics/Analytics.java index 81af36c7..26bd0bcb 100644 --- a/analytics/src/main/java/com/segment/analytics/Analytics.java +++ b/analytics/src/main/java/com/segment/analytics/Analytics.java @@ -90,11 +90,6 @@ public boolean offer(MessageBuilder builder) { return client.offer(message); } - /** Flush events in the message queue. */ - public void flush() { - client.flush(); - } - /** Stops this instance from processing further requests. */ public void shutdown() { client.shutdown(); diff --git a/analytics/src/main/java/com/segment/analytics/internal/AnalyticsClient.java b/analytics/src/main/java/com/segment/analytics/internal/AnalyticsClient.java index f7560004..024f2186 100644 --- a/analytics/src/main/java/com/segment/analytics/internal/AnalyticsClient.java +++ b/analytics/src/main/java/com/segment/analytics/internal/AnalyticsClient.java @@ -24,9 +24,7 @@ import java.util.UUID; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -54,17 +52,17 @@ public class AnalyticsClient { } private final BlockingQueue messageQueue; + private final BlockingQueue pendingQueue; private final HttpUrl uploadUrl; private final SegmentService service; private final int size; + private final long flushIntervalInMillis; private final int maximumRetries; private final int maximumQueueByteSize; - private int currentQueueSizeInBytes; private final Log log; private final List callbacks; private final ExecutorService networkExecutor; - private final ExecutorService looperExecutor; - private final ScheduledExecutorService flushScheduler; + private final Thread looperThread; private final AtomicBoolean isShutDown; private final String writeKey; @@ -83,6 +81,7 @@ public static AnalyticsClient create( String writeKey, Gson gsonInstance) { return new AnalyticsClient( + new LinkedBlockingQueue(queueCapacity), new LinkedBlockingQueue(queueCapacity), uploadUrl, segmentService, @@ -101,6 +100,7 @@ public static AnalyticsClient create( public AnalyticsClient( BlockingQueue messageQueue, + BlockingQueue pendingQueue, HttpUrl uploadUrl, SegmentService service, int maxQueueSize, @@ -115,34 +115,20 @@ public AnalyticsClient( String writeKey, Gson gsonInstance) { this.messageQueue = messageQueue; + this.pendingQueue = pendingQueue; this.uploadUrl = uploadUrl; this.service = service; this.size = maxQueueSize; + this.flushIntervalInMillis = flushIntervalInMillis; this.maximumRetries = maximumRetries; this.maximumQueueByteSize = maximumQueueSizeInBytes; this.log = log; this.callbacks = callbacks; - this.looperExecutor = Executors.newSingleThreadExecutor(threadFactory); + this.looperThread = threadFactory.newThread(new Looper()); this.networkExecutor = networkExecutor; this.isShutDown = isShutDown; this.writeKey = writeKey; this.gsonInstance = gsonInstance; - - this.currentQueueSizeInBytes = 0; - - if (!isShutDown.get()) looperExecutor.submit(new Looper()); - - flushScheduler = Executors.newScheduledThreadPool(1, threadFactory); - flushScheduler.scheduleAtFixedRate( - new Runnable() { - @Override - public void run() { - flush(); - } - }, - flushIntervalInMillis, - flushIntervalInMillis, - TimeUnit.MILLISECONDS); } public int messageSizeInBytes(Message message) { @@ -151,74 +137,33 @@ public int messageSizeInBytes(Message message) { return stringifiedMessage.getBytes(ENCODING).length; } - private Boolean isBackPressuredAfterSize(int incomingSize) { - int POISON_BYTE_SIZE = messageSizeInBytes(FlushMessage.POISON); - int sizeAfterAdd = this.currentQueueSizeInBytes + incomingSize + POISON_BYTE_SIZE; - // Leave a 10% buffer since the unsynchronized enqueue could add multiple at a time - return sizeAfterAdd >= Math.min(this.maximumQueueByteSize, BATCH_MAX_SIZE) * 0.9; - } - public boolean offer(Message message) { return messageQueue.offer(message); } - public void enqueue(Message message) { - if (message != StopMessage.STOP && isShutDown.get()) { + public void enqueue(Message message) {} + + public void enqueueSend(Message message) { + if (isShutDown.get()) { log.print(ERROR, "Attempt to enqueue a message when shutdown has been called %s.", message); return; } try { - // @jorgen25 message here could be regular msg, POISON or STOP. Only do regular logic if its - // valid message - if (message != StopMessage.STOP && message != FlushMessage.POISON) { - int messageByteSize = messageSizeInBytes(message); - - // @jorgen25 check if message is below 32kb limit for individual messages, no need to check - // for extra characters - if (messageByteSize <= MSG_MAX_SIZE) { - if (isBackPressuredAfterSize(messageByteSize)) { - this.currentQueueSizeInBytes = messageByteSize; - messageQueue.put(FlushMessage.POISON); - messageQueue.put(message); - - log.print(VERBOSE, "Maximum storage size has been hit Flushing..."); - } else { - messageQueue.put(message); - this.currentQueueSizeInBytes += messageByteSize; - } - } else { - log.print( - ERROR, "Message was above individual limit. MessageId: %s", message.messageId()); - throw new IllegalArgumentException( - "Message was above individual limit. MessageId: " + message.messageId()); - } - } else { - messageQueue.put(message); - } + messageQueue.put(message); } catch (InterruptedException e) { log.print(ERROR, e, "Interrupted while adding message %s.", message); Thread.currentThread().interrupt(); } } - public void flush() { - if (!isShutDown.get()) { - enqueue(FlushMessage.POISON); - } - } - public void shutdown() { if (isShutDown.compareAndSet(false, true)) { final long start = System.currentTimeMillis(); // first let's tell the system to stop - enqueue(StopMessage.STOP); - - // we can shutdown the flush scheduler without worrying - flushScheduler.shutdownNow(); + looperThread.interrupt(); - shutdownAndWait(looperExecutor, "looper"); shutdownAndWait(networkExecutor, "network"); log.print( @@ -247,30 +192,21 @@ public void shutdownAndWait(ExecutorService executor, String name) { * messages, it triggers a flush. */ class Looper implements Runnable { - private boolean stop; public Looper() { - this.stop = false; } @Override public void run() { LinkedList messages = new LinkedList<>(); - AtomicInteger currentBatchSize = new AtomicInteger(); + int currentBatchSize = 0; boolean batchSizeLimitReached = false; int contextSize = gsonInstance.toJson(CONTEXT).getBytes(ENCODING).length; try { - while (!stop) { - Message message = messageQueue.take(); - - if (message == StopMessage.STOP) { - log.print(VERBOSE, "Stopping the Looper"); - stop = true; - } else if (message == FlushMessage.POISON) { - if (!messages.isEmpty()) { - log.print(VERBOSE, "Flushing messages."); - } - } else { + while (!Thread.currentThread().isInterrupted()) { + Message message = messageQueue.poll(flushIntervalInMillis, TimeUnit.MILLISECONDS); + + if (message != null) { // we do +1 because we are accounting for this new message we just took from the queue // which is not in list yet // need to check if this message is going to make us go over the limit considering @@ -278,9 +214,9 @@ public void run() { int defaultBatchSize = BatchUtility.getBatchDefaultSize(contextSize, messages.size() + 1); int msgSize = messageSizeInBytes(message); - if (currentBatchSize.get() + msgSize + defaultBatchSize <= BATCH_MAX_SIZE) { + if (currentBatchSize + msgSize + defaultBatchSize <= BATCH_MAX_SIZE) { messages.add(message); - currentBatchSize.addAndGet(msgSize); + currentBatchSize+=msgSize; } else { // put message that did not make the cut this time back on the queue, we already took // this message if we dont put it back its lost @@ -288,8 +224,12 @@ public void run() { batchSizeLimitReached = true; } } + + if (messages.isEmpty()) { + continue; + } - Boolean isBlockingSignal = message == FlushMessage.POISON || message == StopMessage.STOP; + Boolean isBlockingSignal = message == null; Boolean isOverflow = messages.size() >= size; if (!messages.isEmpty() && (isOverflow || isBlockingSignal || batchSizeLimitReached)) { @@ -302,7 +242,7 @@ public void run() { networkExecutor.submit( BatchUploadTask.create(AnalyticsClient.this, batch, maximumRetries)); - currentBatchSize.set(0); + currentBatchSize=0; messages.clear(); if (batchSizeLimitReached) { // If this is true that means the last message that would make us go over the limit @@ -315,8 +255,9 @@ public void run() { } } catch (InterruptedException e) { log.print(DEBUG, "Looper interrupted while polling for messages."); - Thread.currentThread().interrupt(); + Thread.currentThread().interrupt(); //XXX } + // SEND pending log.print(VERBOSE, "Looper stopped"); } } diff --git a/analytics/src/test/java/com/segment/analytics/AnalyticsTest.java b/analytics/src/test/java/com/segment/analytics/AnalyticsTest.java index ca3a8099..075ae003 100644 --- a/analytics/src/test/java/com/segment/analytics/AnalyticsTest.java +++ b/analytics/src/test/java/com/segment/analytics/AnalyticsTest.java @@ -91,13 +91,6 @@ public void shutdownIsDispatched() { verify(client).shutdown(); } - @Test - public void flushIsDispatched() { - analytics.flush(); - - verify(client).flush(); - } - @Test public void offerIsDispatched(MessageBuilderTest builder) { MessageBuilder messageBuilder = builder.get().userId("dummy"); diff --git a/analytics/src/test/java/com/segment/analytics/internal/AnalyticsClientTest.java b/analytics/src/test/java/com/segment/analytics/internal/AnalyticsClientTest.java index 74f04e13..b58ec35b 100644 --- a/analytics/src/test/java/com/segment/analytics/internal/AnalyticsClientTest.java +++ b/analytics/src/test/java/com/segment/analytics/internal/AnalyticsClientTest.java @@ -75,6 +75,7 @@ public class AnalyticsClientTest { ThreadFactory threadFactory; @Spy LinkedBlockingQueue messageQueue; + @Spy LinkedBlockingQueue pendingQueue; @Mock SegmentService segmentService; @Mock ExecutorService networkExecutor; @Mock Callback callback; @@ -95,6 +96,7 @@ public void setUp() { AnalyticsClient newClient() { return new AnalyticsClient( messageQueue, + pendingQueue, null, segmentService, 50, @@ -131,15 +133,6 @@ public void shutdown() throws InterruptedException { verify(networkExecutor).awaitTermination(1, TimeUnit.SECONDS); } - @Test - public void flushInsertsPoison() throws InterruptedException { - AnalyticsClient client = newClient(); - - client.flush(); - - verify(messageQueue).put(FlushMessage.POISON); - } - /** Wait until the queue is drained. */ static void wait(Queue queue) { // noinspection StatementWithEmptyBody @@ -198,21 +191,6 @@ private static String generateDataOfSizeSpecialChars( return builder.toString(); } - @Test - public void flushSubmitsToExecutor() { - messageQueue = new LinkedBlockingQueue<>(); - AnalyticsClient client = newClient(); - - TrackMessage first = TrackMessage.builder("foo").userId("bar").build(); - TrackMessage second = TrackMessage.builder("qaz").userId("qux").build(); - client.enqueue(first); - client.enqueue(second); - client.flush(); - wait(messageQueue); - - assertThat(captureBatch(networkExecutor).batch()).containsExactly(first, second); - } - @Test public void enqueueMaxTriggersFlush() { messageQueue = new LinkedBlockingQueue<>(); @@ -284,6 +262,7 @@ public void flushHowManyTimesNecessaryToStayWithinLimit() throws InterruptedExce AnalyticsClient client = new AnalyticsClient( messageQueue, + pendingQueue, null, segmentService, 50, @@ -544,24 +523,6 @@ public boolean matches(IOException exception) { })); } - @Test - public void flushWhenNotShutDown() throws InterruptedException { - AnalyticsClient client = newClient(); - - client.flush(); - verify(messageQueue).put(POISON); - } - - @Test - public void flushWhenShutDown() throws InterruptedException { - AnalyticsClient client = newClient(); - isShutDown.set(true); - - client.flush(); - - verify(messageQueue, times(0)).put(any(Message.class)); - } - @Test public void enqueueWithRegularMessageWhenNotShutdown(MessageBuilderTest builder) throws InterruptedException { @@ -860,6 +821,7 @@ public void submitBatchBelowThreshold() throws InterruptedException, IllegalArgu AnalyticsClient client = new AnalyticsClient( messageQueue, + pendingQueue, null, segmentService, 50, @@ -902,6 +864,7 @@ public void submitBatchAboveThreshold() throws InterruptedException, IllegalArgu AnalyticsClient client = new AnalyticsClient( messageQueue, + pendingQueue, null, segmentService, 50, @@ -937,6 +900,7 @@ public void submitManySmallMessagesBatchAboveThreshold() throws InterruptedExcep AnalyticsClient client = new AnalyticsClient( messageQueue, + pendingQueue, null, segmentService, 50, From 82fb90840e23b2ff6112667ca23b093d196536df Mon Sep 17 00:00:00 2001 From: Albert Puig Date: Fri, 21 Mar 2025 16:40:00 +0100 Subject: [PATCH 04/13] simple wiremock test --- analytics/pom.xml | 7 ++ .../analytics/internal/AnalyticsClient.java | 9 ++- .../com/segment/analytics/SegmentTest.java | 78 +++++++++++++++++-- 3 files changed, 85 insertions(+), 9 deletions(-) diff --git a/analytics/pom.xml b/analytics/pom.xml index bf2e9ae1..6a97d231 100644 --- a/analytics/pom.xml +++ b/analytics/pom.xml @@ -72,7 +72,14 @@ org.wiremock wiremock-standalone 3.2.0 + test + + org.awaitility + awaitility + 4.2.2 + test + diff --git a/analytics/src/main/java/com/segment/analytics/internal/AnalyticsClient.java b/analytics/src/main/java/com/segment/analytics/internal/AnalyticsClient.java index 024f2186..1a644aa5 100644 --- a/analytics/src/main/java/com/segment/analytics/internal/AnalyticsClient.java +++ b/analytics/src/main/java/com/segment/analytics/internal/AnalyticsClient.java @@ -28,7 +28,6 @@ import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; import okhttp3.HttpUrl; import retrofit2.Call; import retrofit2.Response; @@ -129,6 +128,7 @@ public AnalyticsClient( this.isShutDown = isShutDown; this.writeKey = writeKey; this.gsonInstance = gsonInstance; + looperThread.start(); } public int messageSizeInBytes(Message message) { @@ -141,7 +141,10 @@ public boolean offer(Message message) { return messageQueue.offer(message); } - public void enqueue(Message message) {} + public void enqueue(Message message) { + + enqueueSend(message); + } public void enqueueSend(Message message) { if (isShutDown.get()) { @@ -289,7 +292,7 @@ static BatchUploadTask create(AnalyticsClient client, Batch batch, int maxRetrie private void notifyCallbacksWithException(Batch batch, Exception exception) { for (Message message : batch.batch()) { for (Callback callback : client.callbacks) { - callback.failure(message, exception); + callback.failure(message, exception); } } } diff --git a/analytics/src/test/java/com/segment/analytics/SegmentTest.java b/analytics/src/test/java/com/segment/analytics/SegmentTest.java index 531229d3..5af61174 100644 --- a/analytics/src/test/java/com/segment/analytics/SegmentTest.java +++ b/analytics/src/test/java/com/segment/analytics/SegmentTest.java @@ -7,34 +7,100 @@ import static com.github.tomakehurst.wiremock.core.WireMockConfiguration.wireMockConfig; import com.github.tomakehurst.wiremock.junit.WireMockRule; +import com.github.tomakehurst.wiremock.stubbing.ServeEvent; +import com.google.gson.Gson; +import com.google.gson.GsonBuilder; +import com.segment.analytics.gson.AutoValueAdapterFactory; +import com.segment.analytics.gson.ISO8601DateAdapter; import com.segment.analytics.messages.TrackMessage; -import java.util.UUID; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Date; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.concurrent.TimeUnit; +import org.awaitility.Awaitility; import org.junit.Before; import org.junit.Rule; import org.junit.Test; +import wiremock.com.fasterxml.jackson.core.JsonProcessingException; +import wiremock.com.fasterxml.jackson.databind.JsonNode; +import wiremock.com.fasterxml.jackson.databind.ObjectMapper; public class SegmentTest { @Rule - public WireMockRule wireMockRule = new WireMockRule(wireMockConfig().dynamicPort(), false); + public WireMockRule wireMockRule = + new WireMockRule(wireMockConfig().dynamicPort().gzipDisabled(true), false); Analytics analytics; + GsonBuilder gsonBuilder = new GsonBuilder() + .registerTypeAdapterFactory(new AutoValueAdapterFactory()) + .registerTypeAdapter(Date.class, new ISO8601DateAdapter()); + + Gson gson = gsonBuilder.create(); + @Before public void confWireMock() { stubFor(post(urlEqualTo("/v1/import/")).willReturn(okJson("{\"success\": \"true\"}"))); analytics = Analytics.builder("write-key") .endpoint(wireMockRule.baseUrl()) + .flushInterval(1, TimeUnit.SECONDS) + .queueCapacity(500) // callback // http client .build(); } @Test - public void test() { - analytics.enqueue(TrackMessage.builder("my-track") - .messageId(UUID.randomUUID().toString()) - .userId("userId")); + public void test() throws Throwable { + analytics.enqueue(TrackMessage.builder("my-track").messageId("m1").userId("userId")); + analytics.enqueue(TrackMessage.builder("my-track").messageId("m2").userId("userId")); + + Awaitility.await().until(() -> sentMessagesEqualsTo("m1", "m2")); + } + + @Test + public void testMore() throws Throwable { + System.err.println("wm at " + wireMockRule.baseUrl()); + int num = 100_000; + String[] expectedIds = new String[num]; + for (int i = 0; i < num; i++) { + String id = "m" + i; + expectedIds[i] = id; + analytics.enqueue(TrackMessage.builder("my-track").messageId(id).userId("userId")); + } + + Awaitility.await().atMost(1, TimeUnit.MINUTES).until(() -> sentMessagesEqualsTo(expectedIds)); + } + + private static final ObjectMapper OM = new ObjectMapper(); + + private boolean sentMessagesEqualsTo(String... msgIds) { + return new HashSet<>(sentMessages()).equals(new HashSet<>(Arrays.asList(msgIds))); + } + + private List sentMessages() { + List messageIds = new ArrayList<>(); + for (ServeEvent event : wireMockRule.getAllServeEvents()) { + JsonNode batch; + try { + JsonNode json = OM.readTree(event.getRequest().getBodyAsString()); + batch = json.get("batch"); + if (batch == null) { + continue; + } + } catch (JsonProcessingException e) { + continue; + } + Iterator msgs = batch.elements(); + while (msgs.hasNext()) { + messageIds.add(msgs.next().get("messageId").asText()); + } + } + return messageIds; } } From 205e2dee5fa36c06a5bab76252fb133d46c05ce7 Mon Sep 17 00:00:00 2001 From: Albert Puig Date: Mon, 24 Mar 2025 17:32:46 +0100 Subject: [PATCH 05/13] remove callback --- .../java/com/segment/analytics/Analytics.java | 22 ---- .../analytics/internal/AnalyticsClient.java | 20 +--- .../analytics/AnalyticsBuilderTest.java | 34 ------ .../internal/AnalyticsClientTest.java | 102 ++++++++---------- 4 files changed, 48 insertions(+), 130 deletions(-) diff --git a/analytics/src/main/java/com/segment/analytics/Analytics.java b/analytics/src/main/java/com/segment/analytics/Analytics.java index 26bd0bcb..d7444643 100644 --- a/analytics/src/main/java/com/segment/analytics/Analytics.java +++ b/analytics/src/main/java/com/segment/analytics/Analytics.java @@ -141,7 +141,6 @@ public static class Builder { private int maximumFlushAttempts; private int maximumQueueSizeInBytes; private long flushIntervalInMillis; - private List callbacks; private int queueCapacity; private boolean forceTlsV1 = false; private GsonBuilder gsonBuilder; @@ -318,21 +317,6 @@ public Builder threadFactory(ThreadFactory threadFactory) { return this; } - /** Add a {@link Callback} to be notified when an event is processed. */ - public Builder callback(Callback callback) { - if (callback == null) { - throw new NullPointerException("Null callback"); - } - if (callbacks == null) { - callbacks = new ArrayList<>(); - } - if (callbacks.contains(callback)) { - throw new IllegalStateException("Callback is already registered."); - } - callbacks.add(callback); - return this; - } - /** Use a {@link Plugin} to configure the builder. */ @Beta public Builder plugin(Plugin plugin) { @@ -407,11 +391,6 @@ public Analytics build() { if (threadFactory == null) { threadFactory = Platform.get().defaultThreadFactory(); } - if (callbacks == null) { - callbacks = Collections.emptyList(); - } else { - callbacks = Collections.unmodifiableList(callbacks); - } HttpLoggingInterceptor interceptor = new HttpLoggingInterceptor( @@ -463,7 +442,6 @@ public void log(String message) { log, threadFactory, networkExecutor, - callbacks, writeKey, gson); diff --git a/analytics/src/main/java/com/segment/analytics/internal/AnalyticsClient.java b/analytics/src/main/java/com/segment/analytics/internal/AnalyticsClient.java index 1a644aa5..c6cc9abd 100644 --- a/analytics/src/main/java/com/segment/analytics/internal/AnalyticsClient.java +++ b/analytics/src/main/java/com/segment/analytics/internal/AnalyticsClient.java @@ -59,7 +59,6 @@ public class AnalyticsClient { private final int maximumRetries; private final int maximumQueueByteSize; private final Log log; - private final List callbacks; private final ExecutorService networkExecutor; private final Thread looperThread; private final AtomicBoolean isShutDown; @@ -76,7 +75,6 @@ public static AnalyticsClient create( Log log, ThreadFactory threadFactory, ExecutorService networkExecutor, - List callbacks, String writeKey, Gson gsonInstance) { return new AnalyticsClient( @@ -91,7 +89,6 @@ public static AnalyticsClient create( log, threadFactory, networkExecutor, - callbacks, new AtomicBoolean(false), writeKey, gsonInstance); @@ -109,7 +106,6 @@ public AnalyticsClient( Log log, ThreadFactory threadFactory, ExecutorService networkExecutor, - List callbacks, AtomicBoolean isShutDown, String writeKey, Gson gsonInstance) { @@ -122,8 +118,7 @@ public AnalyticsClient( this.maximumRetries = maximumRetries; this.maximumQueueByteSize = maximumQueueSizeInBytes; this.log = log; - this.callbacks = callbacks; - this.looperThread = threadFactory.newThread(new Looper()); + this.looperThread = threadFactory.newThread(new Looper()); this.networkExecutor = networkExecutor; this.isShutDown = isShutDown; this.writeKey = writeKey; @@ -290,11 +285,7 @@ static BatchUploadTask create(AnalyticsClient client, Batch batch, int maxRetrie } private void notifyCallbacksWithException(Batch batch, Exception exception) { - for (Message message : batch.batch()) { - for (Callback callback : client.callbacks) { - callback.failure(message, exception); - } - } + // XXX failure } /** Returns {@code true} to indicate a batch should be retried. {@code false} otherwise. */ @@ -308,12 +299,7 @@ boolean upload() { if (response.isSuccessful()) { client.log.print(VERBOSE, "Uploaded batch %s.", batch.sequence()); - for (Message message : batch.batch()) { - for (Callback callback : client.callbacks) { - callback.success(message); - } - } - + // XXX success return false; } diff --git a/analytics/src/test/java/com/segment/analytics/AnalyticsBuilderTest.java b/analytics/src/test/java/com/segment/analytics/AnalyticsBuilderTest.java index 31596e90..e1c282ba 100644 --- a/analytics/src/test/java/com/segment/analytics/AnalyticsBuilderTest.java +++ b/analytics/src/test/java/com/segment/analytics/AnalyticsBuilderTest.java @@ -381,46 +381,12 @@ public void buildsWithThreadFactory() { assertThat(analytics).isNotNull(); } - @Test - public void nullCallback() { - try { - builder.callback(null); - fail("Should fail for null callback"); - } catch (NullPointerException e) { - assertThat(e).hasMessage("Null callback"); - } - } - - @Test - public void duplicateCallback() { - Callback callback = mock(Callback.class); - try { - builder.callback(callback).callback(callback); - } catch (IllegalStateException e) { - assertThat(e).hasMessage("Callback is already registered."); - } - } - - @Test - public void buildsWithValidCallback() { - Analytics analytics = builder.callback(mock(Callback.class)).build(); - assertThat(analytics).isNotNull(); - } - @Test public void buildsWithForceTlsV1() { Analytics analytics = builder.forceTlsVersion1().build(); assertThat(analytics).isNotNull(); } - @Test - public void multipleCallbacks() { - Analytics analytics = - builder.callback(mock(Callback.class)).callback(mock(Callback.class)).build(); - - assertThat(analytics).isNotNull(); - } - @Test public void nullPlugin() { try { diff --git a/analytics/src/test/java/com/segment/analytics/internal/AnalyticsClientTest.java b/analytics/src/test/java/com/segment/analytics/internal/AnalyticsClientTest.java index b58ec35b..a06bd808 100644 --- a/analytics/src/test/java/com/segment/analytics/internal/AnalyticsClientTest.java +++ b/analytics/src/test/java/com/segment/analytics/internal/AnalyticsClientTest.java @@ -4,20 +4,16 @@ import static com.segment.analytics.internal.StopMessage.STOP; import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.argThat; -import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.never; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.timeout; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.verifyNoInteractions; import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.Mockito.when; import static org.mockito.MockitoAnnotations.openMocks; import com.google.gson.Gson; -import com.segment.analytics.Callback; import com.segment.analytics.Log; import com.segment.analytics.TestUtils.MessageBuilderTest; import com.segment.analytics.http.SegmentService; @@ -28,7 +24,6 @@ import com.segment.analytics.messages.TrackMessage; import com.segment.backo.Backo; import com.squareup.burst.BurstJUnit4; -import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.Arrays; import java.util.Collections; @@ -47,7 +42,6 @@ import org.junit.Test; import org.junit.runner.RunWith; import org.mockito.ArgumentCaptor; -import org.mockito.ArgumentMatcher; import org.mockito.Mock; import org.mockito.Spy; import org.mockito.invocation.InvocationOnMock; @@ -78,7 +72,6 @@ public class AnalyticsClientTest { @Spy LinkedBlockingQueue pendingQueue; @Mock SegmentService segmentService; @Mock ExecutorService networkExecutor; - @Mock Callback callback; @Mock UploadResponse response; AtomicBoolean isShutDown; @@ -106,7 +99,6 @@ AnalyticsClient newClient() { log, threadFactory, networkExecutor, - Collections.singletonList(callback), isShutDown, writeKey, new Gson()); @@ -272,7 +264,6 @@ public void flushHowManyTimesNecessaryToStayWithinLimit() throws InterruptedExce log, threadFactory, networkExecutor, - Collections.singletonList(callback), isShutDown, writeKey, new Gson()); @@ -367,8 +358,8 @@ public void batchRetriesForNetworkErrors() { // Verify that we tried to upload 4 times, 3 failed and 1 succeeded. verify(segmentService, times(4)).upload(null, batch); - verify(callback).success(trackMessage); - } + //// verify(callback).success(trackMessage); + } @Test public void batchRetriesForHTTP5xxErrors() { @@ -392,8 +383,8 @@ public void batchRetriesForHTTP5xxErrors() { // Verify that we tried to upload 4 times, 3 failed and 1 succeeded. verify(segmentService, times(4)).upload(null, batch); - verify(callback).success(trackMessage); - } + // verify(callback).success(trackMessage); + } @Test public void batchRetriesForHTTP429Errors() { @@ -416,8 +407,8 @@ public void batchRetriesForHTTP429Errors() { // Verify that we tried to upload 4 times, 3 failed and 1 succeeded. verify(segmentService, times(4)).upload(null, batch); - verify(callback).success(trackMessage); - } + // verify(callback).success(trackMessage); + } @Test public void batchDoesNotRetryForNon5xxAndNon429HTTPErrors() { @@ -435,8 +426,8 @@ public void batchDoesNotRetryForNon5xxAndNon429HTTPErrors() { // Verify we only tried to upload once. verify(segmentService).upload(null, batch); - verify(callback).failure(eq(trackMessage), any(IOException.class)); - } + // verify(callback).failure(eq(trackMessage), any(IOException.class)); + } @Test public void batchDoesNotRetryForNonNetworkErrors() { @@ -452,8 +443,8 @@ public void batchDoesNotRetryForNonNetworkErrors() { // Verify we only tried to upload once. verify(segmentService).upload(null, batch); - verify(callback).failure(eq(trackMessage), any(RuntimeException.class)); - } + // verify(callback).failure(eq(trackMessage), any(RuntimeException.class)); + } @Test public void givesUpAfterMaxRetries() { @@ -477,17 +468,17 @@ public Call answer(InvocationOnMock invocation) { // DEFAULT_RETRIES == maxRetries // tries 11(one normal run + 10 retries) even though default is 50 in AnalyticsClient.java verify(segmentService, times(11)).upload(null, batch); - verify(callback) - .failure( - eq(trackMessage), - argThat( - new ArgumentMatcher() { - @Override - public boolean matches(IOException exception) { - return exception.getMessage().equals("11 retries exhausted"); - } - })); - } + // verify(callback) + // .failure( + // eq(trackMessage), + // argThat( + // new ArgumentMatcher() { + // @Override + // public boolean matches(IOException exception) { + // return exception.getMessage().equals("11 retries exhausted"); + // } + // })); + } @Test public void hasDefaultRetriesSetTo3() { @@ -511,17 +502,17 @@ public Call answer(InvocationOnMock invocation) { // DEFAULT_RETRIES == maxRetries // tries 11(one normal run + 10 retries) verify(segmentService, times(4)).upload(null, batch); - verify(callback) - .failure( - eq(trackMessage), - argThat( - new ArgumentMatcher() { - @Override - public boolean matches(IOException exception) { - return exception.getMessage().equals("4 retries exhausted"); - } - })); - } + // verify(callback) + // .failure( + // eq(trackMessage), + // argThat( + // new ArgumentMatcher() { + // @Override + // public boolean matches(IOException exception) { + // return exception.getMessage().equals("4 retries exhausted"); + // } + // })); + } @Test public void enqueueWithRegularMessageWhenNotShutdown(MessageBuilderTest builder) @@ -563,8 +554,8 @@ public void shutdownWhenAlreadyShutDown() throws InterruptedException { client.shutdown(); verify(messageQueue, times(0)).put(any(Message.class)); - verifyNoInteractions(networkExecutor, callback, segmentService); - } + // verifyNoInteractions(networkExecutor, callback, segmentService); + } @Test public void shutdownWithNoMessageInTheQueue() throws InterruptedException { @@ -612,17 +603,17 @@ public Call answer(InvocationOnMock invocation) { // runs once but never retries verify(segmentService, times(1)).upload(null, batch); - verify(callback) - .failure( - eq(trackMessage), - argThat( - new ArgumentMatcher() { - @Override - public boolean matches(IOException exception) { - return exception.getMessage().equals("1 retries exhausted"); - } - })); - } + // verify(callback) + // .failure( + // eq(trackMessage), + // argThat( + // new ArgumentMatcher() { + // @Override + // public boolean matches(IOException exception) { + // return exception.getMessage().equals("1 retries exhausted"); + // } + // })); + } /** * ********************************************************************************************** @@ -831,7 +822,6 @@ public void submitBatchBelowThreshold() throws InterruptedException, IllegalArgu log, threadFactory, networkExecutor, - Collections.singletonList(callback), isShutDown, writeKey, new Gson()); @@ -874,7 +864,6 @@ public void submitBatchAboveThreshold() throws InterruptedException, IllegalArgu log, threadFactory, networkExecutor, - Collections.singletonList(callback), isShutDown, writeKey, new Gson()); @@ -910,7 +899,6 @@ public void submitManySmallMessagesBatchAboveThreshold() throws InterruptedExcep log, threadFactory, networkExecutor, - Collections.singletonList(callback), isShutDown, writeKey, new Gson()); From af74b24ed91b875d15fa419f5ce56a2743cf50b2 Mon Sep 17 00:00:00 2001 From: Albert Puig Date: Mon, 24 Mar 2025 17:55:37 +0100 Subject: [PATCH 06/13] remove backo, add failsafe --- analytics/pom.xml | 12 +- .../java/com/segment/analytics/Analytics.java | 40 +--- .../analytics/internal/AnalyticsClient.java | 218 +++++++----------- .../com/segment/analytics/SegmentTest.java | 41 +++- 4 files changed, 139 insertions(+), 172 deletions(-) diff --git a/analytics/pom.xml b/analytics/pom.xml index 6a97d231..3f6be8ef 100644 --- a/analytics/pom.xml +++ b/analytics/pom.xml @@ -39,10 +39,18 @@ findbugs provided + - com.segment.backo - backo + dev.failsafe + failsafe + 3.3.2 + + dev.failsafe + failsafe-retrofit + 3.3.2 + + junit junit diff --git a/analytics/src/main/java/com/segment/analytics/Analytics.java b/analytics/src/main/java/com/segment/analytics/Analytics.java index d7444643..bccce351 100644 --- a/analytics/src/main/java/com/segment/analytics/Analytics.java +++ b/analytics/src/main/java/com/segment/analytics/Analytics.java @@ -139,7 +139,6 @@ public static class Builder { private ThreadFactory threadFactory; private int flushQueueSize; private int maximumFlushAttempts; - private int maximumQueueSizeInBytes; private long flushIntervalInMillis; private int queueCapacity; private boolean forceTlsV1 = false; @@ -267,17 +266,6 @@ public Builder flushQueueSize(int flushQueueSize) { return this; } - /** Set the queueSize at which flushes should be triggered. */ - @Beta - public Builder maximumQueueSizeInBytes(int bytes) { - if (bytes < 1) { - throw new IllegalArgumentException("maximumQueueSizeInBytes must not be less than 1."); - } - - this.maximumQueueSizeInBytes = bytes; - return this; - } - /** Set the interval at which the queue should be flushed. */ @Beta public Builder flushInterval(long flushInterval, TimeUnit unit) { @@ -369,9 +357,6 @@ public Analytics build() { if (flushQueueSize == 0) { flushQueueSize = Platform.get().defaultFlushQueueSize(); } - if (maximumQueueSizeInBytes == 0) { - maximumQueueSizeInBytes = MESSAGE_QUEUE_MAX_BYTE_SIZE; - } if (maximumFlushAttempts == 0) { maximumFlushAttempts = 3; } @@ -430,20 +415,17 @@ public void log(String message) { SegmentService segmentService = restAdapter.create(SegmentService.class); - AnalyticsClient analyticsClient = - AnalyticsClient.create( - endpoint, - segmentService, - queueCapacity, - flushQueueSize, - flushIntervalInMillis, - maximumFlushAttempts, - maximumQueueSizeInBytes, - log, - threadFactory, - networkExecutor, - writeKey, - gson); + AnalyticsClient analyticsClient = AnalyticsClient.create( + endpoint, + segmentService, + queueCapacity, + flushQueueSize, + flushIntervalInMillis, + log, + threadFactory, + networkExecutor, + writeKey, + gson); return new Analytics(analyticsClient, messageTransformers, messageInterceptors, log); } diff --git a/analytics/src/main/java/com/segment/analytics/internal/AnalyticsClient.java b/analytics/src/main/java/com/segment/analytics/internal/AnalyticsClient.java index c6cc9abd..9f9e25b0 100644 --- a/analytics/src/main/java/com/segment/analytics/internal/AnalyticsClient.java +++ b/analytics/src/main/java/com/segment/analytics/internal/AnalyticsClient.java @@ -5,24 +5,30 @@ import static com.segment.analytics.Log.Level.VERBOSE; import com.google.gson.Gson; -import com.segment.analytics.Callback; import com.segment.analytics.Log; import com.segment.analytics.http.SegmentService; import com.segment.analytics.http.UploadResponse; import com.segment.analytics.messages.Batch; import com.segment.analytics.messages.Message; -import com.segment.backo.Backo; +import dev.failsafe.CircuitBreaker; +import dev.failsafe.CircuitBreakerOpenException; +import dev.failsafe.Failsafe; +import dev.failsafe.FailsafeExecutor; +import dev.failsafe.RetryPolicy; +import dev.failsafe.retrofit.FailsafeCall; import java.io.IOException; import java.nio.charset.Charset; import java.nio.charset.StandardCharsets; +import java.time.Duration; +import java.time.temporal.ChronoUnit; import java.util.ArrayList; import java.util.Collections; import java.util.LinkedHashMap; import java.util.LinkedList; -import java.util.List; import java.util.Map; import java.util.UUID; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CompletionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadFactory; @@ -38,7 +44,7 @@ public class AnalyticsClient { private static final int MSG_MAX_SIZE = 1024 * 32; private static final Charset ENCODING = StandardCharsets.UTF_8; private Gson gsonInstance; - private static final String instanceId = UUID.randomUUID().toString(); + private static final String instanceId = UUID.randomUUID().toString(); // TODO configurable ? static { Map library = new LinkedHashMap<>(); @@ -51,18 +57,16 @@ public class AnalyticsClient { } private final BlockingQueue messageQueue; - private final BlockingQueue pendingQueue; private final HttpUrl uploadUrl; private final SegmentService service; - private final int size; + private final int flushQueueSize; private final long flushIntervalInMillis; - private final int maximumRetries; - private final int maximumQueueByteSize; private final Log log; private final ExecutorService networkExecutor; private final Thread looperThread; private final AtomicBoolean isShutDown; private final String writeKey; + private final FailsafeExecutor> failsafe; public static AnalyticsClient create( HttpUrl uploadUrl, @@ -70,22 +74,17 @@ public static AnalyticsClient create( int queueCapacity, int flushQueueSize, long flushIntervalInMillis, - int maximumRetries, - int maximumQueueSizeInBytes, Log log, ThreadFactory threadFactory, ExecutorService networkExecutor, String writeKey, Gson gsonInstance) { return new AnalyticsClient( - new LinkedBlockingQueue(queueCapacity), new LinkedBlockingQueue(queueCapacity), uploadUrl, segmentService, flushQueueSize, flushIntervalInMillis, - maximumRetries, - maximumQueueSizeInBytes, log, threadFactory, networkExecutor, @@ -96,13 +95,10 @@ public static AnalyticsClient create( public AnalyticsClient( BlockingQueue messageQueue, - BlockingQueue pendingQueue, HttpUrl uploadUrl, SegmentService service, - int maxQueueSize, + int flushQueueSize, long flushIntervalInMillis, - int maximumRetries, - int maximumQueueSizeInBytes, Log log, ThreadFactory threadFactory, ExecutorService networkExecutor, @@ -110,13 +106,10 @@ public AnalyticsClient( String writeKey, Gson gsonInstance) { this.messageQueue = messageQueue; - this.pendingQueue = pendingQueue; this.uploadUrl = uploadUrl; this.service = service; - this.size = maxQueueSize; + this.flushQueueSize = flushQueueSize; this.flushIntervalInMillis = flushIntervalInMillis; - this.maximumRetries = maximumRetries; - this.maximumQueueByteSize = maximumQueueSizeInBytes; this.log = log; this.looperThread = threadFactory.newThread(new Looper()); this.networkExecutor = networkExecutor; @@ -124,6 +117,35 @@ public AnalyticsClient( this.writeKey = writeKey; this.gsonInstance = gsonInstance; looperThread.start(); + + CircuitBreaker> breaker = CircuitBreaker.>builder() + // 2 failure in 5 minute open the circuit + .withFailureThreshold(2, Duration.ofMinutes(5)) + // once open wait 1 minute to be half-open + .withDelay(Duration.ofMinutes(1)) + // after 1 success the circuit is closed + .withSuccessThreshold(1) + // 5xx or rate limit is an error + .handleResultIf(response -> is5xx(response.code()) || response.code() == 429) + .build(); + + RetryPolicy> retry = RetryPolicy.>builder() + .withMaxAttempts(3) + .withBackoff(1, 300, ChronoUnit.SECONDS) + .withJitter(.1) + // retry on IOException + .handle(IOException.class) + // retry on 5xx or rate limit + .handleResultIf(response -> is5xx(response.code()) || response.code() == 429) + .onRetriesExceeded(context -> { + throw new RuntimeException("retries"); + }) + .onAbort(context -> { + throw new RuntimeException("aborted"); + }) + .build(); + + this.failsafe = Failsafe.with(retry, breaker).with(networkExecutor); } public int messageSizeInBytes(Message message) { @@ -137,23 +159,14 @@ public boolean offer(Message message) { } public void enqueue(Message message) { - - enqueueSend(message); - } - - public void enqueueSend(Message message) { - if (isShutDown.get()) { - log.print(ERROR, "Attempt to enqueue a message when shutdown has been called %s.", message); - return; - } - - try { - messageQueue.put(message); - } catch (InterruptedException e) { - log.print(ERROR, e, "Interrupted while adding message %s.", message); - Thread.currentThread().interrupt(); + if (isShutDown.get()) { + log.print(ERROR, "Attempt to enqueue a message when shutdown has been called %s.", message); + return; + } + if (!messageQueue.offer(message)) { + handleError(message); + } } - } public void shutdown() { if (isShutDown.compareAndSet(false, true)) { @@ -171,6 +184,8 @@ public void shutdown() { public void shutdownAndWait(ExecutorService executor, String name) { try { + this.looperThread.interrupt(); + executor.shutdown(); final boolean executorTerminated = executor.awaitTermination(1, TimeUnit.SECONDS); @@ -228,7 +243,7 @@ public void run() { } Boolean isBlockingSignal = message == null; - Boolean isOverflow = messages.size() >= size; + Boolean isOverflow = messages.size() >= flushQueueSize; if (!messages.isEmpty() && (isOverflow || isBlockingSignal || batchSizeLimitReached)) { Batch batch = Batch.create(CONTEXT, new ArrayList<>(messages), writeKey); @@ -237,10 +252,22 @@ public void run() { "Batching %s message(s) into batch %s.", batch.batch().size(), batch.sequence()); - networkExecutor.submit( - BatchUploadTask.create(AnalyticsClient.this, batch, maximumRetries)); - currentBatchSize=0; + Call call = service.upload(uploadUrl, batch); + FailsafeCall failsafeCall = + FailsafeCall.with(failsafe).compose(call); + failsafeCall.executeAsync() + .thenAccept(r -> { + if(is5xx(r.code()) || r.code() == 429) { + handleError(batch, null); + } + }) + .exceptionally(t -> { + handleError(batch, t); + return null; + }); + + currentBatchSize = 0; messages.clear(); if (batchSizeLimitReached) { // If this is true that means the last message that would make us go over the limit @@ -253,109 +280,30 @@ public void run() { } } catch (InterruptedException e) { log.print(DEBUG, "Looper interrupted while polling for messages."); - Thread.currentThread().interrupt(); //XXX - } + // XXX CANCEL UPLOAD + } catch (Exception e) { + e.printStackTrace(); + } // SEND pending log.print(VERBOSE, "Looper stopped"); } + } - - static class BatchUploadTask implements Runnable { - private static final Backo BACKO = - Backo.builder() // - .base(TimeUnit.SECONDS, 15) // - .cap(TimeUnit.HOURS, 1) // - .jitter(1) // - .build(); - - private final AnalyticsClient client; - private final Backo backo; - final Batch batch; - private final int maxRetries; - - static BatchUploadTask create(AnalyticsClient client, Batch batch, int maxRetries) { - return new BatchUploadTask(client, BACKO, batch, maxRetries); - } - - BatchUploadTask(AnalyticsClient client, Backo backo, Batch batch, int maxRetries) { - this.client = client; - this.batch = batch; - this.backo = backo; - this.maxRetries = maxRetries; - } - - private void notifyCallbacksWithException(Batch batch, Exception exception) { - // XXX failure - } - - /** Returns {@code true} to indicate a batch should be retried. {@code false} otherwise. */ - boolean upload() { - client.log.print(VERBOSE, "Uploading batch %s.", batch.sequence()); - - try { - Call call = client.service.upload(client.uploadUrl, batch); - Response response = call.execute(); - - if (response.isSuccessful()) { - client.log.print(VERBOSE, "Uploaded batch %s.", batch.sequence()); - - // XXX success - return false; - } - - int status = response.code(); - if (is5xx(status)) { - client.log.print( - DEBUG, "Could not upload batch %s due to server error. Retrying.", batch.sequence()); - return true; - } else if (status == 429) { - client.log.print( - DEBUG, "Could not upload batch %s due to rate limiting. Retrying.", batch.sequence()); - return true; - } - - client.log.print(DEBUG, "Could not upload batch %s. Giving up.", batch.sequence()); - notifyCallbacksWithException(batch, new IOException(response.errorBody().string())); - - return false; - } catch (IOException error) { - client.log.print(DEBUG, error, "Could not upload batch %s. Retrying.", batch.sequence()); - - return true; - } catch (Exception exception) { - client.log.print(DEBUG, "Could not upload batch %s. Giving up.", batch.sequence()); - - notifyCallbacksWithException(batch, exception); - - return false; + + void handleError(Batch batch, Throwable t) { + if(t instanceof CompletionException && t.getCause() instanceof CircuitBreakerOpenException) { + System.err.println("OPEN"); } - } - - @Override - public void run() { - int attempt = 0; - for (; attempt <= maxRetries; attempt++) { - boolean retry = upload(); - if (!retry) return; - try { - backo.sleep(attempt); - } catch (InterruptedException e) { - client.log.print( - DEBUG, "Thread interrupted while backing off for batch %s.", batch.sequence()); - return; - } - } - - client.log.print(ERROR, "Could not upload batch %s. Retries exhausted.", batch.sequence()); - notifyCallbacksWithException( - batch, new IOException(Integer.toString(attempt) + " retries exhausted")); - } + + System.err.println("" + batch); + } + void handleError(Message message) { + System.err.println("" + message); + } private static boolean is5xx(int status) { return status >= 500 && status < 600; - } } - public static class BatchUtility { /** diff --git a/analytics/src/test/java/com/segment/analytics/SegmentTest.java b/analytics/src/test/java/com/segment/analytics/SegmentTest.java index 5af61174..864f9d51 100644 --- a/analytics/src/test/java/com/segment/analytics/SegmentTest.java +++ b/analytics/src/test/java/com/segment/analytics/SegmentTest.java @@ -6,6 +6,7 @@ import static com.github.tomakehurst.wiremock.client.WireMock.urlEqualTo; import static com.github.tomakehurst.wiremock.core.WireMockConfiguration.wireMockConfig; +import com.github.tomakehurst.wiremock.client.WireMock; import com.github.tomakehurst.wiremock.junit.WireMockRule; import com.github.tomakehurst.wiremock.stubbing.ServeEvent; import com.google.gson.Gson; @@ -27,12 +28,17 @@ import wiremock.com.fasterxml.jackson.core.JsonProcessingException; import wiremock.com.fasterxml.jackson.databind.JsonNode; import wiremock.com.fasterxml.jackson.databind.ObjectMapper; +import wiremock.com.google.common.util.concurrent.RateLimiter; public class SegmentTest { @Rule - public WireMockRule wireMockRule = - new WireMockRule(wireMockConfig().dynamicPort().gzipDisabled(true), false); + public WireMockRule wireMockRule = new WireMockRule( + wireMockConfig() + .port(8088) + // .dynamicPort() + .gzipDisabled(true), + false); Analytics analytics; @@ -48,8 +54,10 @@ public void confWireMock() { analytics = Analytics.builder("write-key") .endpoint(wireMockRule.baseUrl()) + // .endpoint("http://localhost:8888") .flushInterval(1, TimeUnit.SECONDS) - .queueCapacity(500) + .flushQueueSize(20) + .queueCapacity(50) // callback // http client .build(); @@ -57,10 +65,31 @@ public void confWireMock() { @Test public void test() throws Throwable { - analytics.enqueue(TrackMessage.builder("my-track").messageId("m1").userId("userId")); - analytics.enqueue(TrackMessage.builder("my-track").messageId("m2").userId("userId")); - Awaitility.await().until(() -> sentMessagesEqualsTo("m1", "m2")); + stubFor(post(urlEqualTo("/v1/import/")) + .willReturn(WireMock.aResponse().withStatus(503).withBody("fail"))); + + long start = System.currentTimeMillis(); + boolean upAgain = false; + int id = 0; + RateLimiter rate = RateLimiter.create(5); + while (true) { + if (rate.tryAcquire()) { + System.err.println("id " + id); + analytics.enqueue( + TrackMessage.builder("my-track").messageId("m" + id++).userId("userId")); + } + Thread.sleep(50); + + if (!upAgain && System.currentTimeMillis() - start > 120_000) { + upAgain = true; + stubFor(post(urlEqualTo("/v1/import/")).willReturn(okJson("{\"success\": \"true\"}"))); + System.err.println("UP AGAIN"); + } + } + + // analytics.enqueue(TrackMessage.builder("my-track").messageId("m2").userId("userId")); + // Awaitility.await().until(() -> sentMessagesEqualsTo("m1", "m2")); } @Test From 85ef9096174d71eee6674c5bc8064d9485526a85 Mon Sep 17 00:00:00 2001 From: Albert Puig Date: Tue, 25 Mar 2025 13:08:37 +0100 Subject: [PATCH 07/13] fallback appender --- analytics/pom.xml | 1 - .../analytics/internal/AnalyticsClient.java | 38 +- .../analytics/internal/FallbackAppender.java | 227 +++++ .../analytics/internal/FlushMessage.java | 66 -- .../analytics/internal/StopMessage.java | 66 -- .../com/segment/analytics/AnalyticsTest.java | 144 --- .../com/segment/analytics/SegmentTest.java | 69 +- .../internal/AnalyticsClientTest.java | 921 ------------------ 8 files changed, 280 insertions(+), 1252 deletions(-) create mode 100644 analytics/src/main/java/com/segment/analytics/internal/FallbackAppender.java delete mode 100644 analytics/src/main/java/com/segment/analytics/internal/FlushMessage.java delete mode 100644 analytics/src/main/java/com/segment/analytics/internal/StopMessage.java delete mode 100644 analytics/src/test/java/com/segment/analytics/AnalyticsTest.java delete mode 100644 analytics/src/test/java/com/segment/analytics/internal/AnalyticsClientTest.java diff --git a/analytics/pom.xml b/analytics/pom.xml index 3f6be8ef..4315a511 100644 --- a/analytics/pom.xml +++ b/analytics/pom.xml @@ -50,7 +50,6 @@ failsafe-retrofit 3.3.2 - junit junit diff --git a/analytics/src/main/java/com/segment/analytics/internal/AnalyticsClient.java b/analytics/src/main/java/com/segment/analytics/internal/AnalyticsClient.java index 9f9e25b0..124d399a 100644 --- a/analytics/src/main/java/com/segment/analytics/internal/AnalyticsClient.java +++ b/analytics/src/main/java/com/segment/analytics/internal/AnalyticsClient.java @@ -41,7 +41,6 @@ public class AnalyticsClient { private static final Map CONTEXT; private static final int BATCH_MAX_SIZE = 1024 * 500; - private static final int MSG_MAX_SIZE = 1024 * 32; private static final Charset ENCODING = StandardCharsets.UTF_8; private Gson gsonInstance; private static final String instanceId = UUID.randomUUID().toString(); // TODO configurable ? @@ -67,6 +66,7 @@ public class AnalyticsClient { private final AtomicBoolean isShutDown; private final String writeKey; private final FailsafeExecutor> failsafe; + private final FallbackAppender fallback; public static AnalyticsClient create( HttpUrl uploadUrl, @@ -119,10 +119,10 @@ public AnalyticsClient( looperThread.start(); CircuitBreaker> breaker = CircuitBreaker.>builder() - // 2 failure in 5 minute open the circuit - .withFailureThreshold(2, Duration.ofMinutes(5)) - // once open wait 1 minute to be half-open - .withDelay(Duration.ofMinutes(1)) + // 5 failure in 2 minute open the circuit + .withFailureThreshold(5, Duration.ofMinutes(2)) + // once open wait 30 seconds to be half-open + .withDelay(Duration.ofSeconds(30)) // after 1 success the circuit is closed .withSuccessThreshold(1) // 5xx or rate limit is an error @@ -130,9 +130,9 @@ public AnalyticsClient( .build(); RetryPolicy> retry = RetryPolicy.>builder() - .withMaxAttempts(3) + .withMaxAttempts(5) .withBackoff(1, 300, ChronoUnit.SECONDS) - .withJitter(.1) + .withJitter(.2) // retry on IOException .handle(IOException.class) // retry on 5xx or rate limit @@ -146,6 +146,7 @@ public AnalyticsClient( .build(); this.failsafe = Failsafe.with(retry, breaker).with(networkExecutor); + this.fallback = new FallbackAppender(this); } public int messageSizeInBytes(Message message) { @@ -167,13 +168,16 @@ public void enqueue(Message message) { handleError(message); } } + + // FIXME closeable public void shutdown() { if (isShutDown.compareAndSet(false, true)) { final long start = System.currentTimeMillis(); // first let's tell the system to stop looperThread.interrupt(); + fallback.close(); shutdownAndWait(networkExecutor, "network"); @@ -182,10 +186,8 @@ public void shutdown() { } } - public void shutdownAndWait(ExecutorService executor, String name) { + private void shutdownAndWait(ExecutorService executor, String name) { try { - this.looperThread.interrupt(); - executor.shutdown(); final boolean executorTerminated = executor.awaitTermination(1, TimeUnit.SECONDS); @@ -291,14 +293,18 @@ public void run() { } void handleError(Batch batch, Throwable t) { - if(t instanceof CompletionException && t.getCause() instanceof CircuitBreakerOpenException) { - System.err.println("OPEN"); + if(t instanceof CompletionException ) { + if(t.getCause() instanceof CircuitBreakerOpenException) { + System.err.println("OPEN"); + } + } + for(Message msg : batch.batch()) { + fallback.add(msg); } - - System.err.println("" + batch); } - void handleError(Message message) { - System.err.println("" + message); + + void handleError(Message msg) { + fallback.add(msg); } private static boolean is5xx(int status) { diff --git a/analytics/src/main/java/com/segment/analytics/internal/FallbackAppender.java b/analytics/src/main/java/com/segment/analytics/internal/FallbackAppender.java new file mode 100644 index 00000000..b80bdab4 --- /dev/null +++ b/analytics/src/main/java/com/segment/analytics/internal/FallbackAppender.java @@ -0,0 +1,227 @@ +package com.segment.analytics.internal; + +import com.google.gson.Gson; +import com.google.gson.GsonBuilder; +import com.segment.analytics.gson.AutoValueAdapterFactory; +import com.segment.analytics.gson.ISO8601DateAdapter; +import com.segment.analytics.messages.Message; +import com.segment.analytics.messages.TrackMessage; +import java.io.File; +import java.io.IOException; +import java.io.OutputStream; +import java.nio.channels.Channels; +import java.nio.channels.FileChannel; +import java.nio.charset.StandardCharsets; +import java.nio.file.StandardOpenOption; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.Date; +import java.util.List; +import java.util.Objects; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; +import java.util.stream.Collectors; + +public class FallbackAppender { + + private static final int FLUSH_MS = 100; + private static final int BATCH = 20; + private static final int LASTMESSAGE_RETRY_MS = 10_000; + private static final String PATH = "pending"; + + private final AnalyticsClient client; + private final BlockingQueue queue; + private final File file; + private final Lock lock = new ReentrantLock(); + private final Thread writer; + private final Thread reader; + private final Gson gson; + + private transient long lastMessage; + + public FallbackAppender(AnalyticsClient client) { + this.client = client; + this.file = new File(PATH); + this.queue = new ArrayBlockingQueue(100); + this.writer = new Thread(new FileWriter()); // XXX threadFactory daemon + this.reader = new Thread(new FileReader()); // XXX threadFactory daemon + this.gson = new GsonBuilder() + .registerTypeAdapterFactory(new AutoValueAdapterFactory()) + .registerTypeAdapter(Date.class, new ISO8601DateAdapter()) + .create(); + + file.delete(); // FIXME do not remove on start + + this.lastMessage = System.currentTimeMillis(); + this.writer.start(); + this.reader.start(); + } + + public void close() { + reader.interrupt(); + writer.interrupt(); + } + + // block !!! + public void add(Message msg) { + try { + queue.put(msg); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + + class FileReader implements Runnable { + @Override + public void run() { + while (!Thread.currentThread().isInterrupted()) { + if (queue.isEmpty() && System.currentTimeMillis() - lastMessage > LASTMESSAGE_RETRY_MS) { + if (file.length() == 0) { + continue; + } + + List msgs; + lock.lock(); + try { + msgs = read(); + if (msgs.isEmpty()) { + continue; + } + // FIXME now its reading all the msgs and waits until all is processed + // it will be better to work with batch and truncate the file + file.delete(); + } catch (IOException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + lastMessage = System.currentTimeMillis(); + continue; + } finally { + lock.unlock(); + } + + // FIXME batch + while (!msgs.isEmpty()) { + int reenqueued = 0; + boolean canEnqueue = true; + for (int i = msgs.size() - 1; canEnqueue && i >= 0; i--) { + Message msg = msgs.get(i); + canEnqueue = client.offer(msg); + if (canEnqueue) { + msgs.remove(i); + reenqueued++; + } + } + System.err.println("reenqueued " + reenqueued); + try { + Thread.sleep(1_000); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + + lastMessage = System.currentTimeMillis(); + } + + try { + Thread.sleep(1_000); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + } + } + + class FileWriter implements Runnable { + @Override + public void run() { + final List batch = new ArrayList<>(); + while (!Thread.currentThread().isInterrupted()) { + try { + final Message msg = queue.poll(FLUSH_MS, TimeUnit.MILLISECONDS); + if (msg == null) { + if (!batch.isEmpty()) { + write(batch); + } + } else { + batch.add(msg); + if (batch.size() >= BATCH) { + write(batch); + } + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + if (!batch.isEmpty()) { + write(batch); + } + } + } + + List read() throws IOException { + if (file.exists()) { + try (FileChannel fileChannel = FileChannel.open(file.toPath(), StandardOpenOption.READ)) { + fileChannel.lock(0, Long.MAX_VALUE, true); + + final String[] lines = new String( + Channels.newInputStream(fileChannel).readAllBytes(), StandardCharsets.UTF_8) + .split(System.lineSeparator()); + return Arrays.stream(lines) + .map(m -> fromJson(m)) + .filter(Objects::nonNull) + .collect(Collectors.toList()); + } + } else { + return Collections.emptyList(); + } + } + + private void write(List batch) { + lock.lock(); + try (FileChannel fileChannel = FileChannel.open( + file.toPath(), StandardOpenOption.WRITE, StandardOpenOption.APPEND, StandardOpenOption.CREATE)) { + fileChannel.lock(); + + final String lines = batch.stream() + .map(this::toJson) + .filter(Objects::nonNull) + .collect(Collectors.joining(System.lineSeparator())); + + OutputStream os = Channels.newOutputStream(fileChannel); + os.write(lines.getBytes(StandardCharsets.UTF_8)); + os.write(System.lineSeparator().getBytes(StandardCharsets.UTF_8)); + fileChannel.force(true); + + batch.clear(); + + lastMessage = System.currentTimeMillis(); + } catch (IOException e) { + e.printStackTrace(); // FIXME + } finally { + lock.unlock(); + } + } + + private String toJson(final Message msg) { + try { + return gson.toJson(msg); + } catch (Exception e) { + e.printStackTrace(); + return null; + } + } + + private Message fromJson(final String msg) { + try { + // FIXME only track + return gson.fromJson(msg, TrackMessage.class); + } catch (Exception e) { + e.printStackTrace(); + return null; + } + } +} diff --git a/analytics/src/main/java/com/segment/analytics/internal/FlushMessage.java b/analytics/src/main/java/com/segment/analytics/internal/FlushMessage.java deleted file mode 100644 index b3ee9dc2..00000000 --- a/analytics/src/main/java/com/segment/analytics/internal/FlushMessage.java +++ /dev/null @@ -1,66 +0,0 @@ -package com.segment.analytics.internal; - -import com.segment.analytics.messages.Message; -import java.util.Date; -import java.util.Map; -import javax.annotation.Nonnull; -import javax.annotation.Nullable; - -class FlushMessage implements Message { - static final FlushMessage POISON = new FlushMessage(); - - private FlushMessage() {} - - @Nonnull - @Override - public Type type() { - throw new UnsupportedOperationException(); - } - - @Nonnull - @Override - public String messageId() { - throw new UnsupportedOperationException(); - } - - @Nullable - @Override - public Date sentAt() { - throw new UnsupportedOperationException(); - } - - @Nonnull - @Override - public Date timestamp() { - throw new UnsupportedOperationException(); - } - - @Nullable - @Override - public Map context() { - throw new UnsupportedOperationException(); - } - - @Nullable - @Override - public String anonymousId() { - throw new UnsupportedOperationException(); - } - - @Nullable - @Override - public String userId() { - throw new UnsupportedOperationException(); - } - - @Nullable - @Override - public Map integrations() { - throw new UnsupportedOperationException(); - } - - @Override - public String toString() { - return "FlushMessage{}"; - } -} diff --git a/analytics/src/main/java/com/segment/analytics/internal/StopMessage.java b/analytics/src/main/java/com/segment/analytics/internal/StopMessage.java deleted file mode 100644 index eccd278c..00000000 --- a/analytics/src/main/java/com/segment/analytics/internal/StopMessage.java +++ /dev/null @@ -1,66 +0,0 @@ -package com.segment.analytics.internal; - -import com.segment.analytics.messages.Message; -import java.util.Date; -import java.util.Map; -import javax.annotation.Nonnull; -import javax.annotation.Nullable; - -class StopMessage implements Message { - static final StopMessage STOP = new StopMessage(); - - private StopMessage() {} - - @Nonnull - @Override - public Type type() { - throw new UnsupportedOperationException(); - } - - @Nonnull - @Override - public String messageId() { - throw new UnsupportedOperationException(); - } - - @Nullable - @Override - public Date sentAt() { - throw new UnsupportedOperationException(); - } - - @Nonnull - @Override - public Date timestamp() { - throw new UnsupportedOperationException(); - } - - @Nullable - @Override - public Map context() { - throw new UnsupportedOperationException(); - } - - @Nullable - @Override - public String anonymousId() { - throw new UnsupportedOperationException(); - } - - @Nullable - @Override - public String userId() { - throw new UnsupportedOperationException(); - } - - @Nullable - @Override - public Map integrations() { - throw new UnsupportedOperationException(); - } - - @Override - public String toString() { - return "StopMessage{}"; - } -} diff --git a/analytics/src/test/java/com/segment/analytics/AnalyticsTest.java b/analytics/src/test/java/com/segment/analytics/AnalyticsTest.java deleted file mode 100644 index 075ae003..00000000 --- a/analytics/src/test/java/com/segment/analytics/AnalyticsTest.java +++ /dev/null @@ -1,144 +0,0 @@ -package com.segment.analytics; - -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.Mockito.never; -import static org.mockito.Mockito.spy; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; -import static org.mockito.MockitoAnnotations.initMocks; - -import com.segment.analytics.TestUtils.MessageBuilderTest; -import com.segment.analytics.internal.AnalyticsClient; -import com.segment.analytics.messages.Message; -import com.segment.analytics.messages.MessageBuilder; -import com.squareup.burst.BurstJUnit4; -import java.lang.reflect.Field; -import java.time.LocalDateTime; -import java.time.temporal.ChronoUnit; -import java.util.Collections; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; -import org.junit.Before; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.mockito.Mock; - -@RunWith(BurstJUnit4.class) -public class AnalyticsTest { - @Mock AnalyticsClient client; - @Mock Log log; - @Mock MessageTransformer messageTransformer; - @Mock MessageInterceptor messageInterceptor; - Analytics analytics; - - @Before - public void setUp() { - initMocks(this); - - analytics = - new Analytics( - client, - Collections.singletonList(messageTransformer), - Collections.singletonList(messageInterceptor), - log); - } - - @Test - public void enqueueIsDispatched(MessageBuilderTest builder) { - MessageBuilder messageBuilder = builder.get().userId("prateek"); - Message message = messageBuilder.build(); - when(messageTransformer.transform(messageBuilder)).thenReturn(true); - when(messageInterceptor.intercept(any(Message.class))).thenReturn(message); - - analytics.enqueue(messageBuilder); - - verify(messageTransformer).transform(messageBuilder); - verify(messageInterceptor).intercept(any(Message.class)); - verify(client).enqueue(message); - } - - @Test - public void doesNotEnqueueWhenTransformerReturnsFalse(MessageBuilderTest builder) { - MessageBuilder messageBuilder = builder.get().userId("prateek"); - when(messageTransformer.transform(messageBuilder)).thenReturn(false); - - analytics.enqueue(messageBuilder); - - verify(messageTransformer).transform(messageBuilder); - verify(messageInterceptor, never()).intercept(any(Message.class)); - verify(client, never()).enqueue(any(Message.class)); - } - - @Test - public void doesNotEnqueueWhenInterceptorReturnsNull(MessageBuilderTest builder) { - MessageBuilder messageBuilder = builder.get().userId("prateek"); - when(messageTransformer.transform(messageBuilder)).thenReturn(true); - - analytics.enqueue(messageBuilder); - - verify(messageTransformer).transform(messageBuilder); - verify(messageInterceptor).intercept(any(Message.class)); - verify(client, never()).enqueue(any(Message.class)); - } - - @Test - public void shutdownIsDispatched() { - analytics.shutdown(); - - verify(client).shutdown(); - } - - @Test - public void offerIsDispatched(MessageBuilderTest builder) { - MessageBuilder messageBuilder = builder.get().userId("dummy"); - Message message = messageBuilder.build(); - when(messageTransformer.transform(messageBuilder)).thenReturn(true); - when(messageInterceptor.intercept(any(Message.class))).thenReturn(message); - - analytics.offer(messageBuilder); - - verify(messageTransformer).transform(messageBuilder); - verify(messageInterceptor).intercept(any(Message.class)); - verify(client).offer(message); - } - - @Test - public void threadSafeTest(MessageBuilderTest builder) - throws NoSuchFieldException, IllegalAccessException, InterruptedException { - // we want to test if msgs get lost during a multithreaded env - Analytics analytics = Analytics.builder("testWriteKeyForIssue321").build(); - // So we just want to spy on the client of an Analytics object created normally - Field clientField = analytics.getClass().getDeclaredField("client"); - clientField.setAccessible(true); - AnalyticsClient spy = spy((AnalyticsClient) clientField.get(analytics)); - clientField.set(analytics, spy); - - // we are going to run this test for a specific amount of seconds - int millisRunning = 200; - LocalDateTime initialTime = LocalDateTime.now(); - LocalDateTime now; - - // and a set number of threads will be using the library - ExecutorService service = Executors.newFixedThreadPool(20); - AtomicInteger counter = new AtomicInteger(); - - MessageBuilder messageBuilder = builder.get().userId("jorgen25"); - - do { - service.submit( - () -> { - analytics.enqueue(messageBuilder); - counter.incrementAndGet(); - }); - now = LocalDateTime.now(); - } while (initialTime.until(now, ChronoUnit.MILLIS) < millisRunning); - - service.shutdown(); - service.awaitTermination(5, TimeUnit.SECONDS); - - verify(spy, times(counter.get())).enqueue(any(Message.class)); - } -} diff --git a/analytics/src/test/java/com/segment/analytics/SegmentTest.java b/analytics/src/test/java/com/segment/analytics/SegmentTest.java index 864f9d51..be00286a 100644 --- a/analytics/src/test/java/com/segment/analytics/SegmentTest.java +++ b/analytics/src/test/java/com/segment/analytics/SegmentTest.java @@ -9,19 +9,16 @@ import com.github.tomakehurst.wiremock.client.WireMock; import com.github.tomakehurst.wiremock.junit.WireMockRule; import com.github.tomakehurst.wiremock.stubbing.ServeEvent; -import com.google.gson.Gson; -import com.google.gson.GsonBuilder; -import com.segment.analytics.gson.AutoValueAdapterFactory; -import com.segment.analytics.gson.ISO8601DateAdapter; import com.segment.analytics.messages.TrackMessage; import java.util.ArrayList; import java.util.Arrays; -import java.util.Date; import java.util.HashSet; import java.util.Iterator; import java.util.List; +import java.util.Set; import java.util.concurrent.TimeUnit; import org.awaitility.Awaitility; +import org.junit.After; import org.junit.Before; import org.junit.Rule; import org.junit.Test; @@ -42,68 +39,59 @@ public class SegmentTest { Analytics analytics; - GsonBuilder gsonBuilder = new GsonBuilder() - .registerTypeAdapterFactory(new AutoValueAdapterFactory()) - .registerTypeAdapter(Date.class, new ISO8601DateAdapter()); - - Gson gson = gsonBuilder.create(); - @Before - public void confWireMock() { + public void confWireMockAndClient() { stubFor(post(urlEqualTo("/v1/import/")).willReturn(okJson("{\"success\": \"true\"}"))); analytics = Analytics.builder("write-key") .endpoint(wireMockRule.baseUrl()) - // .endpoint("http://localhost:8888") .flushInterval(1, TimeUnit.SECONDS) .flushQueueSize(20) .queueCapacity(50) - // callback // http client .build(); } + @After + public void tearDown() { + analytics.shutdown(); + } + @Test public void test() throws Throwable { stubFor(post(urlEqualTo("/v1/import/")) - .willReturn(WireMock.aResponse().withStatus(503).withBody("fail"))); + .willReturn( + WireMock.aResponse().withStatus(503).withBody("fail").withUniformRandomDelay(100, 1_000))); long start = System.currentTimeMillis(); boolean upAgain = false; int id = 0; + List ids = new ArrayList<>(); RateLimiter rate = RateLimiter.create(5); - while (true) { + while (System.currentTimeMillis() - start < 60_000) { if (rate.tryAcquire()) { - System.err.println("id " + id); + String msgid = "m" + id++; + ids.add(msgid); analytics.enqueue( - TrackMessage.builder("my-track").messageId("m" + id++).userId("userId")); + TrackMessage.builder("my-track").messageId(msgid).userId("userId")); + System.err.println("enqued " + msgid); } + Thread.sleep(50); - if (!upAgain && System.currentTimeMillis() - start > 120_000) { + if (!upAgain && System.currentTimeMillis() - start > 20_000) { upAgain = true; - stubFor(post(urlEqualTo("/v1/import/")).willReturn(okJson("{\"success\": \"true\"}"))); + stubFor(post(urlEqualTo("/v1/import/")) + .willReturn(okJson("{\"success\": \"true\"}").withUniformRandomDelay(100, 1_000))); System.err.println("UP AGAIN"); } } - // analytics.enqueue(TrackMessage.builder("my-track").messageId("m2").userId("userId")); - // Awaitility.await().until(() -> sentMessagesEqualsTo("m1", "m2")); - } - - @Test - public void testMore() throws Throwable { - System.err.println("wm at " + wireMockRule.baseUrl()); - int num = 100_000; - String[] expectedIds = new String[num]; - for (int i = 0; i < num; i++) { - String id = "m" + i; - expectedIds[i] = id; - analytics.enqueue(TrackMessage.builder("my-track").messageId(id).userId("userId")); - } - - Awaitility.await().atMost(1, TimeUnit.MINUTES).until(() -> sentMessagesEqualsTo(expectedIds)); + Awaitility.await() + .atMost(10, TimeUnit.MINUTES) + .pollInterval(1, TimeUnit.SECONDS) + .until(() -> sentMessagesEqualsTo(ids.toArray(new String[ids.size()]))); } private static final ObjectMapper OM = new ObjectMapper(); @@ -112,9 +100,13 @@ private boolean sentMessagesEqualsTo(String... msgIds) { return new HashSet<>(sentMessages()).equals(new HashSet<>(Arrays.asList(msgIds))); } - private List sentMessages() { - List messageIds = new ArrayList<>(); + private Set sentMessages() { + Set messageIds = new HashSet<>(); for (ServeEvent event : wireMockRule.getAllServeEvents()) { + if (event.getResponse().getStatus() != 200) { + continue; + } + JsonNode batch; try { JsonNode json = OM.readTree(event.getRequest().getBodyAsString()); @@ -130,6 +122,7 @@ private List sentMessages() { messageIds.add(msgs.next().get("messageId").asText()); } } + System.err.println("Confirmed msgs : " + messageIds.size()); return messageIds; } } diff --git a/analytics/src/test/java/com/segment/analytics/internal/AnalyticsClientTest.java b/analytics/src/test/java/com/segment/analytics/internal/AnalyticsClientTest.java deleted file mode 100644 index a06bd808..00000000 --- a/analytics/src/test/java/com/segment/analytics/internal/AnalyticsClientTest.java +++ /dev/null @@ -1,921 +0,0 @@ -package com.segment.analytics.internal; - -import static com.segment.analytics.internal.FlushMessage.POISON; -import static com.segment.analytics.internal.StopMessage.STOP; -import static org.assertj.core.api.Assertions.assertThat; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.Mockito.never; -import static org.mockito.Mockito.spy; -import static org.mockito.Mockito.timeout; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.verifyNoMoreInteractions; -import static org.mockito.Mockito.when; -import static org.mockito.MockitoAnnotations.openMocks; - -import com.google.gson.Gson; -import com.segment.analytics.Log; -import com.segment.analytics.TestUtils.MessageBuilderTest; -import com.segment.analytics.http.SegmentService; -import com.segment.analytics.http.UploadResponse; -import com.segment.analytics.internal.AnalyticsClient.BatchUploadTask; -import com.segment.analytics.messages.Batch; -import com.segment.analytics.messages.Message; -import com.segment.analytics.messages.TrackMessage; -import com.segment.backo.Backo; -import com.squareup.burst.BurstJUnit4; -import java.nio.charset.StandardCharsets; -import java.util.Arrays; -import java.util.Collections; -import java.util.HashMap; -import java.util.Map; -import java.util.Queue; -import java.util.Random; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.ThreadFactory; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import okhttp3.ResponseBody; -import org.junit.Before; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.mockito.ArgumentCaptor; -import org.mockito.Mock; -import org.mockito.Spy; -import org.mockito.invocation.InvocationOnMock; -import org.mockito.stubbing.Answer; -import retrofit2.Call; -import retrofit2.Response; -import retrofit2.mock.Calls; - -@RunWith(BurstJUnit4.class) // -public class AnalyticsClientTest { - // Backo instance for testing which trims down the wait times. - private static final Backo BACKO = - Backo.builder().base(TimeUnit.NANOSECONDS, 1).factor(1).build(); - - private int DEFAULT_RETRIES = 10; - private int MAX_BATCH_SIZE = 1024 * 500; // 500kb - private int MAX_MSG_SIZE = 1024 * 32; // 32kb //This is the limit for a message object - private int MSG_MAX_CREATE_SIZE = - MAX_MSG_SIZE - - 200; // Once we create msg object with this size it barely below 32 threshold so good - // for tests - private static String writeKey = "writeKey"; - - Log log = Log.NONE; - - ThreadFactory threadFactory; - @Spy LinkedBlockingQueue messageQueue; - @Spy LinkedBlockingQueue pendingQueue; - @Mock SegmentService segmentService; - @Mock ExecutorService networkExecutor; - @Mock UploadResponse response; - - AtomicBoolean isShutDown; - - @Before - public void setUp() { - openMocks(this); - - isShutDown = new AtomicBoolean(false); - threadFactory = Executors.defaultThreadFactory(); - } - - // Defers loading the client until tests can initialize all required - // dependencies. - AnalyticsClient newClient() { - return new AnalyticsClient( - messageQueue, - pendingQueue, - null, - segmentService, - 50, - TimeUnit.HOURS.toMillis(1), - 0, - MAX_BATCH_SIZE, - log, - threadFactory, - networkExecutor, - isShutDown, - writeKey, - new Gson()); - } - - @Test - public void enqueueAddsToQueue(MessageBuilderTest builder) throws InterruptedException { - AnalyticsClient client = newClient(); - - Message message = builder.get().userId("prateek").build(); - client.enqueue(message); - - verify(messageQueue).put(message); - } - - @Test - public void shutdown() throws InterruptedException { - messageQueue = new LinkedBlockingQueue<>(); - AnalyticsClient client = newClient(); - - client.shutdown(); - - verify(networkExecutor).shutdown(); - verify(networkExecutor).awaitTermination(1, TimeUnit.SECONDS); - } - - /** Wait until the queue is drained. */ - static void wait(Queue queue) { - // noinspection StatementWithEmptyBody - while (queue.size() > 0) {} - } - - /** - * Verify that a {@link BatchUploadTask} was submitted to the executor, and return the {@link - * BatchUploadTask#batch} it was uploading.. - */ - static Batch captureBatch(ExecutorService executor) { - final ArgumentCaptor runnableArgumentCaptor = ArgumentCaptor.forClass(Runnable.class); - verify(executor, timeout(1000)).submit(runnableArgumentCaptor.capture()); - final BatchUploadTask task = (BatchUploadTask) runnableArgumentCaptor.getValue(); - return task.batch; - } - - private static String generateDataOfSize(int msgSize) { - char[] chars = new char[msgSize]; - Arrays.fill(chars, 'a'); - - return new String(chars); - } - - private static String generateDataOfSizeSpecialChars( - int sizeInBytes, boolean slightlyBelowLimit) { - StringBuilder builder = new StringBuilder(); - Character[] specialChars = new Character[] {'$', '¢', 'ह', '€', '한', '©', '¶'}; - int currentSize = 0; - String smileyFace = "\uD83D\uDE01"; - // 😁 = '\uD83D\uDE01'; - Random rand = new Random(); - int loopCount = 1; - while (currentSize < sizeInBytes) { - int randomNum; - // decide if regular/special character - if (loopCount > 3 && loopCount % 4 == 0) { - randomNum = rand.nextInt(((specialChars.length - 1) - 0) + 1) + 0; - builder.append(specialChars[randomNum]); - } else if (loopCount > 9 && loopCount % 10 == 0) { - builder.append(smileyFace); - } else { - // random letter from a - z - randomNum = rand.nextInt(('z' - 'a') + 1) + 'a'; - builder.append((char) randomNum); - } - - // check size so far - String temp = builder.toString(); - currentSize = temp.getBytes(StandardCharsets.UTF_8).length; - if (slightlyBelowLimit && ((sizeInBytes - currentSize) < 500)) { - break; - } - loopCount++; - } - return builder.toString(); - } - - @Test - public void enqueueMaxTriggersFlush() { - messageQueue = new LinkedBlockingQueue<>(); - AnalyticsClient client = newClient(); - - // Enqueuing 51 messages (> 50) should trigger flush. - for (int i = 0; i < 51; i++) { - client.enqueue(TrackMessage.builder("Event " + i).userId("bar").build()); - } - wait(messageQueue); - - // Verify that the executor saw the batch. - assertThat(captureBatch(networkExecutor).batch()).hasSize(50); - } - - @Test - public void shouldBeAbleToCalculateMessageSize() { - AnalyticsClient client = newClient(); - Map properties = new HashMap(); - - properties.put("property1", generateDataOfSize(1024 * 33)); - - TrackMessage bigMessage = - TrackMessage.builder("Big Event").userId("bar").properties(properties).build(); - try { - client.enqueue(bigMessage); - } catch (IllegalArgumentException e) { - assertThat(e).isExactlyInstanceOf(e.getClass()); - } - - // can't test for exact size cause other attributes come in play - assertThat(client.messageSizeInBytes(bigMessage)).isGreaterThan(1024 * 33); - } - - @Test - public void dontFlushUntilReachesMaxSize() throws InterruptedException { - AnalyticsClient client = newClient(); - Map properties = new HashMap(); - - properties.put("property2", generateDataOfSize(MAX_BATCH_SIZE - 200)); - - TrackMessage bigMessage = - TrackMessage.builder("Big Event").userId("bar").properties(properties).build(); - try { - client.enqueue(bigMessage); - } catch (IllegalArgumentException e) { - // throw new InterruptedException(e.getMessage()); - } - - wait(messageQueue); - - verify(networkExecutor, never()).submit(any(Runnable.class)); - } - - /** - * Modified this test case since we are changing logic to NOT allow messages bigger than 32 kbs - * individually to be enqueued, hence had to lower the size of the generated msg here. chose - * MSG_MAX_CREATE_SIZE because it will generate a message just below the limit of 32 kb after it - * creates a Message object modified the number of events that will be created since the batch - * creation logic was also changed to not allow batches larger than 500 kb meaning every 15/16 - * events the queue will be backPressured and poisoned/flushed (3 times) (purpose of test) AND - * there will be 4 batches submitted (15 msgs, 1 msg, 15 msg, 15 msg) so purpose of test case - * stands - * - * @throws InterruptedException - */ - @Test - public void flushHowManyTimesNecessaryToStayWithinLimit() throws InterruptedException { - AnalyticsClient client = - new AnalyticsClient( - messageQueue, - pendingQueue, - null, - segmentService, - 50, - TimeUnit.HOURS.toMillis(1), - 0, - MAX_BATCH_SIZE * 4, - log, - threadFactory, - networkExecutor, - isShutDown, - writeKey, - new Gson()); - - Map properties = new HashMap(); - - properties.put("property3", generateDataOfSize(MSG_MAX_CREATE_SIZE)); - - for (int i = 0; i < 46; i++) { - TrackMessage bigMessage = - TrackMessage.builder("Big Event").userId("bar").properties(properties).build(); - client.enqueue(bigMessage); - verify(messageQueue).put(bigMessage); - } - - wait(messageQueue); - /** - * modified from expected 4 to expected 3 times, since we removed the inner loop. The inner loop - * was forcing to message list created from the queue to keep making batches even if its a 1 - * message batch until the message list is empty, that was forcing the code to make one last - * batch of 1 msg in size bumping the number of times a batch would be submitted from 3 to 4 - */ - verify(networkExecutor, times(3)).submit(any(Runnable.class)); - } - - /** - * Had to slightly change test case since we are now modifying the logic to NOT allow messages - * above 32 KB in size So needed to change size of generated msg to MSG_MAX_CREATE_SIZE to keep - * purpose of test case intact which is to test the scenario for several messages eventually - * filling up the queue and flushing. Batches submitted will change from 1 to 2 because the queue - * will be backpressured at 16 (at this point queue is over the 500KB batch limit so its flushed - * and when batch is created 16 will be above 500kbs limit so it creates one batch for 15 msg and - * another one for the remaining single message so 500kb limit per batch is not violated - * - * @throws InterruptedException - */ - @Test - public void flushWhenMultipleMessagesReachesMaxSize() throws InterruptedException { - AnalyticsClient client = newClient(); - Map properties = new HashMap(); - properties.put("property3", generateDataOfSize(MSG_MAX_CREATE_SIZE)); - - for (int i = 0; i < 16; i++) { - TrackMessage bigMessage = - TrackMessage.builder("Big Event").userId("bar").properties(properties).build(); - client.enqueue(bigMessage); - } - wait(messageQueue); - client.shutdown(); - while (!isShutDown.get()) {} - verify(networkExecutor, times(2)).submit(any(Runnable.class)); - } - - @Test - public void enqueueBeforeMaxDoesNotTriggerFlush() { - messageQueue = new LinkedBlockingQueue<>(); - AnalyticsClient client = newClient(); - - // Enqueuing 5 messages (< 50) should not trigger flush. - for (int i = 0; i < 5; i++) { - client.enqueue(TrackMessage.builder("Event " + i).userId("bar").build()); - } - wait(messageQueue); - - // Verify that the executor didn't see anything. - verify(networkExecutor, never()).submit(any(Runnable.class)); - } - - static Batch batchFor(Message message) { - return Batch.create( - Collections.emptyMap(), Collections.singletonList(message), writeKey); - } - - @Test - public void batchRetriesForNetworkErrors() { - AnalyticsClient client = newClient(); - TrackMessage trackMessage = TrackMessage.builder("foo").userId("bar").build(); - Batch batch = batchFor(trackMessage); - - Response successResponse = Response.success(200, response); - Response failureResponse = Response.error(429, ResponseBody.create(null, "")); - - // Throw a network error 3 times. - when(segmentService.upload(null, batch)) - .thenReturn(Calls.response(failureResponse)) - .thenReturn(Calls.response(failureResponse)) - .thenReturn(Calls.response(failureResponse)) - .thenReturn(Calls.response(successResponse)); - - BatchUploadTask batchUploadTask = new BatchUploadTask(client, BACKO, batch, DEFAULT_RETRIES); - batchUploadTask.run(); - - // Verify that we tried to upload 4 times, 3 failed and 1 succeeded. - verify(segmentService, times(4)).upload(null, batch); - //// verify(callback).success(trackMessage); - } - - @Test - public void batchRetriesForHTTP5xxErrors() { - AnalyticsClient client = newClient(); - TrackMessage trackMessage = TrackMessage.builder("foo").userId("bar").build(); - Batch batch = batchFor(trackMessage); - - // Throw a HTTP error 3 times. - - Response successResponse = Response.success(200, response); - Response failResponse = - Response.error(500, ResponseBody.create(null, "Server Error")); - when(segmentService.upload(null, batch)) - .thenReturn(Calls.response(failResponse)) - .thenReturn(Calls.response(failResponse)) - .thenReturn(Calls.response(failResponse)) - .thenReturn(Calls.response(successResponse)); - - BatchUploadTask batchUploadTask = new BatchUploadTask(client, BACKO, batch, DEFAULT_RETRIES); - batchUploadTask.run(); - - // Verify that we tried to upload 4 times, 3 failed and 1 succeeded. - verify(segmentService, times(4)).upload(null, batch); - // verify(callback).success(trackMessage); - } - - @Test - public void batchRetriesForHTTP429Errors() { - AnalyticsClient client = newClient(); - TrackMessage trackMessage = TrackMessage.builder("foo").userId("bar").build(); - Batch batch = batchFor(trackMessage); - - // Throw a HTTP error 3 times. - Response successResponse = Response.success(200, response); - Response failResponse = - Response.error(429, ResponseBody.create(null, "Rate Limited")); - when(segmentService.upload(null, batch)) - .thenReturn(Calls.response(failResponse)) - .thenReturn(Calls.response(failResponse)) - .thenReturn(Calls.response(failResponse)) - .thenReturn(Calls.response(successResponse)); - - BatchUploadTask batchUploadTask = new BatchUploadTask(client, BACKO, batch, DEFAULT_RETRIES); - batchUploadTask.run(); - - // Verify that we tried to upload 4 times, 3 failed and 1 succeeded. - verify(segmentService, times(4)).upload(null, batch); - // verify(callback).success(trackMessage); - } - - @Test - public void batchDoesNotRetryForNon5xxAndNon429HTTPErrors() { - AnalyticsClient client = newClient(); - TrackMessage trackMessage = TrackMessage.builder("foo").userId("bar").build(); - Batch batch = batchFor(trackMessage); - - // Throw a HTTP error that should not be retried. - Response failResponse = - Response.error(404, ResponseBody.create(null, "Not Found")); - when(segmentService.upload(null, batch)).thenReturn(Calls.response(failResponse)); - - BatchUploadTask batchUploadTask = new BatchUploadTask(client, BACKO, batch, DEFAULT_RETRIES); - batchUploadTask.run(); - - // Verify we only tried to upload once. - verify(segmentService).upload(null, batch); - // verify(callback).failure(eq(trackMessage), any(IOException.class)); - } - - @Test - public void batchDoesNotRetryForNonNetworkErrors() { - AnalyticsClient client = newClient(); - TrackMessage trackMessage = TrackMessage.builder("foo").userId("bar").build(); - Batch batch = batchFor(trackMessage); - - Call networkFailure = Calls.failure(new RuntimeException()); - when(segmentService.upload(null, batch)).thenReturn(networkFailure); - - BatchUploadTask batchUploadTask = new BatchUploadTask(client, BACKO, batch, DEFAULT_RETRIES); - batchUploadTask.run(); - - // Verify we only tried to upload once. - verify(segmentService).upload(null, batch); - // verify(callback).failure(eq(trackMessage), any(RuntimeException.class)); - } - - @Test - public void givesUpAfterMaxRetries() { - AnalyticsClient client = newClient(); - TrackMessage trackMessage = TrackMessage.builder("foo").userId("bar").build(); - Batch batch = batchFor(trackMessage); - - when(segmentService.upload(null, batch)) - .thenAnswer( - new Answer>() { - public Call answer(InvocationOnMock invocation) { - Response failResponse = - Response.error(429, ResponseBody.create(null, "Not Found")); - return Calls.response(failResponse); - } - }); - - BatchUploadTask batchUploadTask = new BatchUploadTask(client, BACKO, batch, 10); - batchUploadTask.run(); - - // DEFAULT_RETRIES == maxRetries - // tries 11(one normal run + 10 retries) even though default is 50 in AnalyticsClient.java - verify(segmentService, times(11)).upload(null, batch); - // verify(callback) - // .failure( - // eq(trackMessage), - // argThat( - // new ArgumentMatcher() { - // @Override - // public boolean matches(IOException exception) { - // return exception.getMessage().equals("11 retries exhausted"); - // } - // })); - } - - @Test - public void hasDefaultRetriesSetTo3() { - AnalyticsClient client = newClient(); - TrackMessage trackMessage = TrackMessage.builder("foo").userId("bar").build(); - Batch batch = batchFor(trackMessage); - - when(segmentService.upload(null, batch)) - .thenAnswer( - new Answer>() { - public Call answer(InvocationOnMock invocation) { - Response failResponse = - Response.error(429, ResponseBody.create(null, "Not Found")); - return Calls.response(failResponse); - } - }); - - BatchUploadTask batchUploadTask = new BatchUploadTask(client, BACKO, batch, 3); - batchUploadTask.run(); - - // DEFAULT_RETRIES == maxRetries - // tries 11(one normal run + 10 retries) - verify(segmentService, times(4)).upload(null, batch); - // verify(callback) - // .failure( - // eq(trackMessage), - // argThat( - // new ArgumentMatcher() { - // @Override - // public boolean matches(IOException exception) { - // return exception.getMessage().equals("4 retries exhausted"); - // } - // })); - } - - @Test - public void enqueueWithRegularMessageWhenNotShutdown(MessageBuilderTest builder) - throws InterruptedException { - AnalyticsClient client = newClient(); - - final Message message = builder.get().userId("foo").build(); - client.enqueue(message); - - verify(messageQueue).put(message); - } - - @Test - public void enqueueWithRegularMessageWhenShutdown(MessageBuilderTest builder) - throws InterruptedException { - AnalyticsClient client = newClient(); - isShutDown.set(true); - - client.enqueue(builder.get().userId("foo").build()); - - verify(messageQueue, times(0)).put(any(Message.class)); - } - - @Test - public void enqueueWithStopMessageWhenShutdown() throws InterruptedException { - AnalyticsClient client = newClient(); - isShutDown.set(true); - - client.enqueue(STOP); - - verify(messageQueue).put(STOP); - } - - @Test - public void shutdownWhenAlreadyShutDown() throws InterruptedException { - AnalyticsClient client = newClient(); - isShutDown.set(true); - - client.shutdown(); - - verify(messageQueue, times(0)).put(any(Message.class)); - // verifyNoInteractions(networkExecutor, callback, segmentService); - } - - @Test - public void shutdownWithNoMessageInTheQueue() throws InterruptedException { - AnalyticsClient client = newClient(); - client.shutdown(); - - verify(messageQueue).put(STOP); - verify(networkExecutor).shutdown(); - verify(networkExecutor).awaitTermination(1, TimeUnit.SECONDS); - verifyNoMoreInteractions(networkExecutor); - } - - @Test - public void shutdownWithMessagesInTheQueue(MessageBuilderTest builder) - throws InterruptedException { - AnalyticsClient client = newClient(); - - client.enqueue(builder.get().userId("foo").build()); - client.shutdown(); - - verify(messageQueue).put(STOP); - verify(networkExecutor).shutdown(); - verify(networkExecutor).awaitTermination(1, TimeUnit.SECONDS); - verify(networkExecutor).submit(any(AnalyticsClient.BatchUploadTask.class)); - } - - @Test - public void neverRetries() { - AnalyticsClient client = newClient(); - TrackMessage trackMessage = TrackMessage.builder("foo").userId("bar").build(); - Batch batch = batchFor(trackMessage); - - when(segmentService.upload(null, batch)) - .thenAnswer( - new Answer>() { - public Call answer(InvocationOnMock invocation) { - Response failResponse = - Response.error(429, ResponseBody.create(null, "Not Found")); - return Calls.response(failResponse); - } - }); - - BatchUploadTask batchUploadTask = new BatchUploadTask(client, BACKO, batch, 0); - batchUploadTask.run(); - - // runs once but never retries - verify(segmentService, times(1)).upload(null, batch); - // verify(callback) - // .failure( - // eq(trackMessage), - // argThat( - // new ArgumentMatcher() { - // @Override - // public boolean matches(IOException exception) { - // return exception.getMessage().equals("1 retries exhausted"); - // } - // })); - } - - /** - * ********************************************************************************************** - * Test cases for Size check - * ********************************************************************************************* - */ - - /** Individual Size check happy path regular chars */ - @Test - public void checkForIndividualMessageSizeLessThanLimit() { - AnalyticsClient client = newClient(); - int msgSize = 1024 * 31; // 31KB - int sizeLimit = MAX_MSG_SIZE; // 32KB = 32768 - Map properties = new HashMap(); - - properties.put("property1", generateDataOfSize(msgSize)); - - TrackMessage bigMessage = - TrackMessage.builder("Event").userId("jorgen25").properties(properties).build(); - client.enqueue(bigMessage); - - int msgActualSize = client.messageSizeInBytes(bigMessage); - assertThat(msgActualSize).isLessThanOrEqualTo(sizeLimit); - } - - /** Individual Size check sad path regular chars (over the limit) */ - @Test - public void checkForIndividualMessageSizeOverLimit() throws IllegalArgumentException { - AnalyticsClient client = newClient(); - int msgSize = MAX_MSG_SIZE + 1; // BARELY over the limit - int sizeLimit = MAX_MSG_SIZE; // 32KB = 32768 - Map properties = new HashMap(); - - properties.put("property1", generateDataOfSize(msgSize)); - - TrackMessage bigMessage = - TrackMessage.builder("Event").userId("jorgen25").properties(properties).build(); - try { - client.enqueue(bigMessage); - } catch (IllegalArgumentException e) { - assertThat(e).isExactlyInstanceOf(e.getClass()); - } - - int msgActualSize = client.messageSizeInBytes(bigMessage); - assertThat(msgActualSize).isGreaterThan(sizeLimit); - } - - /** Individual Size check happy path special chars */ - @Test - public void checkForIndividualMessageSizeSpecialCharsLessThanLimit() { - AnalyticsClient client = newClient(); - int msgSize = MAX_MSG_SIZE; // 32KB - int sizeLimit = MAX_MSG_SIZE; // 32KB = 32768 - - Map properties = new HashMap(); - properties.put("property1", generateDataOfSizeSpecialChars(msgSize, true)); - - TrackMessage bigMessage = - TrackMessage.builder("Event").userId("jorgen25").properties(properties).build(); - client.enqueue(bigMessage); - - int msgActualSize = client.messageSizeInBytes(bigMessage); - assertThat(msgActualSize).isLessThanOrEqualTo(sizeLimit); - } - - /** Individual Size check sad path special chars (over the limit) */ - @Test - public void checkForIndividualMessageSizeSpecialCharsAboveLimit() { - AnalyticsClient client = newClient(); - int msgSize = MAX_MSG_SIZE; // 32KB - int sizeLimit = MAX_MSG_SIZE; // 32KB = 32768 - Map properties = new HashMap(); - - properties.put("property1", generateDataOfSizeSpecialChars(msgSize, false)); - - TrackMessage bigMessage = - TrackMessage.builder("Event").userId("jorgen25").properties(properties).build(); - - try { - client.enqueue(bigMessage); - } catch (IllegalArgumentException e) { - assertThat(e).isExactlyInstanceOf(e.getClass()); - } - - int msgActualSize = client.messageSizeInBytes(bigMessage); - assertThat(msgActualSize).isGreaterThan(sizeLimit); - } - - /** - * ***************************************************************************************************************** - * Test cases for enqueue modified logic - * *************************************************************************************************************** - */ - @Test - public void enqueueVerifyPoisonIsNotCheckedForSize() throws InterruptedException { - AnalyticsClient clientSpy = spy(newClient()); - - clientSpy.enqueue(POISON); - verify(messageQueue).put(POISON); - verify(clientSpy, never()).messageSizeInBytes(POISON); - } - - @Test - public void enqueueVerifyStopIsNotCheckedForSize() throws InterruptedException { - AnalyticsClient clientSpy = spy(newClient()); - - clientSpy.enqueue(STOP); - verify(messageQueue).put(STOP); - verify(clientSpy, never()).messageSizeInBytes(STOP); - } - - @Test - public void enqueueVerifyRegularMessageIsEnqueuedAndCheckedForSize(MessageBuilderTest builder) - throws InterruptedException { - AnalyticsClient clientSpy = spy(newClient()); - - Message message = builder.get().userId("jorgen25").build(); - clientSpy.enqueue(message); - verify(messageQueue).put(message); - verify(clientSpy, times(1)).messageSizeInBytes(message); - } - - /** - * This test case was to prove the limit in batch is not being respected so will probably delete - * it later NOTE: Used to be a test case created to prove huge messages above the limit are still - * being submitted in batch modified it to prove they are not anymore after changing logic in - * analyticsClient - * - * @param builder - * @throws InterruptedException - */ - @Test - public void enqueueSingleMessageAboveLimitWhenNotShutdown(MessageBuilderTest builder) - throws InterruptedException, IllegalArgumentException { - AnalyticsClient client = newClient(); - - // Message is above batch limit - final String massData = generateDataOfSizeSpecialChars(MAX_MSG_SIZE, false); - Map integrationOpts = new HashMap<>(); - integrationOpts.put("massData", massData); - Message message = - builder.get().userId("foo").integrationOptions("someKey", integrationOpts).build(); - - try { - client.enqueue(message); - } catch (IllegalArgumentException e) { - assertThat(e).isExactlyInstanceOf(e.getClass()); - } - - wait(messageQueue); - - // Message is above MSG/BATCH size limit so it should not be put in queue - verify(messageQueue, never()).put(message); - // And since it was never in the queue, it was never submitted in batch - verify(networkExecutor, never()).submit(any(AnalyticsClient.BatchUploadTask.class)); - } - - @Test - public void enqueueVerifyRegularMessagesSpecialCharactersBelowLimit(MessageBuilderTest builder) - throws InterruptedException, IllegalArgumentException { - AnalyticsClient client = newClient(); - int msgSize = 1024 * 18; // 18KB - - for (int i = 0; i < 2; i++) { - final String data = generateDataOfSizeSpecialChars(msgSize, true); - Map integrationOpts = new HashMap<>(); - integrationOpts.put("data", data); - Message message = - builder.get().userId("jorgen25").integrationOptions("someKey", integrationOpts).build(); - client.enqueue(message); - verify(messageQueue).put(message); - } - client.enqueue(POISON); - verify(messageQueue).put(POISON); - - wait(messageQueue); - client.shutdown(); - while (!isShutDown.get()) {} - - verify(networkExecutor, times(1)).submit(any(AnalyticsClient.BatchUploadTask.class)); - } - - /** - * ****************************************************************************************************************** - * Test cases for Batch creation logic - * **************************************************************************************************************** - */ - - /** - * Several messages are enqueued and then submitted in a batch - * - * @throws InterruptedException - */ - @Test - public void submitBatchBelowThreshold() throws InterruptedException, IllegalArgumentException { - AnalyticsClient client = - new AnalyticsClient( - messageQueue, - pendingQueue, - null, - segmentService, - 50, - TimeUnit.HOURS.toMillis(1), - 0, - MAX_BATCH_SIZE * 4, - log, - threadFactory, - networkExecutor, - isShutDown, - writeKey, - new Gson()); - - Map properties = new HashMap(); - properties.put("property3", generateDataOfSizeSpecialChars(((int) (MAX_MSG_SIZE * 0.9)), true)); - - for (int i = 0; i < 15; i++) { - TrackMessage bigMessage = - TrackMessage.builder("Big Event").userId("jorgen25").properties(properties).build(); - client.enqueue(bigMessage); - verify(messageQueue).put(bigMessage); - } - client.enqueue(POISON); - wait(messageQueue); - - client.shutdown(); - while (!isShutDown.get()) {} - verify(networkExecutor, times(1)).submit(any(Runnable.class)); - } - - /** - * Enqueued several messages above threshold of 500Kbs so queue gets backpressured at some point - * and several batches have to be created to not violate threshold - * - * @throws InterruptedException - */ - @Test - public void submitBatchAboveThreshold() throws InterruptedException, IllegalArgumentException { - AnalyticsClient client = - new AnalyticsClient( - messageQueue, - pendingQueue, - null, - segmentService, - 50, - TimeUnit.HOURS.toMillis(1), - 0, - MAX_BATCH_SIZE * 4, - log, - threadFactory, - networkExecutor, - isShutDown, - writeKey, - new Gson()); - - Map properties = new HashMap(); - properties.put("property3", generateDataOfSizeSpecialChars(MAX_MSG_SIZE, true)); - - for (int i = 0; i < 100; i++) { - TrackMessage message = - TrackMessage.builder("Big Event").userId("jorgen25").properties(properties).build(); - client.enqueue(message); - verify(messageQueue).put(message); - } - wait(messageQueue); - client.shutdown(); - while (!isShutDown.get()) {} - - verify(networkExecutor, times(8)).submit(any(Runnable.class)); - } - - @Test - public void submitManySmallMessagesBatchAboveThreshold() throws InterruptedException { - AnalyticsClient client = - new AnalyticsClient( - messageQueue, - pendingQueue, - null, - segmentService, - 50, - TimeUnit.HOURS.toMillis(1), - 0, - MAX_BATCH_SIZE * 4, - log, - threadFactory, - networkExecutor, - isShutDown, - writeKey, - new Gson()); - - Map properties = new HashMap(); - properties.put("property3", generateDataOfSizeSpecialChars(1024 * 8, true)); - - for (int i = 0; i < 600; i++) { - TrackMessage message = - TrackMessage.builder("Event").userId("jorgen25").properties(properties).build(); - client.enqueue(message); - verify(messageQueue).put(message); - } - wait(messageQueue); - client.shutdown(); - while (!isShutDown.get()) {} - - verify(networkExecutor, times(21)).submit(any(Runnable.class)); - } -} From ab780a4649d0885bf2cb1e5f82e984e90aaf87c1 Mon Sep 17 00:00:00 2001 From: Albert Puig Date: Tue, 25 Mar 2025 18:35:08 +0100 Subject: [PATCH 08/13] values --- .../analytics/internal/AnalyticsClient.java | 16 ++++---- .../analytics/internal/FallbackAppender.java | 5 +-- .../com/segment/analytics/SegmentTest.java | 37 +++++++++++++------ 3 files changed, 35 insertions(+), 23 deletions(-) diff --git a/analytics/src/main/java/com/segment/analytics/internal/AnalyticsClient.java b/analytics/src/main/java/com/segment/analytics/internal/AnalyticsClient.java index 124d399a..fc31cc0d 100644 --- a/analytics/src/main/java/com/segment/analytics/internal/AnalyticsClient.java +++ b/analytics/src/main/java/com/segment/analytics/internal/AnalyticsClient.java @@ -119,14 +119,17 @@ public AnalyticsClient( looperThread.start(); CircuitBreaker> breaker = CircuitBreaker.>builder() - // 5 failure in 2 minute open the circuit - .withFailureThreshold(5, Duration.ofMinutes(2)) + // 10 failure in 2 minute open the circuit + .withFailureThreshold(10, Duration.ofMinutes(2)) // once open wait 30 seconds to be half-open .withDelay(Duration.ofSeconds(30)) // after 1 success the circuit is closed .withSuccessThreshold(1) // 5xx or rate limit is an error .handleResultIf(response -> is5xx(response.code()) || response.code() == 429) + .onOpen(el -> System.err.println("***\nOPEN\n***")) + .onHalfOpen(el -> System.err.println("***\nHALF OPEN\n***")) + .onClose(el -> System.err.println("***\nCLOSED\n***")) .build(); RetryPolicy> retry = RetryPolicy.>builder() @@ -137,12 +140,6 @@ public AnalyticsClient( .handle(IOException.class) // retry on 5xx or rate limit .handleResultIf(response -> is5xx(response.code()) || response.code() == 429) - .onRetriesExceeded(context -> { - throw new RuntimeException("retries"); - }) - .onAbort(context -> { - throw new RuntimeException("aborted"); - }) .build(); this.failsafe = Failsafe.with(retry, breaker).with(networkExecutor); @@ -167,6 +164,9 @@ public void enqueue(Message message) { if (!messageQueue.offer(message)) { handleError(message); } + else { + System.err.println("enqueued " + message.messageId()); + } } diff --git a/analytics/src/main/java/com/segment/analytics/internal/FallbackAppender.java b/analytics/src/main/java/com/segment/analytics/internal/FallbackAppender.java index b80bdab4..2ee0ee10 100644 --- a/analytics/src/main/java/com/segment/analytics/internal/FallbackAppender.java +++ b/analytics/src/main/java/com/segment/analytics/internal/FallbackAppender.java @@ -69,6 +69,7 @@ public void close() { // block !!! public void add(Message msg) { try { + System.err.println("failed " + msg.messageId()); queue.put(msg); } catch (InterruptedException e) { Thread.currentThread().interrupt(); @@ -105,17 +106,15 @@ public void run() { // FIXME batch while (!msgs.isEmpty()) { - int reenqueued = 0; boolean canEnqueue = true; for (int i = msgs.size() - 1; canEnqueue && i >= 0; i--) { Message msg = msgs.get(i); canEnqueue = client.offer(msg); if (canEnqueue) { msgs.remove(i); - reenqueued++; + System.err.println("reenqueued " + msg.messageId()); } } - System.err.println("reenqueued " + reenqueued); try { Thread.sleep(1_000); } catch (InterruptedException e) { diff --git a/analytics/src/test/java/com/segment/analytics/SegmentTest.java b/analytics/src/test/java/com/segment/analytics/SegmentTest.java index be00286a..26474649 100644 --- a/analytics/src/test/java/com/segment/analytics/SegmentTest.java +++ b/analytics/src/test/java/com/segment/analytics/SegmentTest.java @@ -16,7 +16,10 @@ import java.util.Iterator; import java.util.List; import java.util.Set; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import org.awaitility.Awaitility; import org.junit.After; import org.junit.Before; @@ -64,23 +67,30 @@ public void test() throws Throwable { .willReturn( WireMock.aResponse().withStatus(503).withBody("fail").withUniformRandomDelay(100, 1_000))); + int requestsPerSecond = 10; + int numClients = 10; + int timeToRun = 90_000; + int timeToRestore = 30_000; + long start = System.currentTimeMillis(); boolean upAgain = false; - int id = 0; + final AtomicInteger id = new AtomicInteger(0); List ids = new ArrayList<>(); - RateLimiter rate = RateLimiter.create(5); - while (System.currentTimeMillis() - start < 60_000) { + + RateLimiter rate = RateLimiter.create(requestsPerSecond); + ExecutorService exec = Executors.newWorkStealingPool(numClients); + + while (System.currentTimeMillis() - start < timeToRun) { if (rate.tryAcquire()) { - String msgid = "m" + id++; - ids.add(msgid); - analytics.enqueue( - TrackMessage.builder("my-track").messageId(msgid).userId("userId")); - System.err.println("enqued " + msgid); + exec.submit(() -> { + String msgid = "m" + id.getAndIncrement(); + ids.add(msgid); + analytics.enqueue( + TrackMessage.builder("my-track").messageId(msgid).userId("userId")); + }); } - - Thread.sleep(50); - - if (!upAgain && System.currentTimeMillis() - start > 20_000) { + Thread.sleep(1); + if (!upAgain && System.currentTimeMillis() - start > timeToRestore) { upAgain = true; stubFor(post(urlEqualTo("/v1/import/")) .willReturn(okJson("{\"success\": \"true\"}").withUniformRandomDelay(100, 1_000))); @@ -92,6 +102,9 @@ public void test() throws Throwable { .atMost(10, TimeUnit.MINUTES) .pollInterval(1, TimeUnit.SECONDS) .until(() -> sentMessagesEqualsTo(ids.toArray(new String[ids.size()]))); + + exec.shutdownNow(); + exec.awaitTermination(10, TimeUnit.SECONDS); } private static final ObjectMapper OM = new ObjectMapper(); From 2d06abfc5c80c6d0e60fcf1bc52481b3e16eae37 Mon Sep 17 00:00:00 2001 From: Albert Puig Date: Wed, 26 Mar 2025 14:37:16 +0100 Subject: [PATCH 09/13] doc --- ARQUITECTURE.md | 60 +++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 60 insertions(+) create mode 100644 ARQUITECTURE.md diff --git a/ARQUITECTURE.md b/ARQUITECTURE.md new file mode 100644 index 00000000..e06d772f --- /dev/null +++ b/ARQUITECTURE.md @@ -0,0 +1,60 @@ +```mermaid + +sequenceDiagram + box Data Consolidation + participant ConsolidationService + end + + participant SegmentClient + box HTTP + participant QueueHttp + participant LooperHttp + participant SegmentAPI + end + box File + participant QueueFile + participant WriteFile + participant File + participant WatchFile + end + + activate ConsolidationService + ConsolidationService->>+SegmentClient: enqueue + SegmentClient<<->>QueueHttp: offer + alt QueueHttp overflow + SegmentClient<<->>QueueFile: put + end + SegmentClient->>-ConsolidationService: + deactivate ConsolidationService + + loop consume QueueHttp + LooperHttp->>QueueHttp:take + activate LooperHttp + end + LooperHttp->>SegmentAPI: batchUpload + note over LooperHttp,SegmentAPI: Batch + note over LooperHttp,SegmentAPI: CircuitBreaker and Retry + note over LooperHttp,SegmentAPI: HTTP requests submited to a pool + deactivate LooperHttp + + alt retry exhausted or circuit open + note over LooperHttp: pool threads + LooperHttp->>QueueFile: put + end + + loop consume QueueFile + WriteFile->>QueueFile:take + activate WriteFile + end + WriteFile->>File: write + note over WriteFile: Batch and save file + deactivate WriteFile + + note over WatchFile: check last written + activate WatchFile + loop watch QueueFile + WatchFile->>File: read and remove + WatchFile->>QueueHttp: offer + end + deactivate WatchFile +``` From f73e156d013c4eb13faab4433dce04eaa40f1e30 Mon Sep 17 00:00:00 2001 From: Albert Puig Date: Mon, 31 Mar 2025 09:37:31 +0200 Subject: [PATCH 10/13] stole ReversedLinesFileReader at commons-io:2.18.0 --- analytics/pom.xml | 5 + .../internal/ReversedLinesFileReader.java | 541 ++++++++++++++++++ 2 files changed, 546 insertions(+) create mode 100644 analytics/src/main/java/com/segment/analytics/internal/ReversedLinesFileReader.java diff --git a/analytics/pom.xml b/analytics/pom.xml index 4315a511..8e61a10a 100644 --- a/analytics/pom.xml +++ b/analytics/pom.xml @@ -50,6 +50,11 @@ failsafe-retrofit 3.3.2 + + commons-io + commons-io + 2.18.0 + junit junit diff --git a/analytics/src/main/java/com/segment/analytics/internal/ReversedLinesFileReader.java b/analytics/src/main/java/com/segment/analytics/internal/ReversedLinesFileReader.java new file mode 100644 index 00000000..8ff1b35a --- /dev/null +++ b/analytics/src/main/java/com/segment/analytics/internal/ReversedLinesFileReader.java @@ -0,0 +1,541 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.segment.analytics.internal; + +import java.io.Closeable; +import java.io.File; +import java.io.IOException; +import java.io.UnsupportedEncodingException; +import java.nio.ByteBuffer; +import java.nio.channels.SeekableByteChannel; +import java.nio.charset.Charset; +import java.nio.charset.CharsetEncoder; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.StandardOpenOption; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import org.apache.commons.io.Charsets; +import org.apache.commons.io.FileSystem; +import org.apache.commons.io.StandardLineSeparator; +import org.apache.commons.io.build.AbstractStreamBuilder; + +/** + * Reads lines in a file reversely (similar to a BufferedReader, but starting at the last line). Useful for e.g. searching in log files. + *

+ * To build an instance, use {@link Builder}. + *

+ * + * @see Builder + * @since 2.2 + */ +public class ReversedLinesFileReader implements Closeable { + + // @formatter:off + /** + * Builds a new {@link ReversedLinesFileReader}. + * + *

+ * For example: + *

+ *
{@code
+     * ReversedLinesFileReader r = ReversedLinesFileReader.builder()
+     *   .setPath(path)
+     *   .setBufferSize(4096)
+     *   .setCharset(StandardCharsets.UTF_8)
+     *   .get();}
+     * 
+ * + * @see #get() + * @since 2.12.0 + */ + // @formatter:on + public static class Builder extends AbstractStreamBuilder { + + /** + * Constructs a new {@link Builder}. + */ + public Builder() { + setBufferSizeDefault(DEFAULT_BLOCK_SIZE); + setBufferSize(DEFAULT_BLOCK_SIZE); + } + + /** + * Builds a new {@link ReversedLinesFileReader}. + *

+ * You must set input that supports {@link #getInputStream()} on this builder, otherwise, this method throws an exception. + *

+ *

+ * This builder use the following aspects: + *

+ *
    + *
  • {@link #getInputStream()}
  • + *
  • {@link #getBufferSize()}
  • + *
  • {@link #getCharset()}
  • + *
+ * + * @return a new instance. + * @throws IllegalStateException if the {@code origin} is {@code null}. + * @throws UnsupportedOperationException if the origin cannot be converted to a {@link Path}. + * @throws IOException if an I/O error occurs. + * @see #getPath() + * @see #getBufferSize() + * @see #getCharset() + */ + @Override + public ReversedLinesFileReader get() throws IOException { + return new ReversedLinesFileReader(getPath(), getBufferSize(), getCharset()); + } + } + + private final class FilePart { + private final long no; + + private final byte[] data; + + private byte[] leftOver; + + private int currentLastBytePos; + + /** + * Constructs a new instance. + * + * @param no the part number + * @param length its length + * @param leftOverOfLastFilePart remainder + * @throws IOException if there is a problem reading the file + */ + private FilePart(final long no, final int length, final byte[] leftOverOfLastFilePart) throws IOException { + this.no = no; + final int dataLength = length + (leftOverOfLastFilePart != null ? leftOverOfLastFilePart.length : 0); + this.data = new byte[dataLength]; + final long off = (no - 1) * blockSize; + + // read data + if (no > 0 /* file not empty */) { + channel.position(off); + final int countRead = channel.read(ByteBuffer.wrap(data, 0, length)); + if (countRead != length) { + throw new IllegalStateException("Count of requested bytes and actually read bytes don't match"); + } + } + // copy left over part into data arr + if (leftOverOfLastFilePart != null) { + System.arraycopy(leftOverOfLastFilePart, 0, data, length, leftOverOfLastFilePart.length); + } + this.currentLastBytePos = data.length - 1; + this.leftOver = null; + } + + /** + * Constructs the buffer containing any leftover bytes. + */ + private void createLeftOver() { + final int lineLengthBytes = currentLastBytePos + 1; + if (lineLengthBytes > 0) { + // create left over for next block + leftOver = Arrays.copyOf(data, lineLengthBytes); + } else { + leftOver = null; + } + currentLastBytePos = -1; + } + + /** + * Finds the new-line sequence and return its length. + * + * @param data buffer to scan + * @param i start offset in buffer + * @return length of newline sequence or 0 if none found + */ + private int getNewLineMatchByteCount(final byte[] data, final int i) { + for (final byte[] newLineSequence : newLineSequences) { + boolean match = true; + for (int j = newLineSequence.length - 1; j >= 0; j--) { + final int k = i + j - (newLineSequence.length - 1); + match &= k >= 0 && data[k] == newLineSequence[j]; + } + if (match) { + return newLineSequence.length; + } + } + return 0; + } + + /** + * Reads a line. + * + * @return the line or null + */ + private String readLine() { // NOPMD Bug in PMD + + String line = null; + int newLineMatchByteCount; + + final boolean isLastFilePart = no == 1; + + int i = currentLastBytePos; + while (i > -1) { + + if (!isLastFilePart && i < avoidNewlineSplitBufferSize) { + // avoidNewlineSplitBuffer: for all except the last file part we + // take a few bytes to the next file part to avoid splitting of newlines + createLeftOver(); + break; // skip last few bytes and leave it to the next file part + } + + // check for newline + if ((newLineMatchByteCount = getNewLineMatchByteCount(data, i)) > 0 /* found newline */) { + final int lineStart = i + 1; + final int lineLengthBytes = currentLastBytePos - lineStart + 1; + + if (lineLengthBytes < 0) { + throw new IllegalStateException("Unexpected negative line length=" + lineLengthBytes); + } + final byte[] lineData = Arrays.copyOfRange(data, lineStart, lineStart + lineLengthBytes); + + line = new String(lineData, charset); + + currentLastBytePos = i - newLineMatchByteCount; + break; // found line + } + + // move cursor + i -= byteDecrement; + + // end of file part handling + if (i < 0) { + createLeftOver(); + break; // end of file part + } + } + + // last file part handling + if (isLastFilePart && leftOver != null) { + // there will be no line break anymore, this is the first line of the file + line = new String(leftOver, charset); + leftOver = null; + } + + return line; + } + + /** + * Handles block rollover + * + * @return the new FilePart or null + * @throws IOException if there was a problem reading the file + */ + private FilePart rollOver() throws IOException { + + if (currentLastBytePos > -1) { + throw new IllegalStateException("Current currentLastCharPos unexpectedly positive... " + + "last readLine() should have returned something! currentLastCharPos=" + currentLastBytePos); + } + + if (no > 1) { + return new FilePart(no - 1, blockSize, leftOver); + } + // NO 1 was the last FilePart, we're finished + if (leftOver != null) { + throw new IllegalStateException("Unexpected leftover of the last block: leftOverOfThisFilePart=" + + new String(leftOver, charset)); + } + return null; + } + } + + private static final String EMPTY_STRING = ""; + + private static final int DEFAULT_BLOCK_SIZE = FileSystem.getCurrent().getBlockSize(); + + /** + * Constructs a new {@link Builder}. + * + * @return a new {@link Builder}. + * @since 2.12.0 + */ + public static Builder builder() { + return new Builder(); + } + + private final int blockSize; + private final Charset charset; + private final SeekableByteChannel channel; + private final long totalByteLength; + private final long totalBlockCount; + private final byte[][] newLineSequences; + private final int avoidNewlineSplitBufferSize; + private final int byteDecrement; + private FilePart currentFilePart; + private boolean trailingNewlineOfFileSkipped; + + /** + * Constructs a ReversedLinesFileReader with default block size of 4KB and the + * platform's default encoding. + * + * @param file the file to be read + * @throws IOException if an I/O error occurs. + * @deprecated Use {@link #builder()}, {@link Builder}, and {@link Builder#get()} + */ + @Deprecated + public ReversedLinesFileReader(final File file) throws IOException { + this(file, DEFAULT_BLOCK_SIZE, Charset.defaultCharset()); + } + + /** + * Constructs a ReversedLinesFileReader with default block size of 4KB and the + * specified encoding. + * + * @param file the file to be read + * @param charset the charset to use, null uses the default Charset. + * @throws IOException if an I/O error occurs. + * @since 2.5 + * @deprecated Use {@link #builder()}, {@link Builder}, and {@link Builder#get()} + */ + @Deprecated + public ReversedLinesFileReader(final File file, final Charset charset) throws IOException { + this(file.toPath(), charset); + } + + /** + * Constructs a ReversedLinesFileReader with the given block size and encoding. + * + * @param file the file to be read + * @param blockSize size of the internal buffer (for ideal performance this + * should match with the block size of the underlying file + * system). + * @param charset the encoding of the file, null uses the default Charset. + * @throws IOException if an I/O error occurs. + * @since 2.3 + * @deprecated Use {@link #builder()}, {@link Builder}, and {@link Builder#get()} + */ + @Deprecated + public ReversedLinesFileReader(final File file, final int blockSize, final Charset charset) throws IOException { + this(file.toPath(), blockSize, charset); + } + + /** + * Constructs a ReversedLinesFileReader with the given block size and encoding. + * + * @param file the file to be read + * @param blockSize size of the internal buffer (for ideal performance this + * should match with the block size of the underlying file + * system). + * @param charsetName the encoding of the file, null uses the default Charset. + * @throws IOException if an I/O error occurs + * @throws java.nio.charset.UnsupportedCharsetException if the encoding is not supported + * @deprecated Use {@link #builder()}, {@link Builder}, and {@link Builder#get()} + */ + @Deprecated + public ReversedLinesFileReader(final File file, final int blockSize, final String charsetName) throws IOException { + this(file.toPath(), blockSize, charsetName); + } + + /** + * Constructs a ReversedLinesFileReader with default block size of 4KB and the + * specified encoding. + * + * @param file the file to be read + * @param charset the charset to use, null uses the default Charset. + * @throws IOException if an I/O error occurs. + * @since 2.7 + * @deprecated Use {@link #builder()}, {@link Builder}, and {@link Builder#get()} + */ + @Deprecated + public ReversedLinesFileReader(final Path file, final Charset charset) throws IOException { + this(file, DEFAULT_BLOCK_SIZE, charset); + } + + /** + * Constructs a ReversedLinesFileReader with the given block size and encoding. + * + * @param file the file to be read + * @param blockSize size of the internal buffer (for ideal performance this + * should match with the block size of the underlying file + * system). + * @param charset the encoding of the file, null uses the default Charset. + * @throws IOException if an I/O error occurs. + * @since 2.7 + * @deprecated Use {@link #builder()}, {@link Builder}, and {@link Builder#get()} + */ + @Deprecated + public ReversedLinesFileReader(final Path file, final int blockSize, final Charset charset) throws IOException { + this.blockSize = blockSize; + this.charset = Charsets.toCharset(charset); + + // --- check & prepare encoding --- + final CharsetEncoder charsetEncoder = this.charset.newEncoder(); + final float maxBytesPerChar = charsetEncoder.maxBytesPerChar(); + if (maxBytesPerChar == 1f || this.charset == StandardCharsets.UTF_8) { + // all one byte encodings are no problem + byteDecrement = 1; + } else if (this.charset == Charset.forName("Shift_JIS") + || // Same as for UTF-8 + // http://www.herongyang.com/Unicode/JIS-Shift-JIS-Encoding.html + this.charset == Charset.forName("windows-31j") + || // Windows code page 932 (Japanese) + this.charset == Charset.forName("x-windows-949") + || // Windows code page 949 (Korean) + this.charset == Charset.forName("gbk") + || // Windows code page 936 (Simplified Chinese) + this.charset == Charset.forName("x-windows-950")) { // Windows code page 950 (Traditional Chinese) + byteDecrement = 1; + } else if (this.charset == StandardCharsets.UTF_16BE || this.charset == StandardCharsets.UTF_16LE) { + // UTF-16 new line sequences are not allowed as second tuple of four byte + // sequences, + // however byte order has to be specified + byteDecrement = 2; + } else if (this.charset == StandardCharsets.UTF_16) { + throw new UnsupportedEncodingException( + "For UTF-16, you need to specify the byte order (use UTF-16BE or " + "UTF-16LE)"); + } else { + throw new UnsupportedEncodingException( + "Encoding " + charset + " is not supported yet (feel free to " + "submit a patch)"); + } + + // NOTE: The new line sequences are matched in the order given, so it is + // important that \r\n is BEFORE \n + this.newLineSequences = new byte[][] { + StandardLineSeparator.CRLF.getBytes(this.charset), + StandardLineSeparator.LF.getBytes(this.charset), + StandardLineSeparator.CR.getBytes(this.charset) + }; + + this.avoidNewlineSplitBufferSize = newLineSequences[0].length; + + // Open file + this.channel = Files.newByteChannel(file, StandardOpenOption.READ); + this.totalByteLength = channel.size(); + int lastBlockLength = (int) (this.totalByteLength % blockSize); + if (lastBlockLength > 0) { + this.totalBlockCount = this.totalByteLength / blockSize + 1; + } else { + this.totalBlockCount = this.totalByteLength / blockSize; + if (this.totalByteLength > 0) { + lastBlockLength = blockSize; + } + } + this.currentFilePart = new FilePart(totalBlockCount, lastBlockLength, null); + } + + /** + * Constructs a ReversedLinesFileReader with the given block size and encoding. + * + * @param file the file to be read + * @param blockSize size of the internal buffer (for ideal performance this + * should match with the block size of the underlying file + * system). + * @param charsetName the encoding of the file, null uses the default Charset. + * @throws IOException if an I/O error occurs + * @throws java.nio.charset.UnsupportedCharsetException if the encoding is not supported + * @since 2.7 + * @deprecated Use {@link #builder()}, {@link Builder}, and {@link Builder#get()} + */ + @Deprecated + public ReversedLinesFileReader(final Path file, final int blockSize, final String charsetName) throws IOException { + this(file, blockSize, Charsets.toCharset(charsetName)); + } + + /** + * Closes underlying resources. + * + * @throws IOException if an I/O error occurs. + */ + @Override + public void close() throws IOException { + channel.close(); + } + + /** + * Returns the lines of the file from bottom to top. + * + * @return the next line or null if the start of the file is reached + * @throws IOException if an I/O error occurs. + */ + public String readLine() throws IOException { + + String line = currentFilePart.readLine(); + while (line == null) { + currentFilePart = currentFilePart.rollOver(); + if (currentFilePart == null) { + // no more FileParts: we're done, leave line set to null + break; + } + line = currentFilePart.readLine(); + } + + // aligned behavior with BufferedReader that doesn't return a last, empty line + if (EMPTY_STRING.equals(line) && !trailingNewlineOfFileSkipped) { + trailingNewlineOfFileSkipped = true; + line = readLine(); + } + + return line; + } + + /** + * Returns {@code lineCount} lines of the file from bottom to top. + *

+ * If there are less than {@code lineCount} lines in the file, then that's what + * you get. + *

+ *

+ * Note: You can easily flip the result with {@link Collections#reverse(List)}. + *

+ * + * @param lineCount How many lines to read. + * @return A new list + * @throws IOException if an I/O error occurs. + * @since 2.8.0 + */ + public List readLines(final int lineCount) throws IOException { + if (lineCount < 0) { + throw new IllegalArgumentException("lineCount < 0"); + } + final ArrayList arrayList = new ArrayList<>(lineCount); + for (int i = 0; i < lineCount; i++) { + final String line = readLine(); + if (line == null) { + return arrayList; + } + arrayList.add(line); + } + return arrayList; + } + + /** + * Returns the last {@code lineCount} lines of the file. + *

+ * If there are less than {@code lineCount} lines in the file, then that's what + * you get. + *

+ * + * @param lineCount How many lines to read. + * @return A String. + * @throws IOException if an I/O error occurs. + * @since 2.8.0 + */ + public String toString(final int lineCount) throws IOException { + final List lines = readLines(lineCount); + Collections.reverse(lines); + return lines.isEmpty() ? EMPTY_STRING : String.join(System.lineSeparator(), lines) + System.lineSeparator(); + } +} From 0d2527e51861969230463b7fe15becdfe9dc3710 Mon Sep 17 00:00:00 2001 From: Albert Puig Date: Mon, 31 Mar 2025 09:37:42 +0200 Subject: [PATCH 11/13] ReversedLinesFileReader truncate consumed lines --- .../internal/ReversedLinesFileReader.java | 19 +++++++++++++++---- 1 file changed, 15 insertions(+), 4 deletions(-) diff --git a/analytics/src/main/java/com/segment/analytics/internal/ReversedLinesFileReader.java b/analytics/src/main/java/com/segment/analytics/internal/ReversedLinesFileReader.java index 8ff1b35a..09a7a773 100644 --- a/analytics/src/main/java/com/segment/analytics/internal/ReversedLinesFileReader.java +++ b/analytics/src/main/java/com/segment/analytics/internal/ReversedLinesFileReader.java @@ -21,11 +21,11 @@ import java.io.IOException; import java.io.UnsupportedEncodingException; import java.nio.ByteBuffer; -import java.nio.channels.SeekableByteChannel; +import java.nio.channels.FileChannel; +import java.nio.channels.FileLock; import java.nio.charset.Charset; import java.nio.charset.CharsetEncoder; import java.nio.charset.StandardCharsets; -import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.StandardOpenOption; import java.util.ArrayList; @@ -278,7 +278,8 @@ public static Builder builder() { private final int blockSize; private final Charset charset; - private final SeekableByteChannel channel; + private final FileChannel channel; + private final FileLock fileLock; private final long totalByteLength; private final long totalBlockCount; private final byte[][] newLineSequences; @@ -422,7 +423,8 @@ public ReversedLinesFileReader(final Path file, final int blockSize, final Chars this.avoidNewlineSplitBufferSize = newLineSequences[0].length; // Open file - this.channel = Files.newByteChannel(file, StandardOpenOption.READ); + this.channel = FileChannel.open(file, StandardOpenOption.READ, StandardOpenOption.WRITE); + this.fileLock = channel.lock(); this.totalByteLength = channel.size(); int lastBlockLength = (int) (this.totalByteLength % blockSize); if (lastBlockLength > 0) { @@ -461,6 +463,7 @@ public ReversedLinesFileReader(final Path file, final int blockSize, final Strin */ @Override public void close() throws IOException { + fileLock.release(); channel.close(); } @@ -514,10 +517,18 @@ public List readLines(final int lineCount) throws IOException { for (int i = 0; i < lineCount; i++) { final String line = readLine(); if (line == null) { + channel.truncate(0); return arrayList; } arrayList.add(line); } + + long truncateTo = (this.currentFilePart.no - 1) * blockSize; + truncateTo += this.currentFilePart.currentLastBytePos + 1; + channel.truncate(truncateTo); + + channel.force(true); + return arrayList; } From 389875f6cd8525d9edf7959df67c83b933f44817 Mon Sep 17 00:00:00 2001 From: Albert Puig Date: Mon, 31 Mar 2025 09:37:48 +0200 Subject: [PATCH 12/13] truncate lines on resend --- .../analytics/internal/FallbackAppender.java | 77 +++++++++---------- 1 file changed, 38 insertions(+), 39 deletions(-) diff --git a/analytics/src/main/java/com/segment/analytics/internal/FallbackAppender.java b/analytics/src/main/java/com/segment/analytics/internal/FallbackAppender.java index 2ee0ee10..1f85e392 100644 --- a/analytics/src/main/java/com/segment/analytics/internal/FallbackAppender.java +++ b/analytics/src/main/java/com/segment/analytics/internal/FallbackAppender.java @@ -11,10 +11,10 @@ import java.io.OutputStream; import java.nio.channels.Channels; import java.nio.channels.FileChannel; +import java.nio.channels.FileLock; import java.nio.charset.StandardCharsets; import java.nio.file.StandardOpenOption; import java.util.ArrayList; -import java.util.Arrays; import java.util.Collections; import java.util.Date; import java.util.List; @@ -25,6 +25,7 @@ import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; import java.util.stream.Collectors; +import org.apache.commons.io.FileSystem; public class FallbackAppender { @@ -86,25 +87,18 @@ public void run() { } List msgs; - lock.lock(); try { - msgs = read(); + msgs = truncate(20); // XXX messageSize if (msgs.isEmpty()) { continue; } - // FIXME now its reading all the msgs and waits until all is processed - // it will be better to work with batch and truncate the file - file.delete(); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); lastMessage = System.currentTimeMillis(); continue; - } finally { - lock.unlock(); } - // FIXME batch while (!msgs.isEmpty()) { boolean canEnqueue = true; for (int i = msgs.size() - 1; canEnqueue && i >= 0; i--) { @@ -113,16 +107,16 @@ public void run() { if (canEnqueue) { msgs.remove(i); System.err.println("reenqueued " + msg.messageId()); + } else { + // slow down next iteration when http queue overflow + try { + Thread.sleep(1_000); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } } } - try { - Thread.sleep(1_000); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } } - - lastMessage = System.currentTimeMillis(); } try { @@ -161,38 +155,43 @@ public void run() { } } - List read() throws IOException { - if (file.exists()) { - try (FileChannel fileChannel = FileChannel.open(file.toPath(), StandardOpenOption.READ)) { - fileChannel.lock(0, Long.MAX_VALUE, true); - - final String[] lines = new String( - Channels.newInputStream(fileChannel).readAllBytes(), StandardCharsets.UTF_8) - .split(System.lineSeparator()); - return Arrays.stream(lines) - .map(m -> fromJson(m)) - .filter(Objects::nonNull) - .collect(Collectors.toList()); - } - } else { + List truncate(int numMessages) throws IOException { + lock.lock(); + + if (!file.exists()) { + lock.unlock(); return Collections.emptyList(); } + + try (ReversedLinesFileReader reader = ReversedLinesFileReader.builder() + .setPath(file.toPath()) + .setBufferSize(FileSystem.getCurrent().getBlockSize()) + .setCharset(StandardCharsets.UTF_8) + .get()) { + + return reader.readLines(numMessages).stream() + .map(this::fromJson) + .filter(Objects::nonNull) + .collect(Collectors.toList()); + } finally { + lock.unlock(); + } } + private static final byte[] NEW_LINE = System.lineSeparator().getBytes(StandardCharsets.UTF_8); + private void write(List batch) { lock.lock(); try (FileChannel fileChannel = FileChannel.open( - file.toPath(), StandardOpenOption.WRITE, StandardOpenOption.APPEND, StandardOpenOption.CREATE)) { - fileChannel.lock(); + file.toPath(), StandardOpenOption.WRITE, StandardOpenOption.APPEND, StandardOpenOption.CREATE); + OutputStream os = Channels.newOutputStream(fileChannel); + FileLock fileLock = fileChannel.lock(); ) { - final String lines = batch.stream() - .map(this::toJson) - .filter(Objects::nonNull) - .collect(Collectors.joining(System.lineSeparator())); + for (Message msg : batch) { + os.write(toJson(msg).getBytes(StandardCharsets.UTF_8)); + os.write(NEW_LINE); + } - OutputStream os = Channels.newOutputStream(fileChannel); - os.write(lines.getBytes(StandardCharsets.UTF_8)); - os.write(System.lineSeparator().getBytes(StandardCharsets.UTF_8)); fileChannel.force(true); batch.clear(); From ac0bc1fed9492f0c16ca3aa75842e3d8ef42fa76 Mon Sep 17 00:00:00 2001 From: Albert Puig Date: Mon, 31 Mar 2025 16:54:36 +0200 Subject: [PATCH 13/13] stop retry on rate limit --- .../com/segment/analytics/internal/AnalyticsClient.java | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/analytics/src/main/java/com/segment/analytics/internal/AnalyticsClient.java b/analytics/src/main/java/com/segment/analytics/internal/AnalyticsClient.java index fc31cc0d..981a948f 100644 --- a/analytics/src/main/java/com/segment/analytics/internal/AnalyticsClient.java +++ b/analytics/src/main/java/com/segment/analytics/internal/AnalyticsClient.java @@ -138,8 +138,10 @@ public AnalyticsClient( .withJitter(.2) // retry on IOException .handle(IOException.class) - // retry on 5xx or rate limit - .handleResultIf(response -> is5xx(response.code()) || response.code() == 429) + // retry on 5xx + .handleResultIf(response -> is5xx(response.code())) + // stop retry on rate limit + .abortIf(response -> response.code() == 429) .build(); this.failsafe = Failsafe.with(retry, breaker).with(networkExecutor);