diff --git a/core-api/src/main/java/com/optimizely/ab/event/BatchEventProcessor.java b/core-api/src/main/java/com/optimizely/ab/event/BatchEventProcessor.java index 201f6e8ee..f10c134b3 100644 --- a/core-api/src/main/java/com/optimizely/ab/event/BatchEventProcessor.java +++ b/core-api/src/main/java/com/optimizely/ab/event/BatchEventProcessor.java @@ -48,6 +48,7 @@ public class BatchEventProcessor implements EventProcessor, AutoCloseable { public static final String CONFIG_CLOSE_TIMEOUT = "event.processor.close.timeout"; public static final int DEFAULT_QUEUE_CAPACITY = 1000; + public static final int DEFAULT_EMPTY_COUNT = 2; public static final int DEFAULT_BATCH_SIZE = 10; public static final long DEFAULT_BATCH_INTERVAL = TimeUnit.SECONDS.toMillis(30); public static final long DEFAULT_TIMEOUT_INTERVAL = TimeUnit.SECONDS.toMillis(5); @@ -129,19 +130,26 @@ public class EventConsumer implements Runnable { @Override public void run() { try { + int emptyCount = 0; + while (true) { - if (System.currentTimeMillis() > deadline) { + if (System.currentTimeMillis() >= deadline) { logger.debug("Deadline exceeded flushing current batch."); flush(); + deadline = System.currentTimeMillis() + flushInterval; } - Object item = eventQueue.poll(50, TimeUnit.MILLISECONDS); + long timeout = deadline - System.currentTimeMillis(); + Object item = emptyCount > DEFAULT_EMPTY_COUNT ? eventQueue.take() : eventQueue.poll(timeout, TimeUnit.MILLISECONDS); + if (item == null) { - logger.debug("Empty item, sleeping for 50ms."); - Thread.sleep(50); + logger.debug("Empty item after waiting flush interval."); + emptyCount++; continue; } + emptyCount = 0; + if (item == SHUTDOWN_SIGNAL) { logger.info("Received shutdown signal."); break; diff --git a/core-api/src/test/java/com/optimizely/ab/event/BatchEventProcessorTest.java b/core-api/src/test/java/com/optimizely/ab/event/BatchEventProcessorTest.java index 8f8e94a8a..5f42e9a3f 100644 --- a/core-api/src/test/java/com/optimizely/ab/event/BatchEventProcessorTest.java +++ b/core-api/src/test/java/com/optimizely/ab/event/BatchEventProcessorTest.java @@ -84,43 +84,45 @@ public void testDrainOnClose() throws Exception { } @Test - public void testFlushOnMaxTimeout() throws Exception { + public void testFlushMaxBatchSize() throws Exception { CountDownLatch countDownLatch = new CountDownLatch(1); setEventProcessor(logEvent -> { + assertEquals(MAX_BATCH_SIZE, logEvent.getEventBatch().getVisitors().size()); eventHandlerRule.dispatchEvent(logEvent); countDownLatch.countDown(); }); - UserEvent userEvent = buildConversionEvent(EVENT_NAME); - eventProcessor.process(userEvent); - eventHandlerRule.expectConversion(EVENT_NAME, USER_ID); + for (int i = 0; i < MAX_BATCH_SIZE; i++) { + String eventName = EVENT_NAME + i; + UserEvent userEvent = buildConversionEvent(eventName); + eventProcessor.process(userEvent); + eventHandlerRule.expectConversion(eventName, USER_ID); + } if (!countDownLatch.await(MAX_DURATION_MS * 3, TimeUnit.MILLISECONDS)) { fail("Exceeded timeout waiting for events to flush."); } - eventProcessor.close(); assertEquals(0, eventQueue.size()); eventHandlerRule.expectCalls(1); } @Test - public void testFlushMaxBatchSize() throws Exception { + public void testFlushOnMaxTimeout() throws Exception { + UserEvent userEvent = buildConversionEvent(EVENT_NAME); + CountDownLatch countDownLatch = new CountDownLatch(1); setEventProcessor(logEvent -> { - assertEquals(MAX_BATCH_SIZE, logEvent.getEventBatch().getVisitors().size()); eventHandlerRule.dispatchEvent(logEvent); countDownLatch.countDown(); }); - for (int i = 0; i < MAX_BATCH_SIZE; i++) { - String eventName = EVENT_NAME + i; - UserEvent userEvent = buildConversionEvent(eventName); - eventProcessor.process(userEvent); - eventHandlerRule.expectConversion(eventName, USER_ID); - } + eventProcessor.process(userEvent); + eventHandlerRule.expectConversion(EVENT_NAME, USER_ID); - if (!countDownLatch.await(MAX_DURATION_MS * 3, TimeUnit.MILLISECONDS)) { + eventProcessor.close(); + + if (!countDownLatch.await( TIMEOUT_MS * 3, TimeUnit.MILLISECONDS)) { fail("Exceeded timeout waiting for events to flush."); }