From 625df20a28ad9be577a3e31e72b0471a3bec5844 Mon Sep 17 00:00:00 2001 From: Mike Davis Date: Mon, 12 Aug 2019 09:44:13 -0700 Subject: [PATCH] Prevent negative batch config values. --- .../ab/event/BatchEventProcessor.java | 76 +++++++++---- .../com/optimizely/ab/EventHandlerRule.java | 14 +++ .../ab/event/BatchEventProcessorTest.java | 102 +++++++++++------- 3 files changed, 132 insertions(+), 60 deletions(-) diff --git a/core-api/src/main/java/com/optimizely/ab/event/BatchEventProcessor.java b/core-api/src/main/java/com/optimizely/ab/event/BatchEventProcessor.java index a322e8e6b..c99d3bb52 100644 --- a/core-api/src/main/java/com/optimizely/ab/event/BatchEventProcessor.java +++ b/core-api/src/main/java/com/optimizely/ab/event/BatchEventProcessor.java @@ -58,9 +58,9 @@ public class BatchEventProcessor implements EventProcessor, AutoCloseable { private final BlockingQueue eventQueue; private final EventHandler eventHandler; - private final int batchSize; - private final long flushInterval; - private final long timeoutMillis; + final int batchSize; + final long flushInterval; + final long timeoutMillis; private final ExecutorService executor; private final NotificationCenter notificationCenter; @@ -70,21 +70,11 @@ public class BatchEventProcessor implements EventProcessor, AutoCloseable { private BatchEventProcessor(BlockingQueue eventQueue, EventHandler eventHandler, Integer batchSize, Long flushInterval, Long timeoutMillis, ExecutorService executor, NotificationCenter notificationCenter) { this.eventHandler = eventHandler; this.eventQueue = eventQueue; - this.batchSize = batchSize == null ? PropertyUtils.getInteger(CONFIG_BATCH_SIZE, DEFAULT_BATCH_SIZE) : batchSize; - this.flushInterval = flushInterval == null ? PropertyUtils.getLong(CONFIG_BATCH_INTERVAL, DEFAULT_BATCH_INTERVAL) : flushInterval; - this.timeoutMillis = timeoutMillis == null ? PropertyUtils.getLong(CONFIG_CLOSE_TIMEOUT, DEFAULT_TIMEOUT_INTERVAL) : timeoutMillis; + this.batchSize = batchSize; + this.flushInterval = flushInterval; + this.timeoutMillis = timeoutMillis; this.notificationCenter = notificationCenter; - - if (executor == null) { - final ThreadFactory threadFactory = Executors.defaultThreadFactory(); - this.executor = Executors.newSingleThreadExecutor(runnable -> { - Thread thread = threadFactory.newThread(runnable); - thread.setDaemon(true); - return thread; - }); - } else { - this.executor = executor; - } + this.executor = executor; } public synchronized void start() { @@ -240,42 +230,64 @@ public static Builder builder() { public static class Builder { private BlockingQueue eventQueue = new ArrayBlockingQueue<>(DEFAULT_QUEUE_CAPACITY); private EventHandler eventHandler = null; - private Integer batchSize = null; - private Long flushInterval = null; + private Integer batchSize = PropertyUtils.getInteger(CONFIG_BATCH_SIZE, DEFAULT_BATCH_SIZE); + private Long flushInterval = PropertyUtils.getLong(CONFIG_BATCH_INTERVAL, DEFAULT_BATCH_INTERVAL); + private Long timeoutMillis = PropertyUtils.getLong(CONFIG_CLOSE_TIMEOUT, DEFAULT_TIMEOUT_INTERVAL); private ExecutorService executor = null; private NotificationCenter notificationCenter = null; - private Long timeoutMillis = null; + /** + * {@link EventHandler} implementation used to dispatch events to Optimizely. + */ public Builder withEventHandler(EventHandler eventHandler) { this.eventHandler = eventHandler; return this; } + /** + * EventQueue is the underlying BlockingQueue used to buffer events before being added to the batch payload. + */ public Builder withEventQueue(BlockingQueue eventQueue) { this.eventQueue = eventQueue; return this; } + /** + * BatchSize is the maximum number of events contained within a single event batch. + */ public Builder withBatchSize(Integer batchSize) { this.batchSize = batchSize; return this; } + /** + * FlushInterval is the maximum duration, in milliseconds, that an event will remain in flight before + * being flushed to the event dispatcher. + */ public Builder withFlushInterval(Long flushInterval) { this.flushInterval = flushInterval; return this; } + /** + * ExecutorService used to execute the {@link EventConsumer} thread. + */ public Builder withExecutor(ExecutorService executor) { this.executor = executor; return this; } + /** + * Timeout is the maximum time to wait for the EventProcessor to close. + */ public Builder withTimeout(long duration, TimeUnit timeUnit) { this.timeoutMillis = timeUnit.toMillis(duration); return this; } + /** + * NotificationCenter used to notify when event batches are flushed. + */ public Builder withNotificationCenter(NotificationCenter notificationCenter) { this.notificationCenter = notificationCenter; return this; @@ -286,6 +298,30 @@ public BatchEventProcessor build() { } public BatchEventProcessor build(boolean shouldStart) { + if (batchSize < 0) { + logger.warn("Invalid batchSize of {}, Defaulting to {}", batchSize, DEFAULT_BATCH_SIZE); + batchSize = DEFAULT_BATCH_SIZE; + } + + if (flushInterval < 0) { + logger.warn("Invalid flushInterval of {}, Defaulting to {}", flushInterval, DEFAULT_BATCH_INTERVAL); + flushInterval = DEFAULT_BATCH_INTERVAL; + } + + if (timeoutMillis < 0) { + logger.warn("Invalid timeoutMillis of {}, Defaulting to {}", timeoutMillis, DEFAULT_TIMEOUT_INTERVAL); + timeoutMillis = DEFAULT_TIMEOUT_INTERVAL; + } + + if (executor == null) { + final ThreadFactory threadFactory = Executors.defaultThreadFactory(); + executor = Executors.newSingleThreadExecutor(runnable -> { + Thread thread = threadFactory.newThread(runnable); + thread.setDaemon(true); + return thread; + }); + } + BatchEventProcessor batchEventProcessor = new BatchEventProcessor(eventQueue, eventHandler, batchSize, flushInterval, timeoutMillis, executor, notificationCenter); if (shouldStart) { diff --git a/core-api/src/test/java/com/optimizely/ab/EventHandlerRule.java b/core-api/src/test/java/com/optimizely/ab/EventHandlerRule.java index 2c0f2aaa4..4245a8f8a 100644 --- a/core-api/src/test/java/com/optimizely/ab/EventHandlerRule.java +++ b/core-api/src/test/java/com/optimizely/ab/EventHandlerRule.java @@ -51,6 +51,8 @@ public class EventHandlerRule implements EventHandler, TestRule { private List expectedEvents; private LinkedList actualEvents; + private int actualCalls; + private Integer expectedCalls; @Override public Statement apply(final Statement base, Description description) { @@ -71,12 +73,19 @@ public void evaluate() throws Throwable { private void before() { expectedEvents = new LinkedList<>(); actualEvents = new LinkedList<>(); + + expectedCalls = null; + actualCalls = 0; } private void after() { } private void verify() { + if (expectedCalls != null) { + assertEquals(expectedCalls.intValue(), actualCalls); + } + assertEquals(expectedEvents.size(), actualEvents.size()); ListIterator expectedIterator = expectedEvents.listIterator(); @@ -90,6 +99,10 @@ private void verify() { } } + public void expectCalls(int expected) { + expectedCalls = expected; + } + public void expectImpression(String experientId, String variationId, String userId) { expectImpression(experientId, variationId, userId, Collections.emptyMap()); } @@ -119,6 +132,7 @@ public void expect(String experientId, String variationId, String eventName, Str @Override public void dispatchEvent(LogEvent logEvent) { logger.info("Receiving event: {}", logEvent); + actualCalls++; List visitors = logEvent.getEventBatch().getVisitors(); diff --git a/core-api/src/test/java/com/optimizely/ab/event/BatchEventProcessorTest.java b/core-api/src/test/java/com/optimizely/ab/event/BatchEventProcessorTest.java index b91fc2174..b9d45fc88 100644 --- a/core-api/src/test/java/com/optimizely/ab/event/BatchEventProcessorTest.java +++ b/core-api/src/test/java/com/optimizely/ab/event/BatchEventProcessorTest.java @@ -30,6 +30,7 @@ import java.util.Collections; import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicInteger; import static org.junit.Assert.*; import static org.mockito.Mockito.*; @@ -93,11 +94,12 @@ public void testFlushOnMaxTimeout() throws Exception { eventHandlerRule.expectConversion(EVENT_NAME, USER_ID); if (!countDownLatch.await(MAX_DURATION_MS * 3, TimeUnit.MILLISECONDS)) { - fail("Exceeded timeout waiting for notification."); + fail("Exceeded timeout waiting for events to flush."); } eventProcessor.close(); assertEquals(0, eventQueue.size()); + eventHandlerRule.expectCalls(1); } @Test @@ -116,17 +118,17 @@ public void testFlushMaxBatchSize() throws Exception { eventHandlerRule.expectConversion(eventName, USER_ID); } - countDownLatch.await(); + if (!countDownLatch.await(MAX_DURATION_MS * 3, TimeUnit.MILLISECONDS)) { + fail("Exceeded timeout waiting for events to flush."); + } + assertEquals(0, eventQueue.size()); + eventHandlerRule.expectCalls(1); } @Test public void testFlush() throws Exception { - CountDownLatch countDownLatch = new CountDownLatch(2); - setEventProcessor(logEvent -> { - eventHandlerRule.dispatchEvent(logEvent); - countDownLatch.countDown(); - }); + setEventProcessor(logEvent -> eventHandlerRule.dispatchEvent(logEvent)); UserEvent userEvent = buildConversionEvent(EVENT_NAME); eventProcessor.process(userEvent); @@ -137,18 +139,12 @@ public void testFlush() throws Exception { eventProcessor.flush(); eventHandlerRule.expectConversion(EVENT_NAME, USER_ID); - if (!countDownLatch.await(MAX_DURATION_MS / 2, TimeUnit.MILLISECONDS)) { - fail("Exceeded timeout waiting for notification."); - } + eventHandlerRule.expectCalls(2); } @Test public void testFlushOnMismatchRevision() throws Exception { - CountDownLatch countDownLatch = new CountDownLatch(2); - setEventProcessor(logEvent -> { - eventHandlerRule.dispatchEvent(logEvent); - countDownLatch.countDown(); - }); + setEventProcessor(logEvent -> eventHandlerRule.dispatchEvent(logEvent)); ProjectConfig projectConfig1 = mock(ProjectConfig.class); when(projectConfig1.getRevision()).thenReturn("1"); @@ -165,18 +161,12 @@ public void testFlushOnMismatchRevision() throws Exception { eventHandlerRule.expectConversion(EVENT_NAME, USER_ID); eventProcessor.close(); - if (!countDownLatch.await(MAX_DURATION_MS * 3, TimeUnit.MILLISECONDS)) { - fail("Exceeded timeout waiting for notification."); - } + eventHandlerRule.expectCalls(2); } @Test public void testFlushOnMismatchProjectId() throws Exception { - CountDownLatch countDownLatch = new CountDownLatch(2); - setEventProcessor(logEvent -> { - eventHandlerRule.dispatchEvent(logEvent); - countDownLatch.countDown(); - }); + setEventProcessor(logEvent -> eventHandlerRule.dispatchEvent(logEvent)); ProjectConfig projectConfig1 = mock(ProjectConfig.class); when(projectConfig1.getRevision()).thenReturn("1"); @@ -193,18 +183,12 @@ public void testFlushOnMismatchProjectId() throws Exception { eventHandlerRule.expectConversion(EVENT_NAME, USER_ID); eventProcessor.close(); - if (!countDownLatch.await(MAX_DURATION_MS * 3, TimeUnit.MILLISECONDS)) { - fail("Exceeded timeout waiting for notification."); - } + eventHandlerRule.expectCalls(2); } @Test public void testStopAndStart() throws Exception { - CountDownLatch countDownLatch = new CountDownLatch(2); - setEventProcessor(logEvent -> { - eventHandlerRule.dispatchEvent(logEvent); - countDownLatch.countDown(); - }); + setEventProcessor(logEvent -> eventHandlerRule.dispatchEvent(logEvent)); UserEvent userEvent = buildConversionEvent(EVENT_NAME); eventProcessor.process(userEvent); @@ -218,31 +202,27 @@ public void testStopAndStart() throws Exception { eventProcessor.start(); eventProcessor.close(); - if (!countDownLatch.await(MAX_DURATION_MS * 3, TimeUnit.MILLISECONDS)) { - fail("Exceeded timeout waiting for notification."); - } + eventHandlerRule.expectCalls(2); } @Test public void testNotificationCenter() throws Exception { - CountDownLatch countDownLatch = new CountDownLatch(1); - notificationCenter.addNotificationHandler(LogEvent.class, x -> countDownLatch.countDown()); + AtomicInteger counter = new AtomicInteger(); + notificationCenter.addNotificationHandler(LogEvent.class, x -> counter.incrementAndGet()); setEventProcessor(logEvent -> {}); UserEvent userEvent = buildConversionEvent(EVENT_NAME); eventProcessor.process(userEvent); eventProcessor.close(); - if (!countDownLatch.await(MAX_DURATION_MS * 3, TimeUnit.MILLISECONDS)) { - fail("Exceeded timeout waiting for notification."); - } + assertEquals(1, counter.intValue()); } @Test public void testCloseTimeout() throws Exception { CountDownLatch countDownLatch = new CountDownLatch(1); setEventProcessor(logEvent -> { - if (!countDownLatch.await(TIMEOUT_MS * 2, TimeUnit.SECONDS)) { + if (!countDownLatch.await(TIMEOUT_MS * 2, TimeUnit.MILLISECONDS)) { fail("Exceeded timeout waiting for close."); } }); @@ -266,6 +246,48 @@ public void testCloseEventHandler() throws Exception { verify((AutoCloseable) mockEventHandler).close(); } + @Test + public void testInvalidBatchSizeUsesDefault() { + eventProcessor = BatchEventProcessor.builder() + .withEventQueue(eventQueue) + .withBatchSize(-1) + .withFlushInterval(MAX_DURATION_MS) + .withEventHandler(new NoopEventHandler()) + .withNotificationCenter(notificationCenter) + .withTimeout(TIMEOUT_MS, TimeUnit.MILLISECONDS) + .build(); + + assertEquals(eventProcessor.batchSize, BatchEventProcessor.DEFAULT_BATCH_SIZE); + } + + @Test + public void testInvalidFlushIntervalUsesDefault() { + eventProcessor = BatchEventProcessor.builder() + .withEventQueue(eventQueue) + .withBatchSize(MAX_BATCH_SIZE) + .withFlushInterval(-1L) + .withEventHandler(new NoopEventHandler()) + .withNotificationCenter(notificationCenter) + .withTimeout(TIMEOUT_MS, TimeUnit.MILLISECONDS) + .build(); + + assertEquals(eventProcessor.flushInterval, BatchEventProcessor.DEFAULT_BATCH_INTERVAL); + } + + @Test + public void testInvalidTimeoutUsesDefault() { + eventProcessor = BatchEventProcessor.builder() + .withEventQueue(eventQueue) + .withBatchSize(MAX_BATCH_SIZE) + .withFlushInterval(MAX_DURATION_MS) + .withEventHandler(new NoopEventHandler()) + .withNotificationCenter(notificationCenter) + .withTimeout(-1L, TimeUnit.MILLISECONDS) + .build(); + + assertEquals(eventProcessor.timeoutMillis, BatchEventProcessor.DEFAULT_TIMEOUT_INTERVAL); + } + private void setEventProcessor(EventHandler eventHandler) { eventProcessor = BatchEventProcessor.builder() .withEventQueue(eventQueue)