Skip to content

Commit 5156239

Browse files
committed
Merge remote-tracking branch 'upstream/master'
2 parents de708f5 + 12b6d30 commit 5156239

File tree

125 files changed

+4609
-2850
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

125 files changed

+4609
-2850
lines changed

.travis.yml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,8 +30,8 @@ jdk:
3030
- oraclejdk8
3131

3232
script:
33-
## travis_wait increases build idle-wait time from 10 minutes to 20 minutes.
34-
- travis_wait 20 ./gradlew clean build
33+
## travis_wait increases build idle-wait time from 10 minutes to 30 minutes.
34+
- travis_wait 30 ./gradlew clean build
3535
- type sonar-scanner &>/dev/null; if [ $? -eq 0 ]; then sonar-scanner; else echo "Not running sonar"; fi
3636

3737
before_cache:
Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.samza.application;
20+
21+
import java.util.Map;
22+
import org.apache.samza.annotation.InterfaceStability;
23+
import org.apache.samza.config.Config;
24+
import org.apache.samza.metrics.MetricsReporterFactory;
25+
import org.apache.samza.operators.ContextManager;
26+
import org.apache.samza.runtime.ProcessorLifecycleListenerFactory;
27+
28+
29+
/**
30+
* The interface class to describe the configuration, input and output streams, and processing logic in a {@link SamzaApplication}.
31+
* <p>
32+
* Sub-classes {@link StreamApplicationDescriptor} and {@link TaskApplicationDescriptor} are specific interfaces for applications
33+
* written in high-level {@link StreamApplication} and low-level {@link TaskApplication} APIs, respectively.
34+
*
35+
* @param <S> sub-class of user application descriptor.
36+
*/
37+
@InterfaceStability.Evolving
38+
public interface ApplicationDescriptor<S extends ApplicationDescriptor> {
39+
40+
/**
41+
* Get the {@link Config} of the application
42+
* @return config of the application
43+
*/
44+
Config getConfig();
45+
46+
/**
47+
* Sets the {@link ContextManager} for this application.
48+
* <p>
49+
* Setting the {@link ContextManager} is optional. The provided {@link ContextManager} can be used to build the shared
50+
* context between the operator functions within a task instance.
51+
*
52+
* TODO: this should be replaced by the shared context factory when SAMZA-1714 is fixed.
53+
54+
* @param contextManager the {@link ContextManager} to use for the application
55+
* @return type {@code S} of {@link ApplicationDescriptor} with {@code contextManager} set as its {@link ContextManager}
56+
*/
57+
S withContextManager(ContextManager contextManager);
58+
59+
/**
60+
* Sets the {@link ProcessorLifecycleListenerFactory} for this application.
61+
*
62+
* <p>Setting a {@link ProcessorLifecycleListenerFactory} is optional to a user application. It allows users to
63+
* plug in optional code to be invoked in different stages before/after the main processing logic is started/stopped in
64+
* the application.
65+
*
66+
* @param listenerFactory the user implemented {@link ProcessorLifecycleListenerFactory} that creates lifecycle listener
67+
* with callback methods before and after the start/stop of each StreamProcessor in the application
68+
* @return type {@code S} of {@link ApplicationDescriptor} with {@code listenerFactory} set as its {@link ProcessorLifecycleListenerFactory}
69+
*/
70+
S withProcessorLifecycleListenerFactory(ProcessorLifecycleListenerFactory listenerFactory);
71+
72+
/**
73+
* Sets a set of customized {@link MetricsReporterFactory}s in the application
74+
*
75+
* @param reporterFactories the map of customized {@link MetricsReporterFactory}s to be used
76+
* @return type {@code S} of {@link ApplicationDescriptor} with {@code reporterFactories} set as its {@link MetricsReporterFactory}s
77+
*/
78+
S withMetricsReporterFactories(Map<String, MetricsReporterFactory> reporterFactories);
79+
80+
}
Lines changed: 11 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -16,34 +16,25 @@
1616
* specific language governing permissions and limitations
1717
* under the License.
1818
*/
19-
20-
package org.apache.samza.processor;
19+
package org.apache.samza.application;
2120

2221
import org.apache.samza.annotation.InterfaceStability;
2322

2423

2524
/**
26-
* This class listens to the life cycle events in a {@link StreamProcessor},
25+
* The base interface for all user-implemented applications in Samza.
26+
* <p>
27+
* The main processing logic of the user application should be implemented in {@link SamzaApplication#describe(ApplicationDescriptor)}
28+
* method. Sub-classes {@link StreamApplication} and {@link TaskApplication} are specific interfaces for applications
29+
* written in high-level DAG and low-level task APIs, respectively.
2730
*/
2831
@InterfaceStability.Evolving
29-
public interface StreamProcessorLifecycleListener {
30-
/**
31-
* Callback when the {@link StreamProcessor} is started
32-
* This callback is invoked only once when {@link org.apache.samza.container.SamzaContainer} starts for the first time
33-
* in the {@link StreamProcessor}. When there is a re-balance of tasks/partitions among the processors, the container
34-
* may temporarily be "paused" and re-started again. For such re-starts, this callback is NOT invoked.
35-
*/
36-
void onStart();
32+
public interface SamzaApplication<S extends ApplicationDescriptor> {
3733

3834
/**
39-
* Callback when the {@link StreamProcessor} is shut down.
35+
* Describes the user processing logic via {@link ApplicationDescriptor}
36+
*
37+
* @param appDesc the {@link ApplicationDescriptor} object to describe user application logic
4038
*/
41-
void onShutdown();
42-
43-
/**
44-
* Callback when the {@link StreamProcessor} fails
45-
* @param t Cause of the failure
46-
*/
47-
void onFailure(Throwable t);
48-
39+
void describe(S appDesc);
4940
}

samza-api/src/main/java/org/apache/samza/application/StreamApplication.java

Lines changed: 27 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -19,30 +19,27 @@
1919
package org.apache.samza.application;
2020

2121
import org.apache.samza.annotation.InterfaceStability;
22-
import org.apache.samza.config.Config;
23-
import org.apache.samza.operators.ContextManager;
24-
import org.apache.samza.operators.MessageStream;
25-
import org.apache.samza.operators.OutputStream;
26-
import org.apache.samza.operators.StreamGraph;
27-
import org.apache.samza.operators.functions.InitableFunction;
28-
import org.apache.samza.task.StreamTask;
29-
import org.apache.samza.task.TaskContext;
3022

3123
/**
32-
* Describes and initializes the transforms for processing message streams and generating results.
24+
* Describes and initializes the transforms for processing message streams and generating results in high-level API.
3325
* <p>
3426
* The following example removes page views older than 1 hour from the input stream:
3527
* <pre>{@code
36-
* public class PageViewCounter implements StreamApplication {
37-
* public void init(StreamGraph graph, Config config) {
38-
* MessageStream<PageViewEvent> pageViewEvents =
39-
* graph.getInputStream("pageViewEvents", (k, m) -> (PageViewEvent) m);
40-
* OutputStream<String, PageViewEvent, PageViewEvent> recentPageViewEvents =
41-
* graph.getOutputStream("recentPageViewEvents", m -> m.memberId, m -> m);
28+
* public class PageViewFilter implements StreamApplication {
29+
* public void describe(StreamApplicationDescriptor appDesc) {
30+
* KafkaSystemDescriptor trackingSystem = new KafkaSystemDescriptor("tracking");
31+
* KafkaInputDescriptor<PageViewEvent> inputStreamDescriptor =
32+
* trackingSystem.getInputDescriptor("pageViewEvent", new JsonSerdeV2<>(PageViewEvent.class));
33+
*
34+
* KafkaOutputDescriptor<PageViewEvent> outputStreamDescriptor =
35+
* trackingSystem.getOutputDescriptor("recentPageViewEvent", new JsonSerdeV2<>(PageViewEvent.class));
36+
*
37+
* MessageStream<PageViewEvent> pageViewEvents = appDesc.getInputStream(inputStreamDescriptor);
38+
* OutputStream<PageViewEvent> recentPageViewEvents = appDesc.getOutputStream(outputStreamDescriptor);
4239
*
4340
* pageViewEvents
4441
* .filter(m -> m.getCreationTime() > System.currentTimeMillis() - Duration.ofHours(1).toMillis())
45-
* .sendTo(filteredPageViewEvents);
42+
* .sendTo(recentPageViewEvents);
4643
* }
4744
* }
4845
* }</pre>
@@ -52,46 +49,28 @@
5249
* public static void main(String[] args) {
5350
* CommandLine cmdLine = new CommandLine();
5451
* Config config = cmdLine.loadConfig(cmdLine.parser().parse(args));
55-
* PageViewCounter app = new PageViewCounter();
56-
* LocalApplicationRunner runner = new LocalApplicationRunner(config);
57-
* runner.run(app);
52+
* PageViewFilter app = new PageViewFilter();
53+
* ApplicationRunner runner = ApplicationRunners.getApplicationRunner(app, config);
54+
* runner.run();
5855
* runner.waitForFinish();
5956
* }
6057
* }</pre>
6158
*
6259
* <p>
63-
* Implementation Notes: Currently StreamApplications are wrapped in a {@link StreamTask} during execution.
64-
* A new StreamApplication instance will be created and initialized with a user-defined {@link StreamGraph}
65-
* when planning the execution. The {@link StreamGraph} and the functions implemented for transforms are required to
66-
* be serializable. The execution planner will generate a serialized DAG which will be deserialized in each {@link StreamTask}
67-
* instance used for processing incoming messages. Execution is synchronous and thread-safe within each {@link StreamTask}.
60+
* Implementation Notes: Currently {@link StreamApplication}s are wrapped in a {@link org.apache.samza.task.StreamTask}
61+
* during execution. The execution planner will generate a serialized DAG which will be deserialized in each
62+
* {@link org.apache.samza.task.StreamTask} instance used for processing incoming messages. Execution is synchronous
63+
* and thread-safe within each {@link org.apache.samza.task.StreamTask}.
6864
*
6965
* <p>
66+
* A {@link StreamApplication} implementation must have a proper fully-qualified class name and a default constructor
67+
* with no parameters to ensure successful instantiation in both local and remote environments.
7068
* Functions implemented for transforms in StreamApplications ({@link org.apache.samza.operators.functions.MapFunction},
7169
* {@link org.apache.samza.operators.functions.FilterFunction}, for example) are initable and closable. They are initialized
72-
* before messages are delivered to them and closed after their execution when the {@link StreamTask} instance is closed.
73-
* See {@link InitableFunction} and {@link org.apache.samza.operators.functions.ClosableFunction}.
70+
* before messages are delivered to them and closed after their execution when the {@link org.apache.samza.task.StreamTask}
71+
* instance is closed. See {@link org.apache.samza.operators.functions.InitableFunction} and {@link org.apache.samza.operators.functions.ClosableFunction}.
72+
* Function implementations are required to be {@link java.io.Serializable}.
7473
*/
75-
@InterfaceStability.Unstable
76-
public interface StreamApplication {
77-
78-
/**
79-
* Describes and initializes the transforms for processing message streams and generating results.
80-
* <p>
81-
* The {@link StreamGraph} provides access to input and output streams. Input {@link MessageStream}s can be
82-
* transformed into other {@link MessageStream}s or sent to an {@link OutputStream} using the {@link MessageStream}
83-
* operators.
84-
* <p>
85-
* Most operators accept custom functions for doing the transformations. These functions are {@link InitableFunction}s
86-
* and are provided the {@link Config} and {@link TaskContext} during their own initialization. The config and the
87-
* context can be used, for example, to create custom metrics or access durable state stores.
88-
* <p>
89-
* A shared context between {@link InitableFunction}s for different operators within a task instance can be set
90-
* up by providing a {@link ContextManager} using {@link StreamGraph#withContextManager}.
91-
*
92-
* @param graph the {@link StreamGraph} to get input/output streams from
93-
* @param config the configuration for the application
94-
*/
95-
void init(StreamGraph graph, Config config);
96-
74+
@InterfaceStability.Evolving
75+
public interface StreamApplication extends SamzaApplication<StreamApplicationDescriptor> {
9776
}

samza-api/src/main/java/org/apache/samza/operators/StreamGraph.java renamed to samza-api/src/main/java/org/apache/samza/application/StreamApplicationDescriptor.java

Lines changed: 13 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -16,36 +16,41 @@
1616
* specific language governing permissions and limitations
1717
* under the License.
1818
*/
19-
package org.apache.samza.operators;
19+
package org.apache.samza.application;
2020

2121
import org.apache.samza.annotation.InterfaceStability;
22+
import org.apache.samza.operators.KV;
23+
import org.apache.samza.operators.MessageStream;
24+
import org.apache.samza.operators.OutputStream;
25+
import org.apache.samza.operators.TableDescriptor;
2226
import org.apache.samza.operators.descriptors.base.stream.InputDescriptor;
2327
import org.apache.samza.operators.descriptors.base.stream.OutputDescriptor;
2428
import org.apache.samza.operators.descriptors.base.system.SystemDescriptor;
2529
import org.apache.samza.table.Table;
2630

2731

2832
/**
29-
* Provides access to {@link MessageStream}s and {@link OutputStream}s used to describe application logic.
33+
* The interface class to describe a {@link SamzaApplication} in high-level API in Samza.
3034
*/
31-
@InterfaceStability.Unstable
32-
public interface StreamGraph {
35+
@InterfaceStability.Evolving
36+
public interface StreamApplicationDescriptor extends ApplicationDescriptor<StreamApplicationDescriptor> {
3337

3438
/**
3539
* Sets the default SystemDescriptor to use for intermediate streams. This is equivalent to setting
3640
* {@code job.default.system} and its properties in configuration.
3741
* <p>
38-
* If the default system descriptor is set, it must be set <b>before</b> creating any intermediate streams.
42+
* If the default system descriptor is set, it must be set <b>before</b> creating any input/output/intermediate streams.
3943
* <p>
40-
* If the intermediate stream is created with a stream-level Serde, they will be used, else the serde specified
44+
* If an input/output stream is created with a stream-level Serde, it will be used, else the serde specified
4145
* for the {@code job.default.system} in configuration will be used.
4246
* <p>
4347
* Providing an incompatible message type for the intermediate streams that use the default serde will result in
4448
* {@link ClassCastException}s at runtime.
4549
*
4650
* @param defaultSystemDescriptor the default system descriptor to use
51+
* @return the {@link StreamApplicationDescriptor} with {@code defaultSystemDescriptor} set as its default system
4752
*/
48-
void setDefaultSystem(SystemDescriptor<?> defaultSystemDescriptor);
53+
StreamApplicationDescriptor withDefaultSystem(SystemDescriptor<?> defaultSystemDescriptor);
4954

5055
/**
5156
* Gets the input {@link MessageStream} corresponding to the {@code inputDescriptor}.
@@ -105,16 +110,4 @@ public interface StreamGraph {
105110
* @throws IllegalStateException when invoked multiple times with the same {@link TableDescriptor}
106111
*/
107112
<K, V> Table<KV<K, V>> getTable(TableDescriptor<K, V, ?> tableDescriptor);
108-
109-
/**
110-
* Sets the {@link ContextManager} for this {@link StreamGraph}.
111-
* <p>
112-
* The provided {@link ContextManager} can be used to setup shared context between the operator functions
113-
* within a task instance
114-
*
115-
* @param contextManager the {@link ContextManager} to use for the {@link StreamGraph}
116-
* @return the {@link StreamGraph} with {@code contextManager} set as its {@link ContextManager}
117-
*/
118-
StreamGraph withContextManager(ContextManager contextManager);
119-
120-
}
113+
}
Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.samza.application;
20+
21+
import org.apache.samza.annotation.InterfaceStability;
22+
23+
24+
/**
25+
* Describes and initializes the transforms for processing message streams and generating results in low-level API. Your
26+
* application is expected to implement this interface.
27+
* <p>
28+
* The following example removes page views older than 1 hour from the input stream:
29+
* <pre>{@code
30+
* public class PageViewFilter implements TaskApplication {
31+
* public void describe(TaskApplicationDescriptor appDesc) {
32+
* KafkaSystemDescriptor trackingSystem = new KafkaSystemDescriptor(PageViewTask.SYSTEM);
33+
* KafkaInputDescriptor<PageViewEvent> inputStreamDescriptor =
34+
* trackingSystem.getInputDescriptor(PageViewTask.TASK_INPUT, new JsonSerdeV2<>(PageViewEvent.class));
35+
*
36+
* KafkaOutputDescriptor<PageViewEvent> outputStreamDescriptor =
37+
* trackingSystem.getOutputDescriptor(PageViewTask.TASK_OUTPUT, new JsonSerdeV2<>(PageViewEvent.class));
38+
*
39+
* appDesc.addInputStream(inputStreamDescriptor);
40+
* appDesc.addOutputStream(outputStreamDescriptor);
41+
* appDesc.setTaskFactory((StreamTaskFactory) () -> new PageViewTask());
42+
* }
43+
* }
44+
*
45+
* public class PageViewTask implements StreamTask {
46+
* final static String TASK_INPUT = "pageViewEvents";
47+
* final static String TASK_OUTPUT = "recentPageViewEvents";
48+
* final static String SYSTEM = "kafka";
49+
*
50+
* public void process(IncomingMessageEnvelope message, MessageCollector collector,
51+
* TaskCoordinator coordinator) {
52+
* PageViewEvent m = (PageViewEvent) message.getValue();
53+
* if (m.getCreationTime() > System.currentTimeMillis() - Duration.ofHours(1).toMillis()) {
54+
* collector.send(new OutgoingMessageEnvelope(new SystemStream(SYSTEM, TASK_OUTPUT),
55+
* message.getKey(), message.getKey(), m));
56+
* }
57+
* }
58+
* }
59+
* }</pre>
60+
*
61+
* <p>
62+
* The example above can be run using an ApplicationRunner:
63+
* <pre>{@code
64+
* public static void main(String[] args) {
65+
* CommandLine cmdLine = new CommandLine();
66+
* Config config = cmdLine.loadConfig(cmdLine.parser().parse(args));
67+
* PageViewFilter app = new PageViewFilter();
68+
* ApplicationRunner runner = ApplicationRunners.getApplicationRunner(app, config);
69+
* runner.run();
70+
* runner.waitForFinish();
71+
* }
72+
* }</pre>
73+
*
74+
* <p>
75+
* Implementation Notes: {@link TaskApplication} allows users to instantiate {@link org.apache.samza.task.StreamTask} or
76+
* {@link org.apache.samza.task.AsyncStreamTask} when describing the processing logic. A new {@link TaskApplicationDescriptor}
77+
* instance will be created and described by the user-defined {@link TaskApplication} when planning the execution.
78+
* {@link org.apache.samza.task.TaskFactory} is required to be serializable.
79+
*
80+
* <p>
81+
* The user-implemented {@link TaskApplication} class must be a class with proper fully-qualified class name and
82+
* a default constructor with no parameters to ensure successful instantiation in both local and remote environments.
83+
*/
84+
@InterfaceStability.Evolving
85+
public interface TaskApplication extends SamzaApplication<TaskApplicationDescriptor> {
86+
}

0 commit comments

Comments
 (0)