diff --git a/external/flume-sink/pom.xml b/external/flume-sink/pom.xml
index d11129ce8d89d..92940b487b364 100644
--- a/external/flume-sink/pom.xml
+++ b/external/flume-sink/pom.xml
@@ -64,6 +64,42 @@
+    <dependency>
+      <groupId>org.apache.curator</groupId>
+      <artifactId>curator-client</artifactId>
+      <version>2.4.0</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.curator</groupId>
+      <artifactId>curator-framework</artifactId>
+      <version>2.4.0</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.curator</groupId>
+      <artifactId>curator-recipes</artifactId>
+      <version>2.4.0</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.curator</groupId>
+      <artifactId>curator-test</artifactId>
+      <version>2.4.0</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.zookeeper</groupId>
+      <artifactId>zookeeper</artifactId>
+      <version>3.4.5</version>
+      <exclusions>
+        <exclusion>
+          <groupId>io.netty</groupId>
+          <artifactId>netty</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.jboss.netty</groupId>
+          <artifactId>netty</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
     <groupId>org.scala-lang</groupId>
     <artifactId>scala-library</artifactId>
diff --git a/external/flume-sink/src/main/java/org/apache/spark/streaming/flume/sink/SparkPushSink.java b/external/flume-sink/src/main/java/org/apache/spark/streaming/flume/sink/SparkPushSink.java
new file mode 100644
index 0000000000000..a057d3c152ba6
--- /dev/null
+++ b/external/flume-sink/src/main/java/org/apache/spark/streaming/flume/sink/SparkPushSink.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.flume.sink;
+
+import org.apache.flume.api.*;
+import org.apache.flume.sink.AbstractRpcSink;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Properties;
+
+public class SparkPushSink extends AbstractRpcSink {
+ private static final Logger logger = LoggerFactory.getLogger(SparkPushSink.class);
+
+ @Override
+ protected RpcClient initializeRpcClient(Properties props) {
+ logger.info("Attempting to create Avro Rpc client.");
+ SparkRpcClient client = new SparkRpcClient(props);
+ client.configure(props);
+ return client;
+ }
+}
\ No newline at end of file
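
For illustration, a minimal Scala sketch of wiring this sink to a Flume channel programmatically; the MemoryChannel and the property values are placeholder assumptions, and the keys mirror the configuration example documented in SparkRpcClient below:

    import org.apache.flume.Context
    import org.apache.flume.channel.MemoryChannel
    import org.apache.flume.conf.Configurables
    import org.apache.spark.streaming.flume.sink.SparkPushSink

    val channel = new MemoryChannel()
    Configurables.configure(channel, new Context())

    val sink = new SparkPushSink()
    val ctx = new Context()
    ctx.put("hostname", "benchmark")                // logical host name (assumed)
    ctx.put("port", "0")                            // required by AbstractRpcSink, unused here
    ctx.put("router.path", "127.0.0.1:2181/spark")  // zkAddress/zkPath (assumed)
    Configurables.configure(sink, ctx)
    sink.setChannel(channel)
    sink.start()  // creates the RPC client via initializeRpcClient
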
diff --git a/external/flume-sink/src/main/java/org/apache/spark/streaming/flume/sink/SparkRpcClient.java b/external/flume-sink/src/main/java/org/apache/spark/streaming/flume/sink/SparkRpcClient.java
new file mode 100644
index 0000000000000..7108feb2bb989
--- /dev/null
+++ b/external/flume-sink/src/main/java/org/apache/spark/streaming/flume/sink/SparkRpcClient.java
@@ -0,0 +1,354 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.flume.sink;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
+import org.apache.flume.Event;
+import org.apache.flume.EventDeliveryException;
+import org.apache.flume.FlumeException;
+import org.apache.flume.api.*;
+import org.apache.spark.streaming.flume.sink.utils.LogicalHostRouter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/*
+ * configuration example:
+ * agent.sinks.ls1.hostname = benchmark
+ * agent.sinks.ls1.router.path=192.168.59.128:2181/spark [zookeeper path to logical host]
+ * agent.sinks.ls1.port = 0
+ * agent.sinks.ls1.router.retry.times=1 [optional]
+ * agent.sinks.ls1.router.retry.interval=1000 [optional]
+ */
+public class SparkRpcClient extends AbstractRpcClient implements RpcClient {
+ private static final Logger logger = LoggerFactory.getLogger(SparkRpcClient.class);
+ private static final String HOSTNAME_KEY = "hostname";
+ private static final String HOST_ROUTER_PATH = "router.path";
+ private static final String HOST_ROUTER_RETRY_TIMES = "router.retry.times";
+ private static final String HOST_ROUTER_RETRY_INTERVAL = "router.retry.interval";
+ private LogicalHostRouter router;
+ private Integer maxTries = 1;
+ private volatile boolean isActive = false;
+ private Properties configurationProperties;
+ private final ClientPool clientPool = new ClientPool();
+
+ private class ClientHandler {
+ private HostInfo hostInfo = null;
+ private Properties props = null;
+ private RpcClient client = null;
+ private volatile boolean isConnected = false;
+
+ public ClientHandler(HostInfo hostInfo, Properties props) {
+ this.hostInfo = hostInfo;
+ this.props = props;
+ }
+
+ public synchronized void close() {
+ if (isConnected) {
+ client.close();
+ isConnected = false;
+ client = null;
+ logger.info("closed client");
+ }
+ }
+
+ public synchronized RpcClient getClient() {
+ if (client == null) {
+ Properties props = new Properties();
+ props.putAll(configurationProperties);
+ props.put(RpcClientConfigurationConstants.CONFIG_CLIENT_TYPE,
+ RpcClientFactory.ClientType.DEFAULT.name());
+ props.put(RpcClientConfigurationConstants.CONFIG_HOSTS,
+ hostInfo.getReferenceName());
+ props.put(
+ RpcClientConfigurationConstants.CONFIG_HOSTS_PREFIX
+ + hostInfo.getReferenceName(),
+ hostInfo.getHostName() + ":" + hostInfo.getPortNumber());
+ props.put(RpcClientConfigurationConstants.CONFIG_CLIENT_TYPE,
+ RpcClientConfigurationConstants.DEFAULT_CLIENT_TYPE);
+ client = RpcClientFactory.getInstance(props);
+ isConnected = true;
+ logger.debug("create new RpcClient:" + hostInfo.getHostName() + ":" + hostInfo.getPortNumber());
+ }
+ return client;
+ }
+ }
+
+ private class ClientPool implements LogicalHostRouter.LogicalHostRouterListener {
+ private List<ClientHandler> clientHandlers = new CopyOnWriteArrayList<ClientHandler>();
+ private CountDownLatch emptyLatch = new CountDownLatch(1);
+ private AtomicInteger currentClientIndex = new AtomicInteger(0);
+
+ @Override
+ public void physicalHostAdded(String logicalHost, LogicalHostRouter.PhysicalHost physicalHost) {
+ logger.info("receive host added info " + physicalHost.toString());
+ HostInfo hostInfo = new HostInfo("h1", physicalHost.getIp(), physicalHost.getPort());
+ addHost(hostInfo);
+ }
+
+ @Override
+ public void physicalHostRemoved(String logicalHost, LogicalHostRouter.PhysicalHost physicalHost) {
+ logger.info("receive host removed info " + physicalHost.toString());
+ HostInfo hostInfo = new HostInfo("h1", physicalHost.getIp(), physicalHost.getPort());
+ removeHost(hostInfo);
+ }
+
+ private boolean isSameHost(HostInfo left, HostInfo right) {
+ return left.getHostName().equals(right.getHostName()) && left.getPortNumber() == right.getPortNumber();
+ }
+
+ public void addHost(HostInfo hostInfo) {
+ logger.info("add host " + hostInfo.getHostName() + ":" + hostInfo.getPortNumber());
+ for (ClientHandler handler : clientHandlers) {
+ if (isSameHost(handler.hostInfo, hostInfo)) {
+ return;
+ }
+ }
+ clientHandlers.add(new ClientHandler(hostInfo, configurationProperties));
+ emptyLatch.countDown();
+ logger.info("host added");
+ }
+
+ public void removeHost(HostInfo hostInfo) {
+ for (ClientHandler handler : clientHandlers) {
+ if (isSameHost(handler.hostInfo, hostInfo)) {
+ clientHandlers.remove(handler);
+ return;
+ }
+ }
+ }
+
+ public ClientHandler getClientHandler() {
+ int index = currentClientIndex.getAndIncrement();
+ if (currentClientIndex.get() >= clientHandlers.size()) {
+ currentClientIndex.set(0);
+ }
+ if (index >= clientHandlers.size()) {
+ index = 0;
+ }
+ try {
+ emptyLatch.await();
+ } catch (InterruptedException e) {
+ logger.error(e.getMessage());
+ }
+ Preconditions.checkElementIndex(index, clientHandlers.size());
+ return clientHandlers.get(index);
+ }
+
+ public void close() {
+ FlumeException closeException = null;
+ int total = clientHandlers.size();
+ int nSuccess = 0;
+ for (ClientHandler handler : clientHandlers) {
+ try {
+ handler.close();
+ ++nSuccess;
+ } catch (FlumeException e) {
+ closeException = e;
+ }
+ }
+ clientHandlers.clear();
+ if (closeException != null) {
+ throw new FlumeException("Close Exception total: " + clientHandlers.size()
+ + " success: " + nSuccess, closeException);
+ }
+ }
+ }
+
+ public SparkRpcClient(Properties props) {
+ this.configurationProperties = props;
+ }
+
+ private int getInteger(Properties prop, String key, int defaultValue) {
+ String value = prop.getProperty(key);
+ if (value != null && value.trim().length() > 0) {
+ try {
+ return Integer.parseInt(value.trim());
+ } catch (NumberFormatException e) {
+ logger.warn("invalid " + key + " is set, value: " + value);
+ }
+ }
+ return defaultValue;
+ }
+
+ //This function has to be synchronized to establish a happens-before
+ //relationship for different threads that access this object
+ //since shared data structures are created here.
+ private synchronized void configureHosts(Properties properties)
+ throws FlumeException {
+ if (isActive) {
+ logger.error("This client was already configured, " +
+ "cannot reconfigure.");
+ throw new FlumeException("This client was already configured, " +
+ "cannot reconfigure.");
+ }
+
+ String routerPath = properties.getProperty(HOST_ROUTER_PATH);
+ Preconditions.checkArgument(!Strings.isNullOrEmpty(routerPath), HOST_ROUTER_PATH + " is empty");
+ LogicalHostRouter.Conf conf = LogicalHostRouter.Conf.fromRouterPath(routerPath);
+ String routerRetryTimes = properties.getProperty(HOST_ROUTER_RETRY_TIMES);
+ if (routerRetryTimes != null) {
+ conf.setRetryTimes(Integer.parseInt(routerRetryTimes));
+ }
+ String routerRetryInterval = properties.getProperty(HOST_ROUTER_RETRY_INTERVAL);
+ if (routerRetryInterval != null) {
+ conf.setRetryInterval(Integer.parseInt(routerRetryInterval));
+ }
+ if (this.router != null) {
+ this.router.stop();
+ }
+ try {
+ router = new LogicalHostRouter(conf);
+ router.start();
+ String logicalHost = properties.getProperty(HOSTNAME_KEY);
+ List<String> logicalHosts = new ArrayList<String>();
+ logicalHosts.add(logicalHost);
+ router.registerListener(clientPool, logicalHosts);
+ List<LogicalHostRouter.PhysicalHost> physicalHosts = router.getPhysicalHosts(logicalHost);
+ maxTries = Math.max(1, physicalHosts.size());
+ for (LogicalHostRouter.PhysicalHost host : physicalHosts) {
+ HostInfo hostInfo = new HostInfo("h1", host.getIp(), host.getPort());
+ clientPool.addHost(hostInfo);
+ }
+ } catch (IOException e) {
+ logger.error("failed to read hosts ", e);
+ throw new FlumeException("This client read hosts failed " + e.getMessage());
+ }
+ maxTries = getInteger(properties, RpcClientConfigurationConstants.CONFIG_MAX_ATTEMPTS, maxTries);
+ batchSize = getInteger(properties, RpcClientConfigurationConstants.CONFIG_BATCH_SIZE,
+ RpcClientConfigurationConstants.DEFAULT_BATCH_SIZE);
+ if (batchSize < 1) {
+ logger.warn("A batch-size less than 1 was specified: " + batchSize
+ + ". Using default instead.");
+ batchSize = RpcClientConfigurationConstants.DEFAULT_BATCH_SIZE;
+ }
+ isActive = true;
+ }
+
+ /**
+ * Tries to append an event to the currently connected client. If it cannot
+ * send the event, it tries to send to next available host
+ *
+ * @param event The event to be appended.
+ * @throws org.apache.flume.EventDeliveryException
+ */
+ @Override
+ public void append(Event event) throws EventDeliveryException {
+ synchronized (this) {
+ if (!isActive) {
+ logger.error("Attempting to append to an already closed client.");
+ throw new EventDeliveryException(
+ "Attempting to append to an already closed client.");
+ }
+ }
+ // Sit in a finite loop and try to append!
+ ClientHandler clientHandler = null;
+ for (int tries = 0; tries < maxTries; ++tries) {
+ try {
+ clientHandler = clientPool.getClientHandler();
+ clientHandler.getClient().append(event);
+ return;
+ } catch (EventDeliveryException e) {
+ // Could not send event through this client, try to pick another client.
+ logger.warn("Client failed. Exception follows: ", e);
+ clientHandler.close();
+ } catch (Exception e2) {
+ logger.error("Failed to send event: " + e2.getMessage());
+ if (clientHandler != null) {
+ clientHandler.close();
+ }
+ throw new EventDeliveryException(
+ "Failed to send event. Exception follows: ", e2);
+ }
+ }
+ logger.error("Tried many times, could not send event.");
+ throw new EventDeliveryException("Failed to send the event!");
+ }
+
+ /**
+ * Tries to append a list of events to the currently connected client. If it
+ * cannot send the event, it tries to send to next available host
+ *
+ * @param events The events to be appended.
+ * @throws EventDeliveryException
+ */
+ @Override
+ public void appendBatch(List<Event> events)
+ throws EventDeliveryException {
+ synchronized (this) {
+ if (!isActive) {
+ logger.error("Attempting to append to an already closed client.");
+ throw new EventDeliveryException(
+ "Attempting to append to an already closed client!");
+ }
+ }
+ ClientHandler clientHandler = null;
+ for (int tries = 0; tries < maxTries; ++tries) {
+ try {
+ clientHandler = clientPool.getClientHandler();
+ clientHandler.getClient().appendBatch(events);
+ return;
+ } catch (EventDeliveryException e) {
+ // Could not send event through this client, try to pick another client.
+ logger.warn("Client failed. Exception follows: " + e.getMessage());
+ clientHandler.close();
+ } catch (Exception e1) {
+ logger.error("No clients active: " + e1.getMessage());
+ if (clientHandler != null) {
+ clientHandler.close();
+ }
+ throw new EventDeliveryException("No clients currently active. " +
+ "Exception follows: ", e1);
+ }
+ }
+ logger.error("Tried many times, could not send event.");
+ throw new EventDeliveryException("Failed to send the event!");
+ }
+
+ // Returns false if and only if this client has been closed explicitly.
+ // Should we check if any clients are active, if none are then return false?
+ // This method has to be lightweight, so not checking if hosts are active.
+ @Override
+ public synchronized boolean isActive() {
+ return isActive;
+ }
+
+ /**
+ * Close the connection. This function is safe to call over and over.
+ */
+ @Override
+ public synchronized void close() throws FlumeException {
+ clientPool.close();
+ this.router.unregisterListener(clientPool);
+ this.router.stop();
+ }
+
+ @Override
+ public void configure(Properties properties) throws FlumeException {
+ configurationProperties = new Properties();
+ configurationProperties.putAll(properties);
+ configureHosts(configurationProperties);
+ }
+}
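
A hedged sketch (Scala) of driving SparkRpcClient directly, assuming receivers have already registered physical hosts under the router path; the ZooKeeper address, logical host name, and event body are placeholders:

    import java.util.Properties
    import org.apache.flume.event.EventBuilder
    import org.apache.spark.streaming.flume.sink.SparkRpcClient

    val props = new Properties()
    props.put("hostname", "benchmark")                // logical host to resolve
    props.put("router.path", "127.0.0.1:2181/spark")  // zkAddress/zkPath (assumed)
    props.put("router.retry.times", "1")              // optional, as documented above
    val client = new SparkRpcClient(props)
    client.configure(props)  // resolves physical hosts from ZooKeeper, starts the router
    client.append(EventBuilder.withBody("hello".getBytes("UTF-8")))
    client.close()
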
diff --git a/external/flume-sink/src/main/java/org/apache/spark/streaming/flume/sink/utils/LogicalHostRouter.java b/external/flume-sink/src/main/java/org/apache/spark/streaming/flume/sink/utils/LogicalHostRouter.java
new file mode 100644
index 0000000000000..437d1a4f65a6e
--- /dev/null
+++ b/external/flume-sink/src/main/java/org/apache/spark/streaming/flume/sink/utils/LogicalHostRouter.java
@@ -0,0 +1,303 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.flume.sink.utils;
+
+import com.google.common.base.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+/*
+ * LogicalHostRouter supplies a mapping between a logical host and physical hosts.
+ * A physical host denotes a real socket address, consisting of a host and a port: host:port.
+ * A logical host consists of several physical hosts: [host1:port1, host2:port2, ...]
+ * Users configure a logical host instead of a physical host:port.
+ */
+public final class LogicalHostRouter {
+ private static final Logger LOG = LoggerFactory.getLogger(LogicalHostRouter.class);
+ private final static int LISTENER_EXECUTORS_NUMBER = 2;
+
+ private final Conf conf;
+ private ExecutorService executors = Executors.newFixedThreadPool(LISTENER_EXECUTORS_NUMBER);
+ private List<LogicalHostRouterListener> listeners = new CopyOnWriteArrayList<LogicalHostRouterListener>();
+ private ZkProxy zkProxy = null;
+
+ public LogicalHostRouter(Conf conf) {
+ this.conf = conf;
+ this.zkProxy = ZkProxy.get(new ZkProxy.Conf(conf.zkAddress, conf.zkRetryTimes,
+ conf.zkRetryIntervalInMs));
+ }
+
+ public void start() throws IOException {
+ try {
+ LOG.info("begin to start logical host router");
+ zkProxy.start();
+ } catch (Exception e) {
+ LOG.error("failed to start zkProxy:" + e.getMessage());
+ throw new IOException("failed to start logical host router:" + e.getMessage());
+ }
+ }
+
+ public void stop() {
+ LOG.info("stop logical host router");
+ try {
+ zkProxy.stop();
+ this.executors.shutdown();
+ this.executors.awaitTermination(1000, TimeUnit.MILLISECONDS);
+ this.executors.shutdownNow();
+ } catch (Exception e) {
+ LOG.error("failed to stop " + e.getMessage());
+ }
+ }
+
+ // register a physical host to logical host
+ public void registerPhysicalHost(String logicalHost, PhysicalHost hostInfo) throws IOException {
+ String zkNodePath = getZkNodePath(logicalHost, hostInfo);
+ try {
+ zkProxy.create(zkNodePath, new byte[0], ZkProxy.ZkNodeMode.EPHEMERAL, true);
+ } catch (Exception e) {
+ throw new IOException("failed to register to " + zkNodePath, e);
+ }
+ }
+
+ // remove a physical host from logical host
+ public void unregisterPhysicalHost(String logicalHost, PhysicalHost hostInfo) throws
+ IOException {
+ String zkNodePath = getZkNodePath(logicalHost, hostInfo);
+ try {
+ zkProxy.delete(zkNodePath);
+ } catch (Exception e) {
+ throw new IOException("failed to unregister " + zkNodePath, e);
+ }
+ }
+
+ /*
+ * register a listener to watch physical hosts changes.
+ * @param watchedLogicalHosts logical hosts to be watched
+ *
+ * Notice: Currently, LogicalHostAdded event is not supported
+ */
+ public synchronized void registerListener(LogicalHostRouterListener listener, List<String> watchedLogicalHosts) throws IOException {
+ try {
+ for (String logicalHost : watchedLogicalHosts) {
+ PhysicalHostUpdateEventListener zkListener = new PhysicalHostUpdateEventListener();
+ String zkPath = getZkNodePath(logicalHost);
+ zkProxy.addEventListener(zkPath, zkListener);
+ }
+ } catch (Exception e) {
+ LOG.error("failed to registerListener:" + e.getMessage());
+ throw new IOException(e);
+ }
+ listeners.add(listener);
+ }
+
+ public void unregisterListener(LogicalHostRouterListener listener) {
+ listeners.remove(listener);
+ }
+
+ public List<String> getLogicalHosts() throws IOException {
+ String zkNodePath = conf.zkPath;
+ try {
+ return zkProxy.getChildren(zkNodePath);
+ } catch (Exception e) {
+ throw new IOException(e);
+ }
+ }
+
+ public List<PhysicalHost> getPhysicalHosts(String logicalHost) throws IOException {
+ String zkNodePath = getZkNodePath(logicalHost);
+ List<PhysicalHost> results = new ArrayList<PhysicalHost>();
+ try {
+ if (zkProxy.checkExists(zkNodePath)) {
+ List<String> children = zkProxy.getChildren(zkNodePath);
+ for (String child : children) {
+ results.add(getPhysicalHost(child));
+ }
+ }
+ } catch (Exception e) {
+ throw new IOException("failed to get physical hosts from " + zkNodePath, e);
+ }
+ return results;
+ }
+
+ private void processPhysicalHostAddedEvent(String logicalHost, PhysicalHost physicalHost) {
+ for (LogicalHostRouterListener listener : listeners) {
+ listener.physicalHostAdded(logicalHost, physicalHost);
+ }
+ }
+
+ private void processPhysicalHostRemovedEvent(String logicalHost, PhysicalHost physicalHost) {
+ for (LogicalHostRouterListener listener : listeners) {
+ listener.physicalHostRemoved(logicalHost, physicalHost);
+ }
+ }
+
+ private PhysicalHost getPhysicalHost(String zkNodeName) {
+ String[] addr = zkNodeName.split(":");
+ Preconditions.checkState(addr.length == 2, zkNodeName + " is not in host:port format");
+ PhysicalHost host = new PhysicalHost(addr[0], Integer.parseInt(addr[1]));
+ return host;
+ }
+
+ private String getZkNodeName(PhysicalHost host) {
+ return host.ip + ":" + host.port;
+ }
+
+ private String getZkNodePath(String logicalHost) {
+ return conf.zkPath + "/" + logicalHost;
+ }
+
+ private String getZkNodePath(String logicalHost, PhysicalHost hostInfo) {
+ return getZkNodePath(logicalHost) + "/" + getZkNodeName(hostInfo);
+ }
+
+ /*
+ * Listen such event that a physical host is added or removed from a logical host
+ */
+ public static interface LogicalHostRouterListener {
+ void physicalHostAdded(final String logicalHost, final PhysicalHost hostInfo);
+
+ void physicalHostRemoved(final String logicalHost, final PhysicalHost hostInfo);
+ }
+
+ public static class PhysicalHost {
+ private String ip;
+ private int port;
+
+ public PhysicalHost(String ip, int port) {
+ this.ip = ip;
+ this.port = port;
+ }
+
+ public String getIp() {
+ return ip;
+ }
+
+ public int getPort() {
+ return port;
+ }
+
+ @Override
+ public String toString() {
+ return ip + ":" + port;
+ }
+ }
+
+ public static class Conf {
+ private String zkAddress;
+ private String zkPath;
+ private int zkRetryIntervalInMs = 1000;
+ private int zkRetryTimes = 3;
+
+ public Conf setZkAddress(String zkAddress) {
+ this.zkAddress = zkAddress;
+ return this;
+ }
+
+ public Conf setZkPath(String zkPath) {
+ this.zkPath = zkPath;
+ return this;
+ }
+
+ public Conf setRetryTimes(int retryTimes) {
+ this.zkRetryTimes = retryTimes;
+ return this;
+ }
+
+ public Conf setRetryInterval(int retryIntervalInMs) {
+ this.zkRetryIntervalInMs = retryIntervalInMs;
+ return this;
+ }
+
+ public static Conf fromRouterPath(String routerPath) {
+ int index = routerPath.indexOf("/");
+ return new Conf().setZkAddress(routerPath.substring(0,
+ index)).setZkPath(routerPath.substring(index));
+ }
+ }
+
+ private class PhysicalHostUpdateEventListener implements ZkProxy.ChildrenEventListener {
+ @Override
+ public void childAddedEvent(String nodePath, byte[] data) {
+ final Info info = getInfo(nodePath);
+ if (info != null) {
+ executors.execute(new Runnable() {
+ @Override
+ public void run() {
+ processPhysicalHostAddedEvent(info.logicalHost, info.physicalHost);
+ }
+ });
+ }
+ }
+
+ @Override
+ public void childRemovedEvent(String nodePath, byte[] data) {
+ final Info info = getInfo(nodePath);
+ if (info != null) {
+ executors.execute(new Runnable() {
+ @Override
+ public void run() {
+ processPhysicalHostRemovedEvent(info.logicalHost, info.physicalHost);
+ }
+ });
+ }
+ }
+
+ @Override
+ public void childUpdatedEvent(String nodePath, byte[] data) {
+ }
+
+ // extract the logical host and its physical host from zkNodePath
+ // e.g. /path/to/logicalhost/host:port => {logicalhost, (host, port)}
+ private Info getInfo(String zkNodePath) {
+ int preIndex = zkNodePath.indexOf(conf.zkPath);
+ if (preIndex < 0) {
+ return null;
+ }
+ String[] parts = zkNodePath.substring(conf.zkPath.length() + 1).split("/");
+ if (parts.length == 2) {
+ Info info = new Info();
+ String[] hostInfo = parts[1].split(":");
+ if (hostInfo.length == 2) {
+ try {
+ info.logicalHost = parts[0];
+ info.physicalHost = new PhysicalHost(hostInfo[0],
+ Integer.parseInt(hostInfo[1]));
+ return info;
+ } catch (NumberFormatException e) {
+ LOG.error(e.getMessage());
+ }
+ }
+ }
+ return null;
+ }
+
+ private class Info {
+ String logicalHost;
+ PhysicalHost physicalHost;
+ }
+ }
+
+}
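
A minimal sketch (Scala) of the intended round trip, mirroring the unit tests further down; the logical host name "receivers" and the ZooKeeper address are placeholders:

    import scala.collection.JavaConversions._
    import org.apache.spark.streaming.flume.sink.utils.LogicalHostRouter

    val conf = LogicalHostRouter.Conf.fromRouterPath("127.0.0.1:2181/spark")
    val router = new LogicalHostRouter(conf)
    router.start()
    // A receiver advertises itself under the logical host "receivers"...
    router.registerPhysicalHost("receivers", new LogicalHostRouter.PhysicalHost("127.0.0.1", 4545))
    // ...and a client resolves that logical host to concrete host:port pairs.
    for (h <- router.getPhysicalHosts("receivers")) {
      println(h.getIp + ":" + h.getPort)
    }
    router.stop()
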
diff --git a/external/flume-sink/src/main/java/org/apache/spark/streaming/flume/sink/utils/ZkProxy.java b/external/flume-sink/src/main/java/org/apache/spark/streaming/flume/sink/utils/ZkProxy.java
new file mode 100644
index 0000000000000..743681926511b
--- /dev/null
+++ b/external/flume-sink/src/main/java/org/apache/spark/streaming/flume/sink/utils/ZkProxy.java
@@ -0,0 +1,256 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.flume.sink.utils;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import com.google.common.base.Preconditions;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.RetryNTimes;
+import org.apache.zookeeper.CreateMode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.curator.framework.recipes.cache.*;
+
+public class ZkProxy {
+ private static final Logger logger = LoggerFactory.getLogger(ZkProxy.class);
+ private static final byte[] BYTE_NULL = new byte[0];
+ // use Conf as key ?
+ private static final ConcurrentHashMap<String, ZkProxy> zkProxies = new ConcurrentHashMap<String, ZkProxy>();
+ private final ConcurrentHashMap<String, PathChildrenEventDispatch> listeners = new ConcurrentHashMap<String, PathChildrenEventDispatch>();
+ private final AtomicInteger referenceNumber = new AtomicInteger(0);
+ private final Conf conf;
+ private CuratorFramework curatorClient;
+ private AtomicBoolean started = new AtomicBoolean(false);
+
+ //initialize assignments cache
+ public static class Conf {
+ public Conf(String zkAddress, int retryTimes, int retryIntervalInMs) {
+ this.address = zkAddress;
+ this.retryTimes = retryTimes;
+ this.retryIntervalInMs = retryIntervalInMs;
+ }
+
+ private String address;
+ private int retryTimes;
+ private int retryIntervalInMs;
+ }
+
+ public enum ZkNodeMode {
+ PERSISTENT,
+ EPHEMERAL,
+ }
+
+ public interface ChildrenEventListener {
+ public void childAddedEvent(String nodePath, byte[] data);
+ public void childRemovedEvent(String nodePath, byte[] data);
+ public void childUpdatedEvent(String nodePath, byte[] data);
+ }
+
+ public class PathChildrenEventDispatch implements PathChildrenCacheListener {
+ private final PathChildrenCache pathCache;
+ private final List<ChildrenEventListener> listeners = new CopyOnWriteArrayList<ChildrenEventListener>();
+ private AtomicBoolean started = new AtomicBoolean(false);
+
+ public PathChildrenEventDispatch(String zkNodePath) {
+ pathCache = new PathChildrenCache(curatorClient, zkNodePath, false);
+ pathCache.getListenable().addListener(this);
+ }
+
+ public void addListener(ChildrenEventListener listener) {
+ logger.info("pathCache add listener ");
+ this.listeners.add(listener);
+ }
+
+ public void start() throws Exception {
+ if (started.compareAndSet(false, true)) {
+ pathCache.start(); // alternatively: pathCache.start(PathChildrenCache.StartMode.BUILD_INITIAL_CACHE)
+ logger.info("pathCache started");
+ }
+ }
+
+ public void close() {
+ if (started.compareAndSet(true, false)) {
+ try {
+ pathCache.close();
+ } catch (IOException e) {
+ logger.error(e.getMessage());
+ }
+ }
+ }
+
+ @Override
+ public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
+ String path = event.getData().getPath();
+ byte[] bytes = event.getData().getData();
+ logger.debug("receive event " + path + " type:" + event.getType());
+ switch (event.getType()) {
+ case CHILD_ADDED: {
+ for (ChildrenEventListener listener : listeners) {
+ listener.childAddedEvent(path, bytes);
+ }
+ break;
+ }
+ case CHILD_UPDATED: {
+ for (ChildrenEventListener listener : listeners) {
+ listener.childUpdatedEvent(path, bytes);
+ }
+ break;
+ }
+ case CHILD_REMOVED: {
+ for (ChildrenEventListener listener : listeners) {
+ listener.childRemovedEvent(path, bytes);
+ }
+ break;
+ }
+ default:
+ break;
+ }
+ }
+ }
+
+ private ZkProxy(Conf conf) {
+ this.conf = conf;
+ logger.info("new zkProxy " + conf.address);
+ }
+
+ public static ZkProxy get(Conf conf) {
+ if (!zkProxies.containsKey(conf.address)) {
+ zkProxies.putIfAbsent(conf.address, new ZkProxy(conf));
+ }
+ return zkProxies.get(conf.address);
+ }
+
+ public void start() {
+ logger.info("ZkProxy starting ...");
+ synchronized (started) {
+ if (referenceNumber.getAndIncrement() == 0) {
+ curatorClient = CuratorFrameworkFactory.newClient(conf.address,
+ new RetryNTimes(conf.retryTimes, conf.retryIntervalInMs));
+ curatorClient.start();
+ started.set(true);
+ }
+ }
+ logger.info("ZkProxy started success!");
+ }
+
+ public boolean isStarted() {
+ return started.get();
+ }
+
+ public void stop() {
+ logger.info("ZkProxy stopping ...");
+ synchronized (started) {
+ if (referenceNumber.decrementAndGet() == 0) {
+ started.set(false);
+ for (Map.Entry<String, PathChildrenEventDispatch> entry : listeners.entrySet()) {
+ entry.getValue().close();
+ }
+ listeners.clear();
+ curatorClient.close();
+ logger.info("ZkProxy has stopped!");
+ }
+ }
+ }
+
+ public void create(String zkNodePath, byte[] value, ZkNodeMode mode,
+ boolean creatingParentsIfNeeded) throws Exception {
+ Preconditions.checkState(started.get());
+ if (curatorClient.checkExists().forPath(zkNodePath) != null) {
+ logger.error(zkNodePath + " was already exists!");
+ throw new Exception(zkNodePath + " was already exists!");
+ }
+
+ // If value is null, Curator stores a default value (the local IP address) in the node.
+ // That is not what we want, so we store byte[0] instead when value is null.
+ byte[] bytes = value == null ? BYTE_NULL : value;
+ if (creatingParentsIfNeeded) {
+ curatorClient.create().creatingParentsIfNeeded().withMode(getZkCreateMode(mode)).forPath(zkNodePath, bytes);
+ } else {
+ curatorClient.create().withMode(getZkCreateMode(mode)).forPath(zkNodePath, bytes);
+ }
+ }
+
+ public boolean checkExists(String zkNodePath) throws Exception {
+ Preconditions.checkState(started.get());
+ return curatorClient.checkExists().forPath(zkNodePath) != null;
+ }
+
+ // Delete a node and return its value.
+ // Notice: the deleted zkNodePath will no longer be watched.
+ // If you want to watch zkNodePath after it is deleted, call addEventListener() again.
+ public byte[] delete(String zkNodePath) throws Exception {
+ Preconditions.checkState(started.get());
+ byte[] value = BYTE_NULL;
+ if (curatorClient.checkExists().forPath(zkNodePath) != null) {
+ try {
+ value = get(zkNodePath);
+ } catch (Exception e) {
+ logger.warn("get({}) in delete() Exception {}", zkNodePath, e);
+ }
+ curatorClient.delete().guaranteed().forPath(zkNodePath);
+ } else {
+ logger.warn("Failed to remove {}, path does not exist.", zkNodePath);
+ }
+ return value;
+ }
+
+ public byte[] get(String zkNodePath) throws Exception {
+ Preconditions.checkState(started.get());
+ return curatorClient.getData().forPath(zkNodePath);
+ }
+
+ public List<String> getChildren(String zkNodePath) throws Exception {
+ Preconditions.checkState(started.get());
+ return curatorClient.getChildren().forPath(zkNodePath);
+ }
+
+ public void set(String zkNodePath, byte[] value) throws Exception {
+ Preconditions.checkState(started.get());
+ curatorClient.setData().forPath(zkNodePath, value);
+ }
+
+ // Add a listener to a zkNode, which will keep watching changes to the zkNode.
+ // When the zkNodePath is deleted, the listener becomes invalid.
+ public synchronized void addEventListener(String zkNodePath, final ChildrenEventListener eventListener) {
+ try {
+ logger.info("addEventListener on path " + zkNodePath);
+ PathChildrenEventDispatch dispatch = listeners.get(zkNodePath);
+ if (dispatch == null) {
+ dispatch = new PathChildrenEventDispatch(zkNodePath);
+ dispatch.start();
+ listeners.put(zkNodePath, dispatch);
+ }
+ dispatch.addListener(eventListener);
+ } catch (Exception e) {
+ logger.error("checkExists {} Exception.{}" + zkNodePath, e);
+ }
+ }
+
+ private CreateMode getZkCreateMode(ZkNodeMode mode) {
+ return mode == ZkNodeMode.EPHEMERAL ? CreateMode.EPHEMERAL : CreateMode.PERSISTENT;
+ }
+}
+
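
A short Scala sketch of the proxy lifecycle, following the tests below; the ZooKeeper address and node path are placeholders. Note that instances are shared per address through ZkProxy.get and reference-counted across start()/stop():

    import org.apache.spark.streaming.flume.sink.utils.ZkProxy

    val proxy = ZkProxy.get(new ZkProxy.Conf("127.0.0.1:2181", 3, 1000))
    proxy.start()
    proxy.create("/spark/receivers/127.0.0.1:4545", null, ZkProxy.ZkNodeMode.EPHEMERAL, true)
    println(proxy.checkExists("/spark/receivers/127.0.0.1:4545"))  // true
    proxy.stop()  // the Curator client closes only when the last user stops
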
diff --git a/external/flume-sink/src/test/java/org/apache/spark/streaming/flume/sink/utils/TestLogicalHostRouter.java b/external/flume-sink/src/test/java/org/apache/spark/streaming/flume/sink/utils/TestLogicalHostRouter.java
new file mode 100644
index 0000000000000..239b5c593b15b
--- /dev/null
+++ b/external/flume-sink/src/test/java/org/apache/spark/streaming/flume/sink/utils/TestLogicalHostRouter.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.flume.sink.utils;
+
+import com.google.common.io.Files;
+import org.apache.curator.test.TestingServer;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+
+public class TestLogicalHostRouter {
+ private TestingServer zkServer;
+ private final int port = 2284;
+ private final String zkPath = "/router";
+
+ @Before
+ public void setUp() throws Exception {
+ File tempDir = Files.createTempDir();
+ zkServer = new TestingServer(port, tempDir);
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ zkServer.stop();
+ }
+
+ private LogicalHostRouter getRouter() {
+ String zkAddr = zkServer.getConnectString();
+ LogicalHostRouter.Conf conf = new LogicalHostRouter.Conf().setZkAddress(zkAddr)
+ .setZkPath(zkPath);
+ return new LogicalHostRouter(conf);
+ }
+
+ private ZkProxy getZkProxy() {
+ return ZkProxy.get(new ZkProxy.Conf(zkServer.getConnectString(), 1, 1000));
+ }
+
+ @Test
+ public void testStartStop() throws IOException {
+ LogicalHostRouter router = getRouter();
+ router.start();
+ router.stop();
+ }
+
+ @Test
+ public void testGetLogicalHosts() throws Exception {
+ ZkProxy zkProxy = getZkProxy();
+ try {
+ zkProxy.start();
+ zkProxy.create(zkPath + "/A/phy1:2012", null, ZkProxy.ZkNodeMode.EPHEMERAL, true);
+ zkProxy.create(zkPath + "/B/phy1:2012", null, ZkProxy.ZkNodeMode.EPHEMERAL, true);
+ } finally {
+ zkProxy.stop();
+ }
+ LogicalHostRouter router = getRouter();
+ try {
+ router.start();
+ List<String> logicalHosts = router.getLogicalHosts();
+ Assert.assertEquals("A", logicalHosts.get(0));
+ Assert.assertEquals("B", logicalHosts.get(1));
+ } finally {
+ router.stop();
+ }
+ }
+
+ @Test
+ public void testGetPhysicalHosts() throws Exception {
+ ZkProxy zkProxy = getZkProxy();
+ LogicalHostRouter router = getRouter();
+ try {
+ zkProxy.start();
+ zkProxy.create(zkPath + "/A/phy1:2012", null, ZkProxy.ZkNodeMode.EPHEMERAL, true);
+ router.start();
+ List hosts = router.getPhysicalHosts("A");
+ Assert.assertEquals(1, hosts.size());
+ Assert.assertEquals("phy1", hosts.get(0).getIp());
+ Assert.assertEquals(2012, hosts.get(0).getPort());
+ } finally {
+ zkProxy.stop();
+ router.stop();
+ }
+ }
+
+ @Test
+ public void testRegisterPhysicalHost() throws IOException {
+ LogicalHostRouter router = getRouter();
+ router.start();
+ try {
+ String logicalHost = "spark";
+ List<LogicalHostRouter.PhysicalHost> hosts = router.getPhysicalHosts(logicalHost);
+ Assert.assertEquals(0, hosts.size());
+ router.registerPhysicalHost(logicalHost, new LogicalHostRouter.PhysicalHost("127.0.0.1", 80));
+ hosts = router.getPhysicalHosts(logicalHost);
+ Assert.assertEquals(1, hosts.size());
+ Assert.assertEquals("127.0.0.1", hosts.get(0).getIp());
+ Assert.assertEquals(80, hosts.get(0).getPort());
+ } finally {
+ router.stop();
+ }
+ }
+}
diff --git a/external/flume-sink/src/test/java/org/apache/spark/streaming/flume/sink/utils/TestZkProxy.java b/external/flume-sink/src/test/java/org/apache/spark/streaming/flume/sink/utils/TestZkProxy.java
new file mode 100644
index 0000000000000..0cdefc856600d
--- /dev/null
+++ b/external/flume-sink/src/test/java/org/apache/spark/streaming/flume/sink/utils/TestZkProxy.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.streaming.flume.sink.utils;
+
+import com.google.common.io.Files;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.apache.curator.test.TestingServer;
+
+import java.io.File;
+import java.io.Serializable;
+import java.util.List;
+
+public class TestZkProxy implements Serializable {
+ private TestingServer server;
+ private int port = 2283;
+
+ @Before
+ public void setUp() throws Exception {
+ File file = Files.createTempDir();
+ System.err.println("tempPath:" + file.getAbsolutePath());
+ server = new TestingServer(port, file);
+ }
+
+ private ZkProxy getZkProxy() {
+ return ZkProxy.get(new ZkProxy.Conf(server.getConnectString(), 1, 1000));
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ server.stop();
+ }
+
+ @Test
+ public void testState() {
+ ZkProxy proxy = getZkProxy();
+ Assert.assertEquals(false, proxy.isStarted());
+ proxy.start();
+ Assert.assertEquals(true, proxy.isStarted());
+ proxy.stop();
+ Assert.assertEquals(false, proxy.isStarted());
+ }
+
+ @Test
+ public void testCreateNormal() throws Exception {
+ ZkProxy proxy = getZkProxy();
+ proxy.start();
+ String path = "/proxy";
+ proxy.create(path, null, ZkProxy.ZkNodeMode.EPHEMERAL, true);
+ Assert.assertTrue(proxy.checkExists(path));
+ proxy.stop();
+ }
+
+ @Test
+ public void testCreateWithValue() throws Exception {
+ ZkProxy proxy = getZkProxy();
+ proxy.start();
+ String path = "/path";
+ String value = "my value";
+ proxy.create(path, value.getBytes(), ZkProxy.ZkNodeMode.PERSISTENT, true);
+ String realValue = new String(proxy.get(path));
+ Assert.assertEquals(value, realValue);
+ proxy.stop();
+ }
+
+ @Test
+ public void testCreateCascade() throws Exception {
+ ZkProxy proxy = getZkProxy();
+ proxy.start();
+ String path = "/a/b/c";
+ String value = "my value";
+ boolean occurredException = false;
+ try {
+ proxy.create(path, value.getBytes(), ZkProxy.ZkNodeMode.PERSISTENT, false);
+ } catch (Exception e) {
+ occurredException = true;
+ }
+ Assert.assertTrue(occurredException);
+
+ proxy.create(path, value.getBytes(), ZkProxy.ZkNodeMode.PERSISTENT, true);
+ Assert.assertTrue(proxy.checkExists(path));
+ proxy.stop();
+ }
+
+ @Test
+ public void testSet() throws Exception {
+ ZkProxy proxy = getZkProxy();
+ proxy.start();
+ String path = "/p";
+ String value = "value1";
+ proxy.create(path, value.getBytes(), ZkProxy.ZkNodeMode.PERSISTENT, true);
+ Assert.assertEquals(value, new String(proxy.get(path)));
+ value = "value2";
+ proxy.set(path, value.getBytes());
+ Assert.assertEquals(value, new String(proxy.get(path)));
+ proxy.stop();
+ }
+
+ @Test
+ public void testDelete() throws Exception {
+ ZkProxy proxy = getZkProxy();
+ proxy.start();
+ String path = "/proxy";
+ proxy.create(path, null, ZkProxy.ZkNodeMode.EPHEMERAL, true);
+ proxy.delete(path);
+ Assert.assertTrue(!proxy.checkExists(path));
+ proxy.stop();
+ }
+
+ @Test
+ public void testGetChildren() throws Exception {
+ ZkProxy proxy = getZkProxy();
+ proxy.start();
+ proxy.create("/root/child1", null, ZkProxy.ZkNodeMode.PERSISTENT, true);
+ proxy.create("/root/child2", null, ZkProxy.ZkNodeMode.PERSISTENT, false);
+ List<String> children = proxy.getChildren("/root");
+ Assert.assertTrue(children.size() == 2);
+ Assert.assertTrue(children.contains("child1"));
+ Assert.assertTrue(children.contains("child2"));
+ proxy.stop();
+ }
+}
\ No newline at end of file
diff --git a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala
index 4b2ea45fb81d0..202de964ec5ae 100644
--- a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala
+++ b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala
@@ -17,11 +17,14 @@
package org.apache.spark.streaming.flume
-import java.net.InetSocketAddress
+import java.net.{ServerSocket, InetSocketAddress}
import java.io.{ObjectInput, ObjectOutput, Externalizable}
import java.nio.ByteBuffer
import java.util.concurrent.Executors
+import org.apache.spark.streaming.flume.sink.utils.LogicalHostRouter
+import org.apache.spark.streaming.flume.sink.utils.LogicalHostRouter.PhysicalHost
+
import scala.collection.JavaConversions._
import scala.reflect.ClassTag
@@ -30,7 +33,7 @@ import org.apache.flume.source.avro.AvroFlumeEvent
import org.apache.flume.source.avro.Status
import org.apache.avro.ipc.specific.SpecificResponder
import org.apache.avro.ipc.NettyServer
-import org.apache.spark.Logging
+import org.apache.spark.{SparkConf, Logging}
import org.apache.spark.util.Utils
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.dstream._
@@ -51,8 +54,17 @@ class FlumeInputDStream[T: ClassTag](
enableDecompression: Boolean
) extends ReceiverInputDStream[SparkFlumeEvent](ssc_) {
+ def this(@transient ssc : StreamingContext, receiverPath: String,
+ storageLevel : StorageLevel, enableDecompression : Boolean) = {
+ this(ssc, receiverPath, -1, storageLevel, enableDecompression)
+ }
+
override def getReceiver(): Receiver[SparkFlumeEvent] = {
- new FlumeReceiver(host, port, storageLevel, enableDecompression)
+ if (port != -1) {
+ new FlumeReceiver(host, port, storageLevel, enableDecompression)
+ } else {
+ new DynamicFlumeReceiver(host, storageLevel, enableDecompression)
+ }
}
}
@@ -121,7 +133,7 @@ private[streaming] object SparkFlumeEvent {
/** A simple server that implements Flume's Avro protocol. */
private[streaming]
-class FlumeEventServer(receiver : FlumeReceiver) extends AvroSourceProtocol {
+class FlumeEventServer(receiver : Receiver[SparkFlumeEvent]) extends AvroSourceProtocol {
override def append(event : AvroFlumeEvent) : Status = {
receiver.store(SparkFlumeEvent.fromAvroFlumeEvent(event))
Status.OK
@@ -148,30 +160,13 @@ class FlumeReceiver(
classOf[AvroSourceProtocol], new FlumeEventServer(this))
var server: NettyServer = null
- private def initServer() = {
- if (enableDecompression) {
- val channelFactory = new NioServerSocketChannelFactory(Executors.newCachedThreadPool(),
- Executors.newCachedThreadPool())
- val channelPipelineFactory = new CompressionChannelPipelineFactory()
-
- new NettyServer(
- responder,
- new InetSocketAddress(host, port),
- channelFactory,
- channelPipelineFactory,
- null)
- } else {
- new NettyServer(responder, new InetSocketAddress(host, port))
- }
- }
-
def onStart() {
synchronized {
if (server == null) {
- server = initServer()
+ server = FlumeReceiver.initServer(responder, host, port, enableDecompression)
server.start()
} else {
- logWarning("Flume receiver being asked to start more then once with out close")
+ logWarning("Flume receiver being asked to start more then once without close")
}
}
logInfo("Flume receiver started")
@@ -188,13 +183,34 @@ class FlumeReceiver(
}
override def preferredLocation = Some(host)
-
- /** A Netty Pipeline factory that will decompress incoming data from
+}
+
+// private[streaming]
+object FlumeReceiver {
+ def initServer(responder : SpecificResponder, host : String, port : Int,
+ enableDecompression : Boolean) : NettyServer = {
+ if (enableDecompression) {
+ val channelFactory = new NioServerSocketChannelFactory(Executors.newCachedThreadPool(),
+ Executors.newCachedThreadPool())
+ val channelPipelineFactory = new CompressionChannelPipelineFactory()
+
+ new NettyServer(
+ responder,
+ new InetSocketAddress(host, port),
+ channelFactory,
+ channelPipelineFactory,
+ null)
+ } else {
+ new NettyServer(responder, new InetSocketAddress(host, port))
+ }
+ }
+
+ /** A Netty Pipeline factory that will decompress incoming data from
* and the Netty client and compress data going back to the client.
*
* The compression on the return is required because Flume requires
- * a successful response to indicate it can remove the event/batch
- * from the configured channel
+ * a successful response to indicate it can remove the event/batch
+ * from the configured channel
*/
private[streaming]
class CompressionChannelPipelineFactory extends ChannelPipelineFactory {
@@ -205,6 +221,57 @@ class FlumeReceiver(
pipeline.addFirst("deflater", encoder)
pipeline.addFirst("inflater", new ZlibDecoder())
pipeline
+ }
}
}
+
+/** A NetworkReceiver which listens for events using the Flume Avro interface.
+ * @param address zookeeperAddress/path/to/logicalhost
+ */
+private[streaming]
+class DynamicFlumeReceiver(
+ address : String,
+ storageLevel: StorageLevel,
+ enableDecompression : Boolean
+) extends Receiver[SparkFlumeEvent](storageLevel) with Logging {
+ val logicalHost = address.substring(address.lastIndexOf("/") + 1)
+ val routerPath = address.substring(0, address.lastIndexOf("/"))
+ lazy val routerConf = LogicalHostRouter.Conf.fromRouterPath(routerPath)
+ lazy val hostRouter = new LogicalHostRouter(routerConf)
+ lazy val responder = new SpecificResponder(
+ classOf[AvroSourceProtocol], new FlumeEventServer(this))
+ lazy val hostName = Utils.localIpAddress // InetAddress.getLocalHost.getHostAddress
+ lazy val hostPort = selectFreePort
+ var server : NettyServer = null
+ def onStart() {
+ synchronized {
+ if (server == null) {
+ server = FlumeReceiver.initServer(responder, hostName, hostPort, enableDecompression)
+ server.start()
+ hostRouter.start()
+ hostRouter.registerPhysicalHost(logicalHost, getPhysicalHost)
+ } else {
+ logWarning("Flume receiver being asked to start more then once without close")
+ }
+ }
+ logInfo("Flume receiver started")
+ }
+
+ private def getPhysicalHost = new PhysicalHost(hostName, hostPort)
+
+ private def selectFreePort : Int = {
+ val serverSocket : ServerSocket = new ServerSocket(0)
+ val port = serverSocket.getLocalPort()
+ serverSocket.close()
+ logInfo("select a free port " + port)
+ port
+ }
+
+ def onStop() {
+ hostRouter.unregisterPhysicalHost(logicalHost, getPhysicalHost)
+ hostRouter.stop()
+ server.close()
+ logInfo("Flume receiver stopped")
+ }
+ override def preferredLocation = None
}
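
The address handed to DynamicFlumeReceiver splits on the last "/" into the router path and the logical host, as the two substring calls above show; a worked example with a placeholder address:

    val address = "127.0.0.1:2181/spark/receivers"
    val logicalHost = address.substring(address.lastIndexOf("/") + 1)  // "receivers"
    val routerPath = address.substring(0, address.lastIndexOf("/"))    // "127.0.0.1:2181/spark"
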
diff --git a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala
index 4b732c1592ab2..3dbdb698dd178 100644
--- a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala
+++ b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala
@@ -23,7 +23,7 @@ import org.apache.spark.annotation.Experimental
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.api.java.{JavaReceiverInputDStream, JavaStreamingContext}
-import org.apache.spark.streaming.dstream.ReceiverInputDStream
+import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
object FlumeUtils {
@@ -243,4 +243,27 @@ object FlumeUtils {
): JavaReceiverInputDStream[SparkFlumeEvent] = {
createPollingStream(jssc.ssc, addresses, storageLevel, maxBatchSize, parallelism)
}
+
+ /**
+ * Creates multiple input streams from a Flume source and returns their union.
+ * @param receiverPath A zookeeper path under which receivers register,
+ * format: zkAddress/path/to/receivers, e.g. localhost:2181/spark/receivers
+ * @param numReceivers the number of receivers to be started
+ * @param storageLevel Storage level to use for storing the received objects
+ */
+ def createMultiStream(
+ ssc: StreamingContext,
+ receiverPath: String,
+ numReceivers : Int,
+ storageLevel : StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2,
+ enableDecompression : Boolean = false
+ ): DStream[SparkFlumeEvent] = {
+ var inputStream : DStream[SparkFlumeEvent] = new FlumeInputDStream[SparkFlumeEvent](ssc,
+ receiverPath, storageLevel, enableDecompression)
+ for (i <- 1 until numReceivers) {
+ inputStream = inputStream.union(new FlumeInputDStream[SparkFlumeEvent](ssc, receiverPath,
+ storageLevel, enableDecompression))
+ }
+ inputStream
+ }
}
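
A hedged end-to-end sketch of the new API from the driver side; the application name, batch interval, and ZooKeeper path are placeholders:

    import org.apache.spark.SparkConf
    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.flume.FlumeUtils

    val sparkConf = new SparkConf().setAppName("FlumeMultiReceiver")
    val ssc = new StreamingContext(sparkConf, Seconds(2))
    // Start 3 receivers; each binds a free port and registers it under the
    // logical host "receivers" at 127.0.0.1:2181/spark/receivers.
    val stream = FlumeUtils.createMultiStream(ssc, "127.0.0.1:2181/spark/receivers", 3,
      StorageLevel.MEMORY_AND_DISK_SER_2)
    stream.count().map(cnt => "Received " + cnt + " flume events.").print()
    ssc.start()
    ssc.awaitTermination()
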