From f4660c5cb41d9b5ef737b38e7e38abf3b2f2e31c Mon Sep 17 00:00:00 2001 From: joyyoj Date: Tue, 3 Jun 2014 21:15:11 +0800 Subject: [PATCH 1/3] [SPARK-1998] SparkFlumeEvent with body bigger than 1020 bytes are not read properly --- .../org/apache/spark/streaming/flume/FlumeInputDStream.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala index 5be33f1d5c428..ed35e34ad45ab 100644 --- a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala +++ b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala @@ -71,12 +71,12 @@ class SparkFlumeEvent() extends Externalizable { for (i <- 0 until numHeaders) { val keyLength = in.readInt() val keyBuff = new Array[Byte](keyLength) - in.read(keyBuff) + in.readFully(keyBuff) val key : String = Utils.deserialize(keyBuff) val valLength = in.readInt() val valBuff = new Array[Byte](valLength) - in.read(valBuff) + in.readFully(valBuff) val value : String = Utils.deserialize(valBuff) headers.put(key, value) From 6bb8372b6637d61691a9254ae367a27e41b98e2c Mon Sep 17 00:00:00 2001 From: joyyoj Date: Thu, 31 Jul 2014 23:26:15 +0800 Subject: [PATCH 2/3] SparkPushSink --- .../spark/flume/sink/SparkPushSink.java | 37 ++ .../spark/flume/sink/SparkRpcClient.java | 349 ++++++++++++++++++ .../flume/sink/utils/LogicalHostRouter.java | 303 +++++++++++++++ .../spark/flume/sink/utils/ZkProxy.java | 267 ++++++++++++++ .../sink/utils/TestLogicalHostRouter.java | 121 ++++++ .../spark/flume/sink/utils/TestZkProxy.java | 138 +++++++ 6 files changed, 1215 insertions(+) create mode 100644 external/flume-sink/src/main/java/org/apache/spark/flume/sink/SparkPushSink.java create mode 100644 external/flume-sink/src/main/java/org/apache/spark/flume/sink/SparkRpcClient.java create mode 100644 external/flume-sink/src/main/java/org/apache/spark/flume/sink/utils/LogicalHostRouter.java create mode 100644 external/flume-sink/src/main/java/org/apache/spark/flume/sink/utils/ZkProxy.java create mode 100644 external/flume-sink/src/test/java/org/apache/spark/flume/sink/utils/TestLogicalHostRouter.java create mode 100644 external/flume-sink/src/test/java/org/apache/spark/flume/sink/utils/TestZkProxy.java diff --git a/external/flume-sink/src/main/java/org/apache/spark/flume/sink/SparkPushSink.java b/external/flume-sink/src/main/java/org/apache/spark/flume/sink/SparkPushSink.java new file mode 100644 index 0000000000000..c826b49f56fb0 --- /dev/null +++ b/external/flume-sink/src/main/java/org/apache/spark/flume/sink/SparkPushSink.java @@ -0,0 +1,37 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one or more +* contributor license agreements. See the NOTICE file distributed with +* this work for additional information regarding copyright ownership. +* The ASF licenses this file to You under the Apache License, Version 2.0 +* (the "License"); you may not use this file except in compliance with +* the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. 
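A note on the readFully change in the first patch: ObjectInput extends DataInput, so in.read(buff) is a plain stream read that may fill only part of the buffer (the symptom reported for bodies over ~1020 bytes), while in.readFully(buff) loops internally until the buffer is completely filled or throws EOFException. A minimal sketch of the corrected read pattern, assuming the same readExternal(in: ObjectInput) context as the patch:

    // Sketch (Scala, as in FlumeInputDStream.scala): read a length-prefixed
    // byte array. read(buff) can return short; readFully cannot.
    def readLengthPrefixed(in: java.io.ObjectInput): Array[Byte] = {
      val length = in.readInt()
      val buff = new Array[Byte](length)
      in.readFully(buff)   // fills the whole buffer or throws EOFException
      buff
    }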
+*/ + +package org.apache.spark.flume.sink; + +import org.apache.flume.api.*; +import org.apache.flume.sink.AbstractRpcSink; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Properties; + +public class SparkPushSink extends AbstractRpcSink { + private static final Logger logger = LoggerFactory.getLogger(SparkPushSink.class); + + @Override + protected RpcClient initializeRpcClient(Properties props) { + logger.info("Attempting to create Avro Rpc client."); + SparkRpcClient client = new SparkRpcClient(props); + client.configure(props); + return client; + } +} \ No newline at end of file diff --git a/external/flume-sink/src/main/java/org/apache/spark/flume/sink/SparkRpcClient.java b/external/flume-sink/src/main/java/org/apache/spark/flume/sink/SparkRpcClient.java new file mode 100644 index 0000000000000..cd4e5ac57246b --- /dev/null +++ b/external/flume-sink/src/main/java/org/apache/spark/flume/sink/SparkRpcClient.java @@ -0,0 +1,349 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.flume.sink; + +import com.google.common.base.Preconditions; +import com.google.common.base.Strings; +import org.apache.flume.Event; +import org.apache.flume.EventDeliveryException; +import org.apache.flume.FlumeException; +import org.apache.flume.api.*; +import org.apache.spark.flume.sink.utils.LogicalHostRouter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Properties; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicInteger; + +public class SparkRpcClient extends AbstractRpcClient implements RpcClient { + private static final Logger logger = LoggerFactory.getLogger(SparkRpcClient.class); + private static final String HOSTNAME_KEY = "hostname"; + private static final String HOST_ROUTER_PATH = "router.path"; + private static final String HOST_ROUTER_RETRY_TIMES = "router.retry.times"; + private static final String HOST_ROUTER_RETRY_INTERVAL = "router.retry.interval"; + private LogicalHostRouter router; + private Integer maxTries = 1; + private volatile boolean isActive = false; + private Properties configurationProperties; + private final ClientPool clientPool = new ClientPool(); + + private class ClientHandler { + private HostInfo hostInfo = null; + private Properties props = null; + private RpcClient client = null; + private volatile boolean isConnected = false; + + public ClientHandler(HostInfo hostInfo, Properties props) { + this.hostInfo = hostInfo; + this.props = props; + } + + public synchronized void close() { + if (isConnected) { + client.close(); + isConnected = false; + client = null; + logger.info("closed client"); + } + } + + public synchronized RpcClient getClient() { + if (client == null) { + Properties props = new Properties(); + props.putAll(configurationProperties); + props.put(RpcClientConfigurationConstants.CONFIG_CLIENT_TYPE, + RpcClientFactory.ClientType.DEFAULT.name()); + props.put(RpcClientConfigurationConstants.CONFIG_HOSTS, + hostInfo.getReferenceName()); + props.put( + RpcClientConfigurationConstants.CONFIG_HOSTS_PREFIX + + hostInfo.getReferenceName(), + hostInfo.getHostName() + ":" + hostInfo.getPortNumber()); + props.put(RpcClientConfigurationConstants.CONFIG_CLIENT_TYPE, + RpcClientConfigurationConstants.DEFAULT_CLIENT_TYPE); + client = RpcClientFactory.getInstance(props); +// client = new NettyAvroRpcClient(); +// ((NettyAvroRpcClient) client).configure(props); + isConnected = true; + logger.debug("create new RpcClient:" + hostInfo.getHostName() + ":" + hostInfo.getPortNumber()); + } + return client; + } + } + + private class ClientPool implements LogicalHostRouter.LogicalHostRouterListener { + private List clientHandlers = new CopyOnWriteArrayList(); + private CountDownLatch emptyLatch = new CountDownLatch(1); + private AtomicInteger currentClientIndex = new AtomicInteger(0); + + @Override + public void physicalHostAdded(String logicalHost, LogicalHostRouter.PhysicalHost physicalHost) { + logger.info("receive host added info " + physicalHost.toString()); + HostInfo hostInfo = new HostInfo("h1", physicalHost.getIp(), physicalHost.getPort()); + addHost(hostInfo); + } + + @Override + public void physicalHostRemoved(String logicalHost, LogicalHostRouter.PhysicalHost physicalHost) { + logger.info("receive host removed info " + physicalHost.toString()); + HostInfo hostInfo = new HostInfo("h1", physicalHost.getIp(), 
physicalHost.getPort()); + removeHost(hostInfo); + } + + private boolean isSameHost(HostInfo left, HostInfo right) { + return left.getHostName().equals(right.getHostName()) && left.getPortNumber() == right.getPortNumber(); + } + + public void addHost(HostInfo hostInfo) { + logger.info("add host " + hostInfo.getHostName() + ":" + hostInfo.getPortNumber()); + for (ClientHandler handler : clientHandlers) { + if (isSameHost(handler.hostInfo, hostInfo)) { + return; + } + } + clientHandlers.add(new ClientHandler(hostInfo, configurationProperties)); + emptyLatch.countDown(); + logger.info("host added"); + } + + public void removeHost(HostInfo hostInfo) { + for (ClientHandler handler : clientHandlers) { + if (isSameHost(handler.hostInfo, hostInfo)) { + clientHandlers.remove(handler); + return; + } + } + } + + public ClientHandler getClientHandler() { + int index = currentClientIndex.getAndIncrement(); + if (currentClientIndex.get() >= clientHandlers.size()) { + currentClientIndex.set(0); + } + if (index >= clientHandlers.size()) { + index = 0; + } + try { + emptyLatch.await(); + } catch (InterruptedException e) { + logger.error(e.getMessage()); + } + Preconditions.checkElementIndex(index, clientHandlers.size()); + return clientHandlers.get(index); + } + + public void close() { + FlumeException closeException = null; + int nSuccess = clientHandlers.size(); + for (ClientHandler handler : clientHandlers) { + try { + handler.close(); + ++nSuccess; + } catch (FlumeException e) { + closeException = e; + } + } + clientHandlers.clear(); + if (closeException != null) { + throw new FlumeException("Close Exception total: " + clientHandlers.size() + + " success: " + nSuccess, closeException); + } + } + } + + public SparkRpcClient(Properties props) { + this.configurationProperties = props; + } + + private int getInteger(Properties prop, String key, int defaultValue) { + String value = prop.getProperty(key); + if (value != null && value.trim().length() > 0) { + try { + return Integer.parseInt(value.trim()); + } catch (NumberFormatException e) { + logger.warn("invalid " + key + " is set, value: " + value); + } + } + return defaultValue; + } + + //This function has to be synchronized to establish a happens-before + //relationship for different threads that access this object + //since shared data structures are created here. 
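One observation on getClientHandler above: the index read, the counter reset, and the bounds check happen as separate steps, so a concurrent removeHost can shrink clientHandlers in between and make Preconditions.checkElementIndex throw. A race-free variant over the same fields (a sketch, not the patch's code):

    // Sketch against the same members: clientHandlers (CopyOnWriteArrayList),
    // currentClientIndex (AtomicInteger), emptyLatch (CountDownLatch).
    def nextHandler(): ClientHandler = {
      emptyLatch.await()
      // toArray yields a stable snapshot; the list mutates by copying.
      val snapshot = clientHandlers.toArray(new Array[ClientHandler](0))
      if (snapshot.isEmpty) throw new FlumeException("no hosts registered")
      val i = currentClientIndex.getAndIncrement() % snapshot.length
      snapshot(if (i < 0) i + snapshot.length else i)   // wrap, even on int overflow
    }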
+ private synchronized void configureHosts(Properties properties) + throws FlumeException { + if (isActive) { + logger.error("This client was already configured, " + + "cannot reconfigure."); + throw new FlumeException("This client was already configured, " + + "cannot reconfigure."); + } + + String routerPath = properties.getProperty(HOST_ROUTER_PATH); + Preconditions.checkArgument(!Strings.isNullOrEmpty(routerPath), HOST_ROUTER_PATH + " is empty"); + LogicalHostRouter.Conf conf = LogicalHostRouter.Conf.fromRouterPath(routerPath); + String routerRetryTimes = properties.getProperty(HOST_ROUTER_RETRY_TIMES); + if (routerRetryTimes != null) { + conf.setRetryTimes(Integer.parseInt(routerRetryTimes)); + } + String routerRetryInterval = properties.getProperty(HOST_ROUTER_RETRY_INTERVAL); + if (routerRetryInterval != null) { + conf.setRetryInterval(Integer.parseInt(routerRetryInterval)); + } + if (this.router != null) { + this.router.stop(); + } + try { + router = new LogicalHostRouter(conf); + router.start(); + String logicalHost = properties.getProperty(HOSTNAME_KEY); + List logicalHosts = new ArrayList(); + logicalHosts.add(logicalHost); + router.registerListener(clientPool, logicalHosts); + List physicalHosts = router.getPhysicalHosts(logicalHost); + maxTries = Math.max(1, physicalHosts.size()); + for (LogicalHostRouter.PhysicalHost host : physicalHosts) { + HostInfo hostInfo = new HostInfo("h1", host.getIp(), host.getPort()); + clientPool.addHost(hostInfo); + } + } catch (IOException e) { + logger.error("failed to read hosts ", e); + throw new FlumeException("This client read hosts failed " + e.getMessage()); + } + maxTries = getInteger(properties, RpcClientConfigurationConstants.CONFIG_MAX_ATTEMPTS, maxTries); + batchSize = getInteger(properties, RpcClientConfigurationConstants.CONFIG_BATCH_SIZE, + RpcClientConfigurationConstants.DEFAULT_BATCH_SIZE); + if (batchSize < 1) { + logger.warn("A batch-size less than 1 was specified: " + batchSize + + ". Using default instead."); + batchSize = RpcClientConfigurationConstants.DEFAULT_BATCH_SIZE; + } + isActive = true; + } + + /** + * Tries to append an event to the currently connected client. If it cannot + * send the event, it tries to send to next available host + * + * @param event The event to be appended. + * @throws org.apache.flume.EventDeliveryException + */ + @Override + public void append(Event event) throws EventDeliveryException { + synchronized (this) { + if (!isActive) { + logger.error("Attempting to append to an already closed client."); + throw new EventDeliveryException( + "Attempting to append to an already closed client."); + } + } + // Sit in an finite loop and try to append! + ClientHandler clientHandler = null; + for (int tries = 0; tries < maxTries; ++tries) { + try { + clientHandler = clientPool.getClientHandler(); + clientHandler.getClient().append(event); + return; + } catch (EventDeliveryException e) { + // Could not send event through this client, try to pick another client. + logger.warn("Client failed. Exception follows: ", e); + clientHandler.close(); + } catch (Exception e2) { + logger.error("Failed to send event: " + e2.getMessage()); + if (clientHandler != null) { + clientHandler.close(); + } + throw new EventDeliveryException( + "Failed to send event. Exception follows: ", e2); + } + } + logger.error("Tried many times, could not send event."); + throw new EventDeliveryException("Failed to send the event!"); + } + + /** + * Tries to append a list of events to the currently connected client. 
If it + * cannot send the event, it tries to send to next available host + * + * @param events The events to be appended. + * @throws EventDeliveryException + */ + @Override + public void appendBatch(List events) + throws EventDeliveryException { + synchronized (this) { + if (!isActive) { + logger.error("Attempting to append to an already closed client."); + throw new EventDeliveryException( + "Attempting to append to an already closed client!"); + } + } + ClientHandler clientHandler = null; + for (int tries = 0; tries < maxTries; ++tries) { + try { + clientHandler = clientPool.getClientHandler(); + clientHandler.getClient().appendBatch(events); + return; + } catch (EventDeliveryException e) { + // Could not send event through this client, try to pick another client. + logger.warn("Client failed. Exception follows: " + e.getMessage()); + clientHandler.close(); + } catch (Exception e1) { + logger.error("No clients active: " + e1.getMessage()); + if (clientHandler != null) { + clientHandler.close(); + } + throw new EventDeliveryException("No clients currently active. " + + "Exception follows: ", e1); + } + } + logger.error("Tried many times, could not send event."); + throw new EventDeliveryException("Failed to send the event!"); + } + + // Returns false if and only if this client has been closed explicitly. + // Should we check if any clients are active, if none are then return false? + // This method has to be lightweight, so not checking if hosts are active. + @Override + public synchronized boolean isActive() { + return isActive; + } + + /** + * Close the connection. This function is safe to call over and over. + */ + @Override + public synchronized void close() throws FlumeException { + clientPool.close(); + this.router.unregisterListener(clientPool); + this.router.stop(); + this.router = null; + } + + @Override + public void configure(Properties properties) throws FlumeException { + configurationProperties = new Properties(); + configurationProperties.putAll(properties); + configureHosts(configurationProperties); + } +} diff --git a/external/flume-sink/src/main/java/org/apache/spark/flume/sink/utils/LogicalHostRouter.java b/external/flume-sink/src/main/java/org/apache/spark/flume/sink/utils/LogicalHostRouter.java new file mode 100644 index 0000000000000..60d3fd9167af8 --- /dev/null +++ b/external/flume-sink/src/main/java/org/apache/spark/flume/sink/utils/LogicalHostRouter.java @@ -0,0 +1,303 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
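append and appendBatch above share one failover shape: rotate through up to maxTries handlers, and on EventDeliveryException close the failing handler so its connection is rebuilt lazily on the next getClient(). Factored out, the loop looks like this (a hypothetical helper for illustration, not part of the patch):

    // Usage: withFailover(maxTries) { client => client.append(event) }
    def withFailover[T](maxTries: Int)(op: RpcClient => T): T = {
      var last: Exception = null
      for (_ <- 0 until maxTries) {
        val handler = clientPool.getClientHandler()
        try {
          return op(handler.getClient())
        } catch {
          case e: EventDeliveryException =>
            last = e
            handler.close()   // next getClient() reconnects to the host
        }
      }
      throw new EventDeliveryException("failed after " + maxTries + " tries", last)
    }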
+ */ + +package org.apache.spark.flume.sink.utils; + +import com.google.common.base.Preconditions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + +/* + * LogicalHostRouter supplies a map between logical host and physical host. + * A physical host denotes a real socket address, which consists of one host and port. host:port + * A logical host consists of several physical hosts. [host1:port1, host2:port2, ...] + * user hold a logical host instead of a actual host:port. + */ +public final class LogicalHostRouter { + private static final Logger LOG = LoggerFactory.getLogger(LogicalHostRouter.class); + private final static int LISTENER_EXECUTORS_NUMBER = 2; + + private final Conf conf; + private ExecutorService executors = Executors.newFixedThreadPool(LISTENER_EXECUTORS_NUMBER); + private List listeners = new CopyOnWriteArrayList(); + private ZkProxy zkProxy = null; + + public LogicalHostRouter(Conf conf) { + this.conf = conf; + this.zkProxy = ZkProxy.get(new ZkProxy.Conf(conf.zkAddress, conf.zkRetryTimes, + conf.zkRetryIntervalInMs)); + } + + public void start() throws IOException { + try { + LOG.info("begin to start logical host router"); + zkProxy.start(); + } catch (Exception e) { + LOG.error("failed to start zkProxy:" + e.getMessage()); + throw new IOException("failed to start logical host router:" + e.getMessage()); + } + } + + public void stop() { + LOG.info("stop logical host router"); + try { + zkProxy.stop(); + this.executors.shutdown(); + this.executors.awaitTermination(1000, TimeUnit.MILLISECONDS); + this.executors.shutdownNow(); + } catch (Exception e) { + LOG.error("failed to stop " + e.getMessage()); + } + } + + // register a physical host to logical host + public void registerPhysicalHost(String logicalHost, PhysicalHost hostInfo) throws IOException { + String zkNodePath = getZkNodePath(logicalHost, hostInfo); + try { + zkProxy.create(zkNodePath, new byte[0], ZkProxy.ZkNodeMode.EPHEMERAL, true); + } catch (Exception e) { + throw new IOException("failed to register to " + zkNodePath, e); + } + } + + // remove a physical host from logical host + public void unregisterPhysicalHost(String logicalHost, PhysicalHost hostInfo) throws + IOException { + String zkNodePath = getZkNodePath(logicalHost, hostInfo); + try { + zkProxy.delete(zkNodePath); + } catch (Exception e) { + throw new IOException("failed to unregister " + zkNodePath, e); + } + } + + /* + * register a listener to watch physical hosts changes. 
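The mapping behind this router lives in ZooKeeper as zkPath/<logicalHost>/<ip:port>, and registerPhysicalHost creates the leaf EPHEMERAL, so a crashed receiver drops out of the route when its session expires. A usage sketch with hypothetical addresses:

    // Hypothetical values; creates the ephemeral node
    // /spark/receivers/127.0.0.1:4545 for the logical host "receivers".
    val conf = new LogicalHostRouter.Conf()
      .setZkAddress("localhost:2181")   // assumed ZooKeeper ensemble
      .setZkPath("/spark")
    val router = new LogicalHostRouter(conf)
    router.start()
    router.registerPhysicalHost("receivers",
      new LogicalHostRouter.PhysicalHost("127.0.0.1", 4545))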
+ * @param watchedLogicalHosts logical hosts to be watched + * + * Notice: Currently, LogicalHostAdded event is not supported + */ + public synchronized void registerListener(LogicalHostRouterListener listener, List watchedLogicalHosts) throws IOException { + try { + for (String logicalHost : watchedLogicalHosts) { + PhysicalHostUpdateEventListener zkListener = new PhysicalHostUpdateEventListener(); + String zkPath = getZkNodePath(logicalHost); + zkProxy.addEventListener(zkPath, zkListener); + } + } catch (Exception e) { + LOG.error("failed to registerListener:" + e.getMessage()); + throw new IOException(e); + } + listeners.add(listener); + } + + public void unregisterListener(LogicalHostRouterListener listener) { + listeners.remove(listener); + } + + public List getLogicalHosts() throws IOException { + String zkNodePath = conf.zkPath; + try { + return zkProxy.getChildren(zkNodePath); + } catch (Exception e) { + throw new IOException(e); + } + } + + public List getPhysicalHosts(String logicalHost) throws IOException { + String zkNodePath = getZkNodePath(logicalHost); + List results = new ArrayList(); + try { + if (zkProxy.checkExists(zkNodePath)) { + List children = zkProxy.getChildren(zkNodePath); + for (String child : children) { + results.add(getPhysicalHost(child)); + } + } + } catch (Exception e) { + throw new IOException("failed to get physical hosts from " + zkNodePath, e); + } + return results; + } + + private void processPhysicalHostAddedEvent(String logicalHost, PhysicalHost physicalHost) { + for (LogicalHostRouterListener listener : listeners) { + listener.physicalHostAdded(logicalHost, physicalHost); + } + } + + private void processPhysicalHostRemovedEvent(String logicalHost, PhysicalHost physicalHost) { + for (LogicalHostRouterListener listener : listeners) { + listener.physicalHostRemoved(logicalHost, physicalHost); + } + } + + private PhysicalHost getPhysicalHost(String zkNodeName) { + String[] addr = zkNodeName.split(":"); + Preconditions.checkState(addr.length == 2, addr + " is not in host:port format"); + PhysicalHost host = new PhysicalHost(addr[0], Integer.parseInt(addr[1])); + return host; + } + + private String getZkNodeName(PhysicalHost host) { + return host.ip + ":" + host.port; + } + + private String getZkNodePath(String logicalHost) { + return conf.zkPath + "/" + logicalHost; + } + + private String getZkNodePath(String logicalHost, PhysicalHost hostInfo) { + return getZkNodePath(logicalHost) + "/" + getZkNodeName(hostInfo); + } + + /* + * Listen such event that a physical host is added or removed from a logical host + */ + public static interface LogicalHostRouterListener { + void physicalHostAdded(final String logicalHost, final PhysicalHost hostInfo); + + void physicalHostRemoved(final String logicalHost, final PhysicalHost hostInfo); + } + + public static class PhysicalHost { + private String ip; + private int port; + + public PhysicalHost(String ip, int port) { + this.ip = ip; + this.port = port; + } + + public String getIp() { + return ip; + } + + public int getPort() { + return port; + } + + @Override + public String toString() { + return ip + ":" + port; + } + } + + public static class Conf { + private String zkAddress; + private String zkPath; + private int zkRetryIntervalInMs = 1000; + private int zkRetryTimes = 3; + + public Conf setZkAddress(String zkAddress) { + this.zkAddress = zkAddress; + return this; + } + + public Conf setZkPath(String zkPath) { + this.zkPath = zkPath; + return this; + } + + public Conf setRetryTimes(int retryTimes) { + 
this.zkRetryTimes = retryTimes; + return this; + } + + public Conf setRetryInterval(int retryIntervalInMs) { + this.zkRetryIntervalInMs = retryIntervalInMs; + return this; + } + + public static Conf fromRouterPath(String routerPath) { + int index = routerPath.indexOf("/"); + return new Conf().setZkAddress(routerPath.substring(0, + index)).setZkPath(routerPath.substring(index)); + } + } + + private class PhysicalHostUpdateEventListener implements ZkProxy.ChildrenEventListener { + @Override + public void childAddedEvent(String nodePath, byte[] data) { + final Info info = getInfo(nodePath); + if (info != null) { + executors.execute(new Runnable() { + @Override + public void run() { + processPhysicalHostAddedEvent(info.logicalHost, info.physicalHost); + } + }); + } + } + + @Override + public void childRemovedEvent(String nodePath, byte[] data) { + final Info info = getInfo(nodePath); + if (info != null) { + executors.execute(new Runnable() { + @Override + public void run() { + processPhysicalHostRemovedEvent(info.logicalHost, info.physicalHost); + } + }); + } + } + + @Override + public void childUpdatedEvent(String nodePath, byte[] data) { + } + + // extract logicalhost and its physical hosts from zkNodePath + // eg. /path/to/logicalhost/host:port => {logicalhost, (host, port)} + private Info getInfo(String zkNodePath) { + int preIndex = zkNodePath.indexOf(conf.zkPath); + if (preIndex < 0) { + return null; + } + String[] parts = zkNodePath.substring(conf.zkPath.length() + 1).split("/"); + if (parts.length == 2) { + Info info = new Info(); + String[] hostInfo = parts[1].split(":"); + if (hostInfo.length == 2) { + try { + info.logicalHost = parts[0]; + info.physicalHost = new PhysicalHost(hostInfo[0], + Integer.parseInt(hostInfo[1])); + return info; + } catch (NumberFormatException e) { + LOG.error(e.getMessage()); + } + } + } + return null; + } + + private class Info { + String logicalHost; + PhysicalHost physicalHost; + } + } + +} diff --git a/external/flume-sink/src/main/java/org/apache/spark/flume/sink/utils/ZkProxy.java b/external/flume-sink/src/main/java/org/apache/spark/flume/sink/utils/ZkProxy.java new file mode 100644 index 0000000000000..02a7824f12d9f --- /dev/null +++ b/external/flume-sink/src/main/java/org/apache/spark/flume/sink/utils/ZkProxy.java @@ -0,0 +1,267 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
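fromRouterPath above splits on the first '/': the prefix becomes the ZooKeeper connect string and the remainder, slash included, becomes zkPath. For example (address taken from the configuration comment later in this series):

    val conf = LogicalHostRouter.Conf.fromRouterPath("192.168.59.128:2181/spark")
    // zkAddress == "192.168.59.128:2181", zkPath == "/spark"

Note that a router path without any '/' makes indexOf return -1 and substring throw, so the configured path must always carry a node component.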
+ */ + +package org.apache.spark.flume.sink.utils; + +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +import com.google.common.base.Preconditions; +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.CuratorFrameworkFactory; +import org.apache.curator.retry.RetryNTimes; +import org.apache.zookeeper.CreateMode; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.apache.curator.framework.recipes.cache.*; + +public class ZkProxy { + private static final Logger logger = LoggerFactory.getLogger(ZkProxy.class); + private static final byte[] BYTE_NULL = new byte[0]; + // use Conf as key ? + private static final ConcurrentHashMap zkProxies = new ConcurrentHashMap(); + private final Conf conf; + private final ConcurrentHashMap listeners = new ConcurrentHashMap(); + private final AtomicInteger referenceNumber = new AtomicInteger(0); + private CuratorFramework curatorClient; + private AtomicBoolean started = new AtomicBoolean(false); +// private static LoadingCache zkProxys = CacheBuilder.newBuilder() +// .weakValues().removalListener(new RemovalListener() { +// @Override +// public void onRemoval(RemovalNotification objectObjectRemovalNotification) { +// objectObjectRemovalNotification.getValue().stop(); +// } +// }).build(new CacheLoader() { +// @Override +// public ZkProxy load(Conf zkConf) throws Exception { +// return new ZkProxy(zkConf); +// } +// }); + + //initialize assignments cache + public static class Conf { + public Conf(String zkAddress) { + this(zkAddress, 3, 1000); + } + + public Conf(String zkAddress, int retryTimes, int retryIntervalInMs) { + this.address = zkAddress; + this.retryTimes = retryTimes; + this.retryIntervalInMs = retryIntervalInMs; + } + + private String address; + private int retryTimes; + private int retryIntervalInMs; + } + + public enum ZkNodeMode { + PERSISTENT, + EPHEMERAL, + } + + public interface ChildrenEventListener { + public void childAddedEvent(String nodePath, byte[] data); + public void childRemovedEvent(String nodePath, byte[] data); + public void childUpdatedEvent(String nodePath, byte[] data); + } + + public class PathChildrenEventDispatch implements PathChildrenCacheListener { + private final PathChildrenCache pathCache; + private final List listeners = new CopyOnWriteArrayList(); + private AtomicBoolean started = new AtomicBoolean(false); + + public PathChildrenEventDispatch(String zkNodePath) { + pathCache = new PathChildrenCache(curatorClient, zkNodePath, false); + pathCache.getListenable().addListener(this); + } + + public void addListener(ChildrenEventListener listener) { + logger.info("pathCache add listener "); + this.listeners.add(listener); + } + + public void start() throws Exception { + if (started.compareAndSet(false, true)) { + pathCache.start(); + logger.info("pathCache started"); + } + } + + public void close() { + if (started.compareAndSet(true, false)) { + try { + pathCache.close(); + } catch (Exception e) { + logger.error(e.getMessage()); + } + } + } + + @Override + public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception { + String path = event.getData().getPath(); + byte[] bytes = event.getData().getData(); + logger.debug("receive event " + path + " type:" + event.getType()); + switch (event.getType()) { + case CHILD_ADDED: { + for 
(ChildrenEventListener listener : listeners) { + listener.childAddedEvent(path, bytes); + } + break; + } + case CHILD_UPDATED: { + for (ChildrenEventListener listener : listeners) { + listener.childUpdatedEvent(path, bytes); + } + break; + } + case CHILD_REMOVED: { + for (ChildrenEventListener listener : listeners) { + listener.childRemovedEvent(path, bytes); + } + break; + } + default: + break; + } + } + } + + private ZkProxy(Conf conf) { + this.conf = conf; + logger.info("new zkProxy " + conf.address); + } + + public static ZkProxy get(Conf conf) { + if (!zkProxies.containsKey(conf.address)) { + zkProxies.putIfAbsent(conf.address, new ZkProxy(conf)); + } + return zkProxies.get(conf.address); + } + + public synchronized void start() { + logger.info("ZkProxy starting ..."); + if (referenceNumber.getAndIncrement() == 0) { + curatorClient = CuratorFrameworkFactory.newClient(conf.address, + new RetryNTimes(conf.retryTimes, conf.retryIntervalInMs)); + curatorClient.start(); + started.compareAndSet(false, true); + logger.info("ZkProxy started success!"); + } + } + + public boolean isStarted() { + return started.get(); + } + + public synchronized void stop() { + logger.info("ZkProxy Stop called..."); + if (referenceNumber.decrementAndGet() == 0) { + started.compareAndSet(true, false); + for (Map.Entry entry : listeners.entrySet()) { + entry.getValue().close(); + } + listeners.clear(); + curatorClient.close(); + logger.info("ZkProxy is Stoped!"); + } + } + + public void create(String zkNodePath, byte[] value, ZkNodeMode mode, + boolean creatingParentsIfNeeded) throws Exception { + Preconditions.checkState(started.get()); + if (curatorClient.checkExists().forPath(zkNodePath) != null) { + logger.error(zkNodePath + " was already exists!"); + throw new Exception(zkNodePath + " was already exists!"); + } + + // If value = null, Curator will set zkNode a default value: IP addr. + // But it's not expected, so we set value to byte[0] if value is null + byte[] bytes = value == null ? BYTE_NULL : value; + if (creatingParentsIfNeeded) { + curatorClient.create().creatingParentsIfNeeded().withMode(getZkCreateMode(mode)).forPath(zkNodePath, bytes); + } else { + curatorClient.create().withMode(getZkCreateMode(mode)).forPath(zkNodePath, bytes); + } + } + + public boolean checkExists(String zkNodePath) throws Exception { + Preconditions.checkState(started.get()); + return curatorClient.checkExists().forPath(zkNodePath) != null; + } + + // Delete a node and return its value + // Notice: The deleted zkNodePath will not be listened any more. + // if you want listen zkNodePath after deleted, you need call addEventListener() again. 
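ZkProxy.get above hands out one shared instance per connect string, and start/stop are reference counted: the Curator client is created by the first start() and closed only when the count returns to zero. A lifecycle sketch with hypothetical values:

    val proxy = ZkProxy.get(new ZkProxy.Conf("localhost:2181", 3, 1000))
    proxy.start()                  // first caller builds the Curator client
    try {
      proxy.create("/spark/receivers/127.0.0.1:4545", null,
        ZkProxy.ZkNodeMode.EPHEMERAL, true)   // true: create parents if needed
    } finally {
      proxy.stop()                 // last balancing stop() closes the client
    }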
+ public byte[] delete(String zkNodePath) throws Exception { + Preconditions.checkState(started.get()); + byte[] value = BYTE_NULL; + if (curatorClient.checkExists().forPath(zkNodePath) != null) { + try { + value = get(zkNodePath); + } catch (Exception e) { + logger.warn("get({}) in delete() Exception {}", zkNodePath, e); + } + curatorClient.delete().guaranteed().forPath(zkNodePath); + } else { + logger.warn("Failed to remove {}, path does not exist.", zkNodePath); + } + return value; + } + + public byte[] get(String zkNodePath) throws Exception { + Preconditions.checkState(started.get()); + return curatorClient.getData().forPath(zkNodePath); + } + + public List getChildren(String zkNodePath) throws Exception { + Preconditions.checkState(started.get()); + return curatorClient.getChildren().forPath(zkNodePath); + } + + public void set(String zkNodePath, byte[] value) throws Exception { + Preconditions.checkState(started.get()); + curatorClient.setData().forPath(zkNodePath, value); + } + + // Add a listener to a zkNode, which will Keep watching the change of the zkNode. + // When the zkNodePath is deleted, the listener will be invalid. + public synchronized void addEventListener(String zkNodePath, final ChildrenEventListener eventListener) { + try { + logger.info("addEventListener on path " + zkNodePath); + PathChildrenEventDispatch dispatch = listeners.get(zkNodePath); + if (dispatch == null) { + dispatch = new PathChildrenEventDispatch(zkNodePath); + dispatch.start(); + listeners.put(zkNodePath, dispatch); + } + dispatch.addListener(eventListener); + } catch (Exception e) { + logger.error("checkExists {} Exception.{}" + zkNodePath, e); + } + } + + private CreateMode getZkCreateMode(ZkNodeMode mode) { + return mode == ZkNodeMode.EPHEMERAL ? CreateMode.EPHEMERAL : CreateMode.PERSISTENT; + } +} + diff --git a/external/flume-sink/src/test/java/org/apache/spark/flume/sink/utils/TestLogicalHostRouter.java b/external/flume-sink/src/test/java/org/apache/spark/flume/sink/utils/TestLogicalHostRouter.java new file mode 100644 index 0000000000000..25bbbf9a5ddd6 --- /dev/null +++ b/external/flume-sink/src/test/java/org/apache/spark/flume/sink/utils/TestLogicalHostRouter.java @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
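addEventListener below keeps a single PathChildrenEventDispatch per watched path and fans every Curator child event out to all registered listeners. A watching sketch (hypothetical path, continuing the proxy from the previous sketch):

    proxy.addEventListener("/spark/receivers", new ZkProxy.ChildrenEventListener {
      override def childAddedEvent(path: String, data: Array[Byte]): Unit =
        println("added: " + path)      // path is the full zk path of the child
      override def childRemovedEvent(path: String, data: Array[Byte]): Unit =
        println("removed: " + path)
      override def childUpdatedEvent(path: String, data: Array[Byte]): Unit = ()
    })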
+ */ + +package org.apache.spark.flume.sink.utils; + +import com.google.common.io.Files; +import org.apache.curator.test.TestingServer; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.io.File; +import java.io.IOException; +import java.util.List; + +public class TestLogicalHostRouter { + private TestingServer zkServer; + private final int port = 2284; + private final String zkPath = "/router"; + + @Before + public void setUp() throws Exception { + File tempDir = Files.createTempDir(); + zkServer = new TestingServer(port, tempDir); + } + + @After + public void tearDown() throws Exception { + zkServer.stop(); + } + + private LogicalHostRouter getRouter() { + String zkAddr = zkServer.getConnectString(); + LogicalHostRouter.Conf conf = new LogicalHostRouter.Conf().setZkAddress(zkAddr) + .setZkPath(zkPath); + return new LogicalHostRouter(conf); + } + + private ZkProxy getZkProxy() { + return ZkProxy.get(new ZkProxy.Conf(zkServer.getConnectString(), 1, 1000)); + } + + @Test + public void testStartStop() throws IOException { + LogicalHostRouter router = getRouter(); + router.start(); + router.stop(); + } + + @Test + public void testGetLogicalHosts() throws Exception { + ZkProxy zkProxy = getZkProxy(); + try { + zkProxy.start(); + zkProxy.create(zkPath + "/A/phy1:2012", null, ZkProxy.ZkNodeMode.EPHEMERAL, true); + zkProxy.create(zkPath + "/B/phy1:2012", null, ZkProxy.ZkNodeMode.EPHEMERAL, true); + } finally { + zkProxy.stop(); + } + LogicalHostRouter router = getRouter(); + try { + router.start(); + List logicalHosts = router.getLogicalHosts(); + Assert.assertEquals("A", logicalHosts.get(0)); + Assert.assertEquals("B", logicalHosts.get(1)); + } finally { + router.stop(); + } + } + + @Test + public void testGetPhysicalHosts() throws Exception { + ZkProxy zkProxy = getZkProxy(); + LogicalHostRouter router = getRouter(); + try { + zkProxy.start(); + zkProxy.create(zkPath + "/A/phy1:2012", null, ZkProxy.ZkNodeMode.EPHEMERAL, true); + router.start(); + List hosts = router.getPhysicalHosts("A"); + Assert.assertEquals(1, hosts.size()); + Assert.assertEquals("phy1", hosts.get(0).getIp()); + Assert.assertEquals(2012, hosts.get(0).getPort()); + } finally { + zkProxy.stop(); + router.stop(); + } + } + + @Test + public void testRegisterPhysicalHost() throws IOException { + LogicalHostRouter router = getRouter(); + router.start(); + try { + String logicalHost = "spark"; + List hosts = router.getPhysicalHosts(logicalHost); + Assert.assertEquals(0, hosts.size()); + router.registerPhysicalHost(logicalHost, new LogicalHostRouter.PhysicalHost("127.0.0.1", 80)); + hosts = router.getPhysicalHosts(logicalHost); + Assert.assertEquals(1, hosts.size()); + Assert.assertEquals("127.0.0.1", hosts.get(0).getIp()); + Assert.assertEquals(80, hosts.get(0).getPort()); + } finally { + router.stop(); + } + } +} diff --git a/external/flume-sink/src/test/java/org/apache/spark/flume/sink/utils/TestZkProxy.java b/external/flume-sink/src/test/java/org/apache/spark/flume/sink/utils/TestZkProxy.java new file mode 100644 index 0000000000000..06ce121ea5368 --- /dev/null +++ b/external/flume-sink/src/test/java/org/apache/spark/flume/sink/utils/TestZkProxy.java @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.flume.sink.utils; + +import com.google.common.io.Files; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.apache.curator.test.TestingServer; + +import java.io.File; +import java.io.Serializable; +import java.util.List; + +public class TestZkProxy implements Serializable { + private TestingServer server; + private int port = 2283; + + @Before + public void setUp() throws Exception { + File file = Files.createTempDir(); + System.err.println("tempPath:" + file.getAbsolutePath()); + server = new TestingServer(port, file); + } + + private ZkProxy getZkProxy() { + return ZkProxy.get(new ZkProxy.Conf(server.getConnectString(), 1, 1000)); + } + + @After + public void tearDown() throws Exception { + server.stop(); + } + + @Test + public void testState() { + ZkProxy proxy = getZkProxy(); + Assert.assertEquals(false, proxy.isStarted()); + proxy.start(); + Assert.assertEquals(true, proxy.isStarted()); + proxy.stop(); + Assert.assertEquals(false, proxy.isStarted()); + } + + @Test + public void testCreateNormal() throws Exception { + ZkProxy proxy = getZkProxy(); + proxy.start(); + String path = "/proxy"; + proxy.create(path, null, ZkProxy.ZkNodeMode.EPHEMERAL, true); + Assert.assertTrue(proxy.checkExists(path)); + proxy.stop(); + } + + @Test + public void testCreateWithValue() throws Exception { + ZkProxy proxy = getZkProxy(); + proxy.start(); + String path = "/path"; + String value = "my value"; + proxy.create(path, value.getBytes(), ZkProxy.ZkNodeMode.PERSISTENT, true); + String realValue = new String(proxy.get(path)); + Assert.assertEquals(value, realValue); + proxy.stop(); + } + + @Test + public void testCreateCascade() throws Exception { + ZkProxy proxy = getZkProxy(); + proxy.start(); + String path = "/a/b/c"; + String value = "my value"; + boolean occureException = false; + try { + proxy.create(path, value.getBytes(), ZkProxy.ZkNodeMode.PERSISTENT, false); + } catch (Exception e) { + occureException = true; + } + Assert.assertTrue(occureException); + + proxy.create(path, value.getBytes(), ZkProxy.ZkNodeMode.PERSISTENT, true); + Assert.assertTrue(proxy.checkExists(path)); + proxy.stop(); + } + + @Test + public void testSet() throws Exception { + ZkProxy proxy = getZkProxy(); + proxy.start(); + String path = "/p"; + String value = "value1"; + proxy.create(path, value.getBytes(), ZkProxy.ZkNodeMode.PERSISTENT, true); + Assert.assertEquals(value, new String(proxy.get(path))); + value = "value2"; + proxy.set(path, value.getBytes()); + Assert.assertEquals(value, new String(proxy.get(path))); + proxy.stop(); + } + + @Test + public void testDelete() throws Exception { + ZkProxy proxy = getZkProxy(); + proxy.start(); + String path = "/proxy"; + proxy.create(path, null, ZkProxy.ZkNodeMode.EPHEMERAL, true); + proxy.delete(path); + Assert.assertTrue(!proxy.checkExists(path)); + proxy.stop(); + } + + @Test + public void testGetChildren() 
throws Exception { + ZkProxy proxy = getZkProxy(); + proxy.start(); + proxy.create("/root/child1", null, ZkProxy.ZkNodeMode.PERSISTENT, true); + proxy.create("/root/child2", null, ZkProxy.ZkNodeMode.PERSISTENT, false); + List children = proxy.getChildren("/root"); + Assert.assertTrue(children.size() == 2); + Assert.assertTrue(children.contains("child1")); + Assert.assertTrue(children.contains("child2")); + proxy.stop(); + } +} \ No newline at end of file From 1de7f6eb27a48246d990d581df41b053f5d13d7a Mon Sep 17 00:00:00 2001 From: joyyoj Date: Sun, 3 Aug 2014 23:30:54 +0800 Subject: [PATCH 3/3] update flume-sink --- external/flume-sink/pom.xml | 36 ++++++ .../flume/sink/SparkPushSink.java | 2 +- .../flume/sink/SparkRpcClient.java | 15 ++- .../flume/sink/utils/LogicalHostRouter.java | 4 +- .../flume/sink/utils/ZkProxy.java | 61 ++++----- .../sink/utils/TestLogicalHostRouter.java | 2 +- .../flume/sink/utils/TestZkProxy.java | 2 +- .../streaming/flume/FlumeInputDStream.scala | 121 ++++++++++++++---- .../spark/streaming/flume/FlumeUtils.scala | 25 +++- 9 files changed, 194 insertions(+), 74 deletions(-) rename external/flume-sink/src/main/java/org/apache/spark/{ => streaming}/flume/sink/SparkPushSink.java (96%) rename external/flume-sink/src/main/java/org/apache/spark/{ => streaming}/flume/sink/SparkRpcClient.java (97%) rename external/flume-sink/src/main/java/org/apache/spark/{ => streaming}/flume/sink/utils/LogicalHostRouter.java (98%) rename external/flume-sink/src/main/java/org/apache/spark/{ => streaming}/flume/sink/utils/ZkProxy.java (84%) rename external/flume-sink/src/test/java/org/apache/spark/{ => streaming}/flume/sink/utils/TestLogicalHostRouter.java (98%) rename external/flume-sink/src/test/java/org/apache/spark/{ => streaming}/flume/sink/utils/TestZkProxy.java (98%) diff --git a/external/flume-sink/pom.xml b/external/flume-sink/pom.xml index d11129ce8d89d..92940b487b364 100644 --- a/external/flume-sink/pom.xml +++ b/external/flume-sink/pom.xml @@ -64,6 +64,42 @@ + + org.apache.curator + curator-client + 2.4.0 + + + org.apache.curator + curator-framework + 2.4.0 + + + org.apache.curator + curator-recipes + 2.4.0 + + + org.apache.curator + curator-test + 2.4.0 + test + + + org.apache.zookeeper + zookeeper + 3.4.5 + + + io.netty + netty + + + org.jboss.netty + netty + + + org.scala-lang scala-library diff --git a/external/flume-sink/src/main/java/org/apache/spark/flume/sink/SparkPushSink.java b/external/flume-sink/src/main/java/org/apache/spark/streaming/flume/sink/SparkPushSink.java similarity index 96% rename from external/flume-sink/src/main/java/org/apache/spark/flume/sink/SparkPushSink.java rename to external/flume-sink/src/main/java/org/apache/spark/streaming/flume/sink/SparkPushSink.java index c826b49f56fb0..a057d3c152ba6 100644 --- a/external/flume-sink/src/main/java/org/apache/spark/flume/sink/SparkPushSink.java +++ b/external/flume-sink/src/main/java/org/apache/spark/streaming/flume/sink/SparkPushSink.java @@ -15,7 +15,7 @@ * limitations under the License. 
*/ -package org.apache.spark.flume.sink; +package org.apache.spark.streaming.flume.sink; import org.apache.flume.api.*; import org.apache.flume.sink.AbstractRpcSink; diff --git a/external/flume-sink/src/main/java/org/apache/spark/flume/sink/SparkRpcClient.java b/external/flume-sink/src/main/java/org/apache/spark/streaming/flume/sink/SparkRpcClient.java similarity index 97% rename from external/flume-sink/src/main/java/org/apache/spark/flume/sink/SparkRpcClient.java rename to external/flume-sink/src/main/java/org/apache/spark/streaming/flume/sink/SparkRpcClient.java index cd4e5ac57246b..7108feb2bb989 100644 --- a/external/flume-sink/src/main/java/org/apache/spark/flume/sink/SparkRpcClient.java +++ b/external/flume-sink/src/main/java/org/apache/spark/streaming/flume/sink/SparkRpcClient.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.spark.flume.sink; +package org.apache.spark.streaming.flume.sink; import com.google.common.base.Preconditions; import com.google.common.base.Strings; @@ -23,7 +23,7 @@ import org.apache.flume.EventDeliveryException; import org.apache.flume.FlumeException; import org.apache.flume.api.*; -import org.apache.spark.flume.sink.utils.LogicalHostRouter; +import org.apache.spark.streaming.flume.sink.utils.LogicalHostRouter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -35,6 +35,14 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicInteger; +/* + * configuration example: + * agent.sinks.ls1.hostname = benchmark + * agent.sinks.ls1.router.path=192.168.59.128:2181/spark [zookeeper path to logical host] + * agent.sinks.ls1.port = 0 + * agent.sinks.ls1.router.retry.times=1 [optional] + * agent.sinks.ls1.router.retry.interval=1000 [optional] + */ public class SparkRpcClient extends AbstractRpcClient implements RpcClient { private static final Logger logger = LoggerFactory.getLogger(SparkRpcClient.class); private static final String HOSTNAME_KEY = "hostname"; @@ -82,8 +90,6 @@ public synchronized RpcClient getClient() { props.put(RpcClientConfigurationConstants.CONFIG_CLIENT_TYPE, RpcClientConfigurationConstants.DEFAULT_CLIENT_TYPE); client = RpcClientFactory.getInstance(props); -// client = new NettyAvroRpcClient(); -// ((NettyAvroRpcClient) client).configure(props); isConnected = true; logger.debug("create new RpcClient:" + hostInfo.getHostName() + ":" + hostInfo.getPortNumber()); } @@ -337,7 +343,6 @@ public synchronized void close() throws FlumeException { clientPool.close(); this.router.unregisterListener(clientPool); this.router.stop(); - this.router = null; } @Override diff --git a/external/flume-sink/src/main/java/org/apache/spark/flume/sink/utils/LogicalHostRouter.java b/external/flume-sink/src/main/java/org/apache/spark/streaming/flume/sink/utils/LogicalHostRouter.java similarity index 98% rename from external/flume-sink/src/main/java/org/apache/spark/flume/sink/utils/LogicalHostRouter.java rename to external/flume-sink/src/main/java/org/apache/spark/streaming/flume/sink/utils/LogicalHostRouter.java index 60d3fd9167af8..437d1a4f65a6e 100644 --- a/external/flume-sink/src/main/java/org/apache/spark/flume/sink/utils/LogicalHostRouter.java +++ b/external/flume-sink/src/main/java/org/apache/spark/streaming/flume/sink/utils/LogicalHostRouter.java @@ -15,7 +15,7 @@ * limitations under the License. 
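Expanding the configuration comment above into a complete agent section (the type and channel lines are assumptions following Flume's usual convention of naming a custom sink by its fully qualified class name; the router keys come from the example itself):

    agent.sinks = ls1
    agent.sinks.ls1.type = org.apache.spark.streaming.flume.sink.SparkPushSink
    agent.sinks.ls1.channel = memoryChannel
    # logical host name that the sink resolves through ZooKeeper
    agent.sinks.ls1.hostname = benchmark
    agent.sinks.ls1.port = 0
    # zookeeperAddress/path holding the logical-to-physical mapping
    agent.sinks.ls1.router.path = 192.168.59.128:2181/spark
    # optional retry tuning
    agent.sinks.ls1.router.retry.times = 1
    agent.sinks.ls1.router.retry.interval = 1000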
*/ -package org.apache.spark.flume.sink.utils; +package org.apache.spark.streaming.flume.sink.utils; import com.google.common.base.Preconditions; import org.slf4j.Logger; @@ -33,7 +33,7 @@ * LogicalHostRouter supplies a map between logical host and physical host. * A physical host denotes a real socket address, which consists of one host and port. host:port * A logical host consists of several physical hosts. [host1:port1, host2:port2, ...] - * user hold a logical host instead of a actual host:port. + * user config a logical host instead of a physical host:port. */ public final class LogicalHostRouter { private static final Logger LOG = LoggerFactory.getLogger(LogicalHostRouter.class); diff --git a/external/flume-sink/src/main/java/org/apache/spark/flume/sink/utils/ZkProxy.java b/external/flume-sink/src/main/java/org/apache/spark/streaming/flume/sink/utils/ZkProxy.java similarity index 84% rename from external/flume-sink/src/main/java/org/apache/spark/flume/sink/utils/ZkProxy.java rename to external/flume-sink/src/main/java/org/apache/spark/streaming/flume/sink/utils/ZkProxy.java index 02a7824f12d9f..743681926511b 100644 --- a/external/flume-sink/src/main/java/org/apache/spark/flume/sink/utils/ZkProxy.java +++ b/external/flume-sink/src/main/java/org/apache/spark/streaming/flume/sink/utils/ZkProxy.java @@ -15,8 +15,9 @@ * limitations under the License. */ -package org.apache.spark.flume.sink.utils; +package org.apache.spark.streaming.flume.sink.utils; +import java.io.IOException; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @@ -38,30 +39,14 @@ public class ZkProxy { private static final byte[] BYTE_NULL = new byte[0]; // use Conf as key ? private static final ConcurrentHashMap zkProxies = new ConcurrentHashMap(); - private final Conf conf; private final ConcurrentHashMap listeners = new ConcurrentHashMap(); private final AtomicInteger referenceNumber = new AtomicInteger(0); + private final Conf conf; private CuratorFramework curatorClient; private AtomicBoolean started = new AtomicBoolean(false); -// private static LoadingCache zkProxys = CacheBuilder.newBuilder() -// .weakValues().removalListener(new RemovalListener() { -// @Override -// public void onRemoval(RemovalNotification objectObjectRemovalNotification) { -// objectObjectRemovalNotification.getValue().stop(); -// } -// }).build(new CacheLoader() { -// @Override -// public ZkProxy load(Conf zkConf) throws Exception { -// return new ZkProxy(zkConf); -// } -// }); //initialize assignments cache public static class Conf { - public Conf(String zkAddress) { - this(zkAddress, 3, 1000); - } - public Conf(String zkAddress, int retryTimes, int retryIntervalInMs) { this.address = zkAddress; this.retryTimes = retryTimes; @@ -101,7 +86,7 @@ public void addListener(ChildrenEventListener listener) { public void start() throws Exception { if (started.compareAndSet(false, true)) { - pathCache.start(); + pathCache.start(); // PathChildrenCache.StartMode.BUILD_INITIAL_CACHE); logger.info("pathCache started"); } } @@ -110,7 +95,7 @@ public void close() { if (started.compareAndSet(true, false)) { try { pathCache.close(); - } catch (Exception e) { + } catch (IOException e) { logger.error(e.getMessage()); } } @@ -158,31 +143,35 @@ public static ZkProxy get(Conf conf) { return zkProxies.get(conf.address); } - public synchronized void start() { + public void start() { logger.info("ZkProxy starting ..."); - if (referenceNumber.getAndIncrement() == 0) { - curatorClient = 
CuratorFrameworkFactory.newClient(conf.address, - new RetryNTimes(conf.retryTimes, conf.retryIntervalInMs)); - curatorClient.start(); - started.compareAndSet(false, true); - logger.info("ZkProxy started success!"); + synchronized (started) { + if (referenceNumber.getAndIncrement() == 0) { + curatorClient = CuratorFrameworkFactory.newClient(conf.address, + new RetryNTimes(conf.retryTimes, conf.retryIntervalInMs)); + curatorClient.start(); + started.set(true); + } } + logger.info("ZkProxy started success!"); } public boolean isStarted() { return started.get(); } - public synchronized void stop() { - logger.info("ZkProxy Stop called..."); - if (referenceNumber.decrementAndGet() == 0) { - started.compareAndSet(true, false); - for (Map.Entry entry : listeners.entrySet()) { - entry.getValue().close(); + public void stop() { + logger.info("ZkProxy stopping ..."); + synchronized (started) { + if (referenceNumber.decrementAndGet() == 0) { + started.set(false); + for (Map.Entry entry : listeners.entrySet()) { + entry.getValue().close(); + } + listeners.clear(); + curatorClient.close(); + logger.info("ZkProxy has stopped!"); } - listeners.clear(); - curatorClient.close(); - logger.info("ZkProxy is Stoped!"); } } diff --git a/external/flume-sink/src/test/java/org/apache/spark/flume/sink/utils/TestLogicalHostRouter.java b/external/flume-sink/src/test/java/org/apache/spark/streaming/flume/sink/utils/TestLogicalHostRouter.java similarity index 98% rename from external/flume-sink/src/test/java/org/apache/spark/flume/sink/utils/TestLogicalHostRouter.java rename to external/flume-sink/src/test/java/org/apache/spark/streaming/flume/sink/utils/TestLogicalHostRouter.java index 25bbbf9a5ddd6..239b5c593b15b 100644 --- a/external/flume-sink/src/test/java/org/apache/spark/flume/sink/utils/TestLogicalHostRouter.java +++ b/external/flume-sink/src/test/java/org/apache/spark/streaming/flume/sink/utils/TestLogicalHostRouter.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.spark.flume.sink.utils; +package org.apache.spark.streaming.flume.sink.utils; import com.google.common.io.Files; import org.apache.curator.test.TestingServer; diff --git a/external/flume-sink/src/test/java/org/apache/spark/flume/sink/utils/TestZkProxy.java b/external/flume-sink/src/test/java/org/apache/spark/streaming/flume/sink/utils/TestZkProxy.java similarity index 98% rename from external/flume-sink/src/test/java/org/apache/spark/flume/sink/utils/TestZkProxy.java rename to external/flume-sink/src/test/java/org/apache/spark/streaming/flume/sink/utils/TestZkProxy.java index 06ce121ea5368..0cdefc856600d 100644 --- a/external/flume-sink/src/test/java/org/apache/spark/flume/sink/utils/TestZkProxy.java +++ b/external/flume-sink/src/test/java/org/apache/spark/streaming/flume/sink/utils/TestZkProxy.java @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ -package org.apache.spark.flume.sink.utils; +package org.apache.spark.streaming.flume.sink.utils; import com.google.common.io.Files; import org.junit.After; import org.junit.Assert; import org.junit.Before; import org.junit.Test; import org.apache.curator.test.TestingServer; diff --git a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala index 4b2ea45fb81d0..202de964ec5ae 100644 --- a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala +++ b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala @@ -17,11 +17,14 @@ package org.apache.spark.streaming.flume -import java.net.InetSocketAddress +import java.net.{ServerSocket, InetSocketAddress} import java.io.{ObjectInput, ObjectOutput, Externalizable} import java.nio.ByteBuffer import java.util.concurrent.Executors +import org.apache.spark.streaming.flume.sink.utils.LogicalHostRouter +import org.apache.spark.streaming.flume.sink.utils.LogicalHostRouter.PhysicalHost + import scala.collection.JavaConversions._ import scala.reflect.ClassTag @@ -30,7 +33,7 @@ import org.apache.flume.source.avro.AvroFlumeEvent import org.apache.flume.source.avro.Status import org.apache.avro.ipc.specific.SpecificResponder import org.apache.avro.ipc.NettyServer -import org.apache.spark.Logging +import org.apache.spark.{SparkConf, Logging} import org.apache.spark.util.Utils import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.dstream._ @@ -51,8 +54,17 @@ class FlumeInputDStream[T: ClassTag]( enableDecompression: Boolean ) extends ReceiverInputDStream[SparkFlumeEvent](ssc_) { + def this(@transient ssc : StreamingContext, receiverPath: String, + storageLevel : StorageLevel, enableDecompression : Boolean) = { + this(ssc, receiverPath, -1, storageLevel, enableDecompression) + } + override def getReceiver(): Receiver[SparkFlumeEvent] = { - new FlumeReceiver(host, port, storageLevel, enableDecompression) + if (port != -1) { + new FlumeReceiver(host, port, storageLevel, enableDecompression) + } else { + new DynamicFlumeReceiver(host, storageLevel, enableDecompression) + } } } @@ -121,7 +133,7 @@ private[streaming] object SparkFlumeEvent { /** A simple server that implements Flume's Avro protocol.
@@ -121,7 +133,7 @@ private[streaming] object SparkFlumeEvent {
 
 /** A simple server that implements Flume's Avro protocol.
  */
 private[streaming]
-class FlumeEventServer(receiver : FlumeReceiver) extends AvroSourceProtocol {
+class FlumeEventServer(receiver : Receiver[SparkFlumeEvent]) extends AvroSourceProtocol {
   override def append(event : AvroFlumeEvent) : Status = {
     receiver.store(SparkFlumeEvent.fromAvroFlumeEvent(event))
     Status.OK
@@ -148,30 +160,13 @@ class FlumeReceiver(
     classOf[AvroSourceProtocol], new FlumeEventServer(this))
   var server: NettyServer = null
 
-  private def initServer() = {
-    if (enableDecompression) {
-      val channelFactory = new NioServerSocketChannelFactory(Executors.newCachedThreadPool(),
-        Executors.newCachedThreadPool())
-      val channelPipelineFactory = new CompressionChannelPipelineFactory()
-
-      new NettyServer(
-        responder,
-        new InetSocketAddress(host, port),
-        channelFactory,
-        channelPipelineFactory,
-        null)
-    } else {
-      new NettyServer(responder, new InetSocketAddress(host, port))
-    }
-  }
-
   def onStart() {
     synchronized {
       if (server == null) {
-        server = initServer()
+        server = FlumeReceiver.initServer(responder, host, port, enableDecompression)
         server.start()
       } else {
-        logWarning("Flume receiver being asked to start more then once with out close")
+        logWarning("Flume receiver being asked to start more than once without close")
       }
     }
     logInfo("Flume receiver started")
@@ -188,13 +183,34 @@ class FlumeReceiver(
   }
 
   override def preferredLocation = Some(host)
-
-  /** A Netty Pipeline factory that will decompress incoming data from
+}
+
+private[streaming]
+object FlumeReceiver {
+  def initServer(responder : SpecificResponder, host : String, port : Int,
+      enableDecompression : Boolean) : NettyServer = {
+    if (enableDecompression) {
+      val channelFactory = new NioServerSocketChannelFactory(Executors.newCachedThreadPool(),
+        Executors.newCachedThreadPool())
+      val channelPipelineFactory = new CompressionChannelPipelineFactory()
+
+      new NettyServer(
+        responder,
+        new InetSocketAddress(host, port),
+        channelFactory,
+        channelPipelineFactory,
+        null)
+    } else {
+      new NettyServer(responder, new InetSocketAddress(host, port))
+    }
+  }
+
+  /** A Netty Pipeline factory that will decompress incoming data from
    * and the Netty client and compress data going back to the client.
    *
    * The compression on the return is required because Flume requires
-   * a successful response to indicate it can remove the event/batch
-   * from the configured channel
+   * a successful response to indicate it can remove the event/batch
+   * from the configured channel
    */
   private[streaming]
   class CompressionChannelPipelineFactory extends ChannelPipelineFactory {
@@ -205,6 +221,57 @@ class FlumeReceiver(
       pipeline.addFirst("deflater", encoder)
       pipeline.addFirst("inflater", new ZlibDecoder())
       pipeline
+    }
   }
 }
+
+/** A NetworkReceiver which listens for events using the Flume Avro interface.
+ * @param address zookeeperAddress/path/to/logicalhost
+ */
+private[streaming]
+class DynamicFlumeReceiver(
+    address : String,
+    storageLevel: StorageLevel,
+    enableDecompression : Boolean
+  ) extends Receiver[SparkFlumeEvent](storageLevel) with Logging {
+  val logicalHost = address.substring(address.lastIndexOf("/") + 1)
+  val routerPath = address.substring(0, address.lastIndexOf("/"))
+  lazy val routerConf = LogicalHostRouter.Conf.fromRouterPath(routerPath)
+  lazy val hostRouter = new LogicalHostRouter(routerConf)
+  lazy val responder = new SpecificResponder(
+    classOf[AvroSourceProtocol], new FlumeEventServer(this))
+  lazy val hostName = Utils.localIpAddress // InetAddress.getLocalHost.getHostAddress
+  lazy val hostPort = selectFreePort
+  var server : NettyServer = null
+
+  def onStart() {
+    synchronized {
+      if (server == null) {
+        server = FlumeReceiver.initServer(responder, hostName, hostPort, enableDecompression)
+        server.start()
+        hostRouter.start()
+        hostRouter.registerPhysicalHost(logicalHost, getPhysicalHost)
+      } else {
+        logWarning("Flume receiver being asked to start more than once without close")
+      }
+    }
+    logInfo("Flume receiver started")
+  }
+
+  private def getPhysicalHost = new PhysicalHost(hostName, hostPort)
+
+  private def selectFreePort : Int = {
+    val serverSocket : ServerSocket = new ServerSocket(0)
+    val port = serverSocket.getLocalPort()
+    serverSocket.close()
+    logInfo("Selected a free port: " + port)
+    port
+  }
+
+  def onStop() {
+    hostRouter.unregisterPhysicalHost(logicalHost, getPhysicalHost)
+    hostRouter.stop()
+    server.close()
+    logInfo("Flume receiver stopped")
+  }
+
+  override def preferredLocation = None
+}
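At startup, DynamicFlumeReceiver binds a NettyServer to a locally chosen free port and then advertises that host:port pair under its logical host in ZooKeeper, so the sink side can discover it. Note that selectFreePort closes the probe socket before the NettyServer rebinds the port, so there is a small window in which another process could grab it. A sketch of that probe (standard java.net usage, not patch-specific code):

    import java.net.ServerSocket

    // Ask the OS for an ephemeral port, remember it, and release the socket.
    // The port is only *probably* still free when the real server binds it.
    def probeFreePort(): Int = {
      val socket = new ServerSocket(0)   // 0 = let the OS pick any free port
      try socket.getLocalPort
      finally socket.close()
    }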
diff --git a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala
index 4b732c1592ab2..3dbdb698dd178 100644
--- a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala
+++ b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala
@@ -23,7 +23,7 @@ import org.apache.spark.annotation.Experimental
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.streaming.StreamingContext
 import org.apache.spark.streaming.api.java.{JavaReceiverInputDStream, JavaStreamingContext}
-import org.apache.spark.streaming.dstream.ReceiverInputDStream
+import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
 
 object FlumeUtils {
 
@@ -243,4 +243,27 @@ object FlumeUtils {
   ): JavaReceiverInputDStream[SparkFlumeEvent] = {
     createPollingStream(jssc.ssc, addresses, storageLevel, maxBatchSize, parallelism)
   }
+
+  /**
+   * Creates multiple receiver input streams from a Flume source and unions them
+   * into a single DStream.
+   * @param receiverPath A ZooKeeper path under which the receivers register,
+   *                     format: zkAddress/path/to/receivers,
+   *                     e.g. localhost:2181/spark/receivers
+   * @param numReceivers The number of receivers to be started
+   * @param storageLevel Storage level to use for storing the received objects
+   * @param enableDecompression Whether the underlying Netty servers should decompress input
+   */
+  def createMultiStream(
+      ssc: StreamingContext,
+      receiverPath: String,
+      numReceivers : Int,
+      storageLevel : StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2,
+      enableDecompression : Boolean = false
+    ): DStream[SparkFlumeEvent] = {
+    var inputStream : DStream[SparkFlumeEvent] = new FlumeInputDStream[SparkFlumeEvent](ssc,
+      receiverPath, storageLevel, enableDecompression)
+    for (i <- 1 until numReceivers) {
+      inputStream = inputStream.union(new FlumeInputDStream[SparkFlumeEvent](ssc, receiverPath,
+        storageLevel, enableDecompression))
+    }
+    inputStream
+  }
 }
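For reference, a hypothetical driver program using the new API (the application name, batch interval, receiver count, and ZooKeeper path are illustrative values, not part of the patch):

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.flume.FlumeUtils

    object FlumeMultiStreamExample {
      def main(args: Array[String]) {
        val conf = new SparkConf().setAppName("FlumeMultiStreamExample")
        val ssc = new StreamingContext(conf, Seconds(2))

        // Start three receivers; each registers its host:port under the same
        // ZooKeeper path, and the resulting streams are unioned into one DStream.
        val events = FlumeUtils.createMultiStream(ssc, "localhost:2181/spark/receivers", 3)

        events.count().map(cnt => "Received " + cnt + " flume events.").print()
        ssc.start()
        ssc.awaitTermination()
      }
    }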