3030
3131import java .util .Collections ;
3232import java .util .concurrent .*;
33+ import java .util .concurrent .atomic .AtomicInteger ;
3334
3435import static org .junit .Assert .*;
3536import static org .mockito .Mockito .*;
@@ -93,11 +94,12 @@ public void testFlushOnMaxTimeout() throws Exception {
9394 eventHandlerRule .expectConversion (EVENT_NAME , USER_ID );
9495
9596 if (!countDownLatch .await (MAX_DURATION_MS * 3 , TimeUnit .MILLISECONDS )) {
96- fail ("Exceeded timeout waiting for notification ." );
97+ fail ("Exceeded timeout waiting for events to flush ." );
9798 }
9899
99100 eventProcessor .close ();
100101 assertEquals (0 , eventQueue .size ());
102+ eventHandlerRule .expectCalls (1 );
101103 }
102104
103105 @ Test
@@ -116,17 +118,17 @@ public void testFlushMaxBatchSize() throws Exception {
116118 eventHandlerRule .expectConversion (eventName , USER_ID );
117119 }
118120
119- countDownLatch .await ();
121+ if (!countDownLatch .await (MAX_DURATION_MS * 3 , TimeUnit .MILLISECONDS )) {
122+ fail ("Exceeded timeout waiting for events to flush." );
123+ }
124+
120125 assertEquals (0 , eventQueue .size ());
126+ eventHandlerRule .expectCalls (1 );
121127 }
122128
123129 @ Test
124130 public void testFlush () throws Exception {
125- CountDownLatch countDownLatch = new CountDownLatch (2 );
126- setEventProcessor (logEvent -> {
127- eventHandlerRule .dispatchEvent (logEvent );
128- countDownLatch .countDown ();
129- });
131+ setEventProcessor (logEvent -> eventHandlerRule .dispatchEvent (logEvent ));
130132
131133 UserEvent userEvent = buildConversionEvent (EVENT_NAME );
132134 eventProcessor .process (userEvent );
@@ -137,18 +139,12 @@ public void testFlush() throws Exception {
137139 eventProcessor .flush ();
138140 eventHandlerRule .expectConversion (EVENT_NAME , USER_ID );
139141
140- if (!countDownLatch .await (MAX_DURATION_MS / 2 , TimeUnit .MILLISECONDS )) {
141- fail ("Exceeded timeout waiting for notification." );
142- }
142+ eventHandlerRule .expectCalls (2 );
143143 }
144144
145145 @ Test
146146 public void testFlushOnMismatchRevision () throws Exception {
147- CountDownLatch countDownLatch = new CountDownLatch (2 );
148- setEventProcessor (logEvent -> {
149- eventHandlerRule .dispatchEvent (logEvent );
150- countDownLatch .countDown ();
151- });
147+ setEventProcessor (logEvent -> eventHandlerRule .dispatchEvent (logEvent ));
152148
153149 ProjectConfig projectConfig1 = mock (ProjectConfig .class );
154150 when (projectConfig1 .getRevision ()).thenReturn ("1" );
@@ -165,18 +161,12 @@ public void testFlushOnMismatchRevision() throws Exception {
165161 eventHandlerRule .expectConversion (EVENT_NAME , USER_ID );
166162
167163 eventProcessor .close ();
168- if (!countDownLatch .await (MAX_DURATION_MS * 3 , TimeUnit .MILLISECONDS )) {
169- fail ("Exceeded timeout waiting for notification." );
170- }
164+ eventHandlerRule .expectCalls (2 );
171165 }
172166
173167 @ Test
174168 public void testFlushOnMismatchProjectId () throws Exception {
175- CountDownLatch countDownLatch = new CountDownLatch (2 );
176- setEventProcessor (logEvent -> {
177- eventHandlerRule .dispatchEvent (logEvent );
178- countDownLatch .countDown ();
179- });
169+ setEventProcessor (logEvent -> eventHandlerRule .dispatchEvent (logEvent ));
180170
181171 ProjectConfig projectConfig1 = mock (ProjectConfig .class );
182172 when (projectConfig1 .getRevision ()).thenReturn ("1" );
@@ -193,18 +183,12 @@ public void testFlushOnMismatchProjectId() throws Exception {
193183 eventHandlerRule .expectConversion (EVENT_NAME , USER_ID );
194184
195185 eventProcessor .close ();
196- if (!countDownLatch .await (MAX_DURATION_MS * 3 , TimeUnit .MILLISECONDS )) {
197- fail ("Exceeded timeout waiting for notification." );
198- }
186+ eventHandlerRule .expectCalls (2 );
199187 }
200188
201189 @ Test
202190 public void testStopAndStart () throws Exception {
203- CountDownLatch countDownLatch = new CountDownLatch (2 );
204- setEventProcessor (logEvent -> {
205- eventHandlerRule .dispatchEvent (logEvent );
206- countDownLatch .countDown ();
207- });
191+ setEventProcessor (logEvent -> eventHandlerRule .dispatchEvent (logEvent ));
208192
209193 UserEvent userEvent = buildConversionEvent (EVENT_NAME );
210194 eventProcessor .process (userEvent );
@@ -218,31 +202,27 @@ public void testStopAndStart() throws Exception {
218202 eventProcessor .start ();
219203
220204 eventProcessor .close ();
221- if (!countDownLatch .await (MAX_DURATION_MS * 3 , TimeUnit .MILLISECONDS )) {
222- fail ("Exceeded timeout waiting for notification." );
223- }
205+ eventHandlerRule .expectCalls (2 );
224206 }
225207
226208 @ Test
227209 public void testNotificationCenter () throws Exception {
228- CountDownLatch countDownLatch = new CountDownLatch ( 1 );
229- notificationCenter .addNotificationHandler (LogEvent .class , x -> countDownLatch . countDown ());
210+ AtomicInteger counter = new AtomicInteger ( );
211+ notificationCenter .addNotificationHandler (LogEvent .class , x -> counter . incrementAndGet ());
230212 setEventProcessor (logEvent -> {});
231213
232214 UserEvent userEvent = buildConversionEvent (EVENT_NAME );
233215 eventProcessor .process (userEvent );
234216 eventProcessor .close ();
235217
236- if (!countDownLatch .await (MAX_DURATION_MS * 3 , TimeUnit .MILLISECONDS )) {
237- fail ("Exceeded timeout waiting for notification." );
238- }
218+ assertEquals (1 , counter .intValue ());
239219 }
240220
241221 @ Test
242222 public void testCloseTimeout () throws Exception {
243223 CountDownLatch countDownLatch = new CountDownLatch (1 );
244224 setEventProcessor (logEvent -> {
245- if (!countDownLatch .await (TIMEOUT_MS * 2 , TimeUnit .SECONDS )) {
225+ if (!countDownLatch .await (TIMEOUT_MS * 2 , TimeUnit .MILLISECONDS )) {
246226 fail ("Exceeded timeout waiting for close." );
247227 }
248228 });
@@ -266,6 +246,48 @@ public void testCloseEventHandler() throws Exception {
266246 verify ((AutoCloseable ) mockEventHandler ).close ();
267247 }
268248
249+ @ Test
250+ public void testInvalidBatchSizeUsesDefault () {
251+ eventProcessor = BatchEventProcessor .builder ()
252+ .withEventQueue (eventQueue )
253+ .withBatchSize (-1 )
254+ .withFlushInterval (MAX_DURATION_MS )
255+ .withEventHandler (new NoopEventHandler ())
256+ .withNotificationCenter (notificationCenter )
257+ .withTimeout (TIMEOUT_MS , TimeUnit .MILLISECONDS )
258+ .build ();
259+
260+ assertEquals (eventProcessor .batchSize , BatchEventProcessor .DEFAULT_BATCH_SIZE );
261+ }
262+
263+ @ Test
264+ public void testInvalidFlushIntervalUsesDefault () {
265+ eventProcessor = BatchEventProcessor .builder ()
266+ .withEventQueue (eventQueue )
267+ .withBatchSize (MAX_BATCH_SIZE )
268+ .withFlushInterval (-1L )
269+ .withEventHandler (new NoopEventHandler ())
270+ .withNotificationCenter (notificationCenter )
271+ .withTimeout (TIMEOUT_MS , TimeUnit .MILLISECONDS )
272+ .build ();
273+
274+ assertEquals (eventProcessor .flushInterval , BatchEventProcessor .DEFAULT_BATCH_INTERVAL );
275+ }
276+
277+ @ Test
278+ public void testInvalidTimeoutUsesDefault () {
279+ eventProcessor = BatchEventProcessor .builder ()
280+ .withEventQueue (eventQueue )
281+ .withBatchSize (MAX_BATCH_SIZE )
282+ .withFlushInterval (MAX_DURATION_MS )
283+ .withEventHandler (new NoopEventHandler ())
284+ .withNotificationCenter (notificationCenter )
285+ .withTimeout (-1L , TimeUnit .MILLISECONDS )
286+ .build ();
287+
288+ assertEquals (eventProcessor .timeoutMillis , BatchEventProcessor .DEFAULT_TIMEOUT_INTERVAL );
289+ }
290+
269291 private void setEventProcessor (EventHandler eventHandler ) {
270292 eventProcessor = BatchEventProcessor .builder ()
271293 .withEventQueue (eventQueue )
0 commit comments