Skip to content

Commit 13e26fb

Browse files
Sanil15Boris S
authored andcommitted
SAMZA-1759: Stream Assert utilities for low level and high level api for TestFramework
Adding utilities and corresponding test for low and high level api Author: Sanil Jain <[email protected]> Reviewers: Shanthoosh Venkataraman <[email protected]> Closes apache#568 from Sanil15/SAMZA-1759 and squashes the following commits: a4861089 [Sanil Jain] Reverting back travis increase for wait time 876a3a58 [Sanil Jain] Increase travis timeout 9e6482b1 [Sanil Jain] Fixing travis build, removing unused imports 526244e8 [Sanil Jain] Merge branch 'master' into SAMZA-1759 9f489acf [Sanil Jain] Moving tests that use MessageStreamAssert to same package name in test folder to use package private a93e5a14 [Sanil Jain] Marking collection transient to ensure newer api changes work 5e6d3ed1 [Sanil Jain] Making MessageStreamAssert package private a5a521cc [Sanil Jain] Splitting operator assertions outside StreamAssert to MessageStreamAssert, addressing review, renaming utils d1e64180 [Sanil Jain] Cleaning unused imports ff218ff7 [Sanil Jain] Removing contains method for operator level assertios for high level api c5768772 [Sanil Jain] Merge branch 'SAMZA-1759' of https://github.com/Sanil15/samza into SAMZA-1759 c69d1bbb [Sanil Jain] StreamAssert Utilities for Low level and High Level Api, Adding More Test for Low Level api for testing multiple partitions and in mulithreaded mode e3c8e2a5 [Sanil Jain] StreamAssert Utilities for Low level and High Level Api, Adding More Test for Low Level api for testing multiple partitions and in mulithreaded mode
1 parent 5587ebf commit 13e26fb

16 files changed

+449
-168
lines changed

.travis.yml

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,15 +5,15 @@
55
# The ASF licenses this file to You under the Apache License, Version 2.0
66
# (the "License"); you may not use this file except in compliance with
77
# the License. You may obtain a copy of the License at
8-
#
8+
#
99
# http://www.apache.org/licenses/LICENSE-2.0
10-
#
10+
#
1111
# Unless required by applicable law or agreed to in writing, software
1212
# distributed under the License is distributed on an "AS IS" BASIS,
1313
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
1414
# See the License for the specific language governing permissions and
1515
# limitations under the License.
16-
#
16+
#
1717

1818
language: java
1919

Lines changed: 192 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,192 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.samza.test.framework;
21+
22+
import com.google.common.annotations.VisibleForTesting;
23+
import com.google.common.collect.Iterables;
24+
import java.io.IOException;
25+
import java.io.ObjectInputStream;
26+
import org.apache.samza.config.Config;
27+
import org.apache.samza.operators.MessageStream;
28+
import org.apache.samza.operators.functions.SinkFunction;
29+
import org.apache.samza.serializers.KVSerde;
30+
import org.apache.samza.serializers.Serde;
31+
import org.apache.samza.serializers.StringSerde;
32+
import org.apache.samza.system.SystemStreamPartition;
33+
import org.apache.samza.task.MessageCollector;
34+
import org.apache.samza.task.TaskContext;
35+
import org.apache.samza.task.TaskCoordinator;
36+
import org.hamcrest.Matchers;
37+
38+
import java.util.ArrayList;
39+
import java.util.Collection;
40+
import java.util.Collections;
41+
import java.util.HashSet;
42+
import java.util.List;
43+
import java.util.Map;
44+
import java.util.Set;
45+
import java.util.Timer;
46+
import java.util.TimerTask;
47+
import java.util.concurrent.ConcurrentHashMap;
48+
import java.util.concurrent.CountDownLatch;
49+
50+
import static org.junit.Assert.assertThat;
51+
52+
/**
53+
* An assertion on the content of a {@link MessageStream}.
54+
*
55+
* <pre>Example: {@code
56+
* MessageStream<String> stream = streamGraph.getInputStream("input", serde).map(some_function)...;
57+
* ...
58+
* MessageStreamAssert.that(id, stream, stringSerde).containsInAnyOrder(Arrays.asList("a", "b", "c"));
59+
* }</pre>
60+
*
61+
*/
62+
@VisibleForTesting
63+
class MessageStreamAssert<M> {
64+
private final static Map<String, CountDownLatch> LATCHES = new ConcurrentHashMap<>();
65+
private final static CountDownLatch PLACE_HOLDER = new CountDownLatch(0);
66+
67+
private final String id;
68+
private final MessageStream<M> messageStream;
69+
private final Serde<M> serde;
70+
private boolean checkEachTask = false;
71+
72+
/**
73+
* Constructors a MessageStreamAssert with an id and serde
74+
* @param id unique id
75+
* @param messageStream represents messageStream that you want to assert on
76+
* @param serde serde used to desialize messageStream
77+
* @param <M> represents type of Message
78+
* @return MessageStreamAssert that returns the the messages in the stream
79+
*/
80+
public static <M> MessageStreamAssert<M> that(String id, MessageStream<M> messageStream, Serde<M> serde) {
81+
return new MessageStreamAssert<>(id, messageStream, serde);
82+
}
83+
84+
private MessageStreamAssert(String id, MessageStream<M> messageStream, Serde<M> serde) {
85+
this.id = id;
86+
this.messageStream = messageStream;
87+
this.serde = serde;
88+
}
89+
90+
public MessageStreamAssert forEachTask() {
91+
checkEachTask = true;
92+
return this;
93+
}
94+
95+
public void containsInAnyOrder(final Collection<M> expected) {
96+
LATCHES.putIfAbsent(id, PLACE_HOLDER);
97+
final MessageStream<M> streamToCheck = checkEachTask
98+
? messageStream
99+
: messageStream
100+
.partitionBy(m -> null, m -> m, KVSerde.of(new StringSerde(), serde), null)
101+
.map(kv -> kv.value);
102+
103+
streamToCheck.sink(new CheckAgainstExpected<M>(id, expected, checkEachTask));
104+
}
105+
106+
public static void waitForComplete() {
107+
try {
108+
while (!LATCHES.isEmpty()) {
109+
final Set<String> ids = new HashSet<>(LATCHES.keySet());
110+
for (String id : ids) {
111+
while (LATCHES.get(id) == PLACE_HOLDER) {
112+
Thread.sleep(100);
113+
}
114+
115+
final CountDownLatch latch = LATCHES.get(id);
116+
if (latch != null) {
117+
latch.await();
118+
LATCHES.remove(id);
119+
}
120+
}
121+
}
122+
} catch (Exception e) {
123+
throw new RuntimeException(e);
124+
}
125+
}
126+
127+
private static final class CheckAgainstExpected<M> implements SinkFunction<M> {
128+
private static final long TIMEOUT = 5000L;
129+
130+
private final String id;
131+
private final boolean checkEachTask;
132+
private final transient Collection<M> expected;
133+
134+
135+
private transient Timer timer = new Timer();
136+
private transient List<M> actual = Collections.synchronizedList(new ArrayList<>());
137+
private transient TimerTask timerTask = new TimerTask() {
138+
@Override
139+
public void run() {
140+
check();
141+
}
142+
};
143+
144+
CheckAgainstExpected(String id, Collection<M> expected, boolean checkEachTask) {
145+
this.id = id;
146+
this.expected = expected;
147+
this.checkEachTask = checkEachTask;
148+
}
149+
150+
@Override
151+
public void init(Config config, TaskContext context) {
152+
final SystemStreamPartition ssp = Iterables.getFirst(context.getSystemStreamPartitions(), null);
153+
if (ssp != null || ssp.getPartition().getPartitionId() == 0) {
154+
final int count = checkEachTask ? context.getSamzaContainerContext().taskNames.size() : 1;
155+
LATCHES.put(id, new CountDownLatch(count));
156+
timer.schedule(timerTask, TIMEOUT);
157+
}
158+
}
159+
160+
@Override
161+
public void apply(M message, MessageCollector messageCollector, TaskCoordinator taskCoordinator) {
162+
actual.add(message);
163+
164+
if (actual.size() >= expected.size()) {
165+
timerTask.cancel();
166+
check();
167+
}
168+
}
169+
170+
private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
171+
in.defaultReadObject();
172+
timer = new Timer();
173+
actual = Collections.synchronizedList(new ArrayList<>());
174+
timerTask = new TimerTask() {
175+
@Override
176+
public void run() {
177+
check();
178+
}
179+
};
180+
}
181+
182+
private void check() {
183+
final CountDownLatch latch = LATCHES.get(id);
184+
try {
185+
assertThat(actual, Matchers.containsInAnyOrder((M[]) expected.toArray()));
186+
throw new IllegalArgumentException("asdas");
187+
} finally {
188+
latch.countDown();
189+
}
190+
}
191+
}
192+
}

0 commit comments

Comments
 (0)