From 93371d8967916c8432198426ba281ec56c07c532 Mon Sep 17 00:00:00 2001
From: zhangxinyu1 <342689740@qq.com>
Date: Thu, 22 Sep 2016 17:41:32 +0800
Subject: [PATCH 1/3] Add HttpStreamSink. Streaming query results can be sunk
 to an HTTP server

---
 ...pache.spark.sql.sources.DataSourceRegister |   1 +
 .../execution/streaming/HttpStreamSink.scala  | 105 ++++++++++++++++++
 2 files changed, 106 insertions(+)
 create mode 100644 sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HttpStreamSink.scala

diff --git a/sql/core/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister b/sql/core/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
index 27d32b5dca431..48a61c3f6745d 100644
--- a/sql/core/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
+++ b/sql/core/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
@@ -5,3 +5,4 @@ org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
 org.apache.spark.sql.execution.datasources.text.TextFileFormat
 org.apache.spark.sql.execution.streaming.ConsoleSinkProvider
 org.apache.spark.sql.execution.streaming.TextSocketSourceProvider
+org.apache.spark.sql.execution.streaming.HttpStreamSink
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HttpStreamSink.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HttpStreamSink.scala
new file mode 100644
index 0000000000000..8d8fdc75541bc
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HttpStreamSink.scala
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming
+
+import java.io.{BufferedReader, InputStreamReader, PrintWriter}
+import java.net.{UnknownHostException, URL, URLConnection}
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql._
+import org.apache.spark.sql.sources.{DataSourceRegister, StreamSinkProvider}
+import org.apache.spark.sql.streaming.OutputMode
+import org.apache.spark.util.Utils
+
+class HttpSink(options: Map[String, String]) extends Sink with Logging {
+  override def addBatch(batchId: Long, data: DataFrame): Unit = synchronized {
+    val dataFormat: HttpDataFormat = {
+      val className = options.getOrElse("format.class",
+        "org.apache.spark.sql.execution.streaming.HttpDataToStringDefault")
+      createObject[HttpDataFormat](className)
+    }
+    data.collect().foreach(dataSeq => {
+      post(dataFormat.format(dataSeq.toSeq))
+    })
+  }
+
+  private def createObject[T<:AnyRef](className: String, args: AnyRef*): T = {
+    val klass = Utils.classForName(className).asInstanceOf[Class[T]]
+    val constructor = klass.getConstructor(args.map(_.getClass): _*)
+    constructor.newInstance(args: _*)
+  }
+
+  private def post(param: String): Unit = {
+    val url: URL = new URL(options.get("url").get)
+    val connection: URLConnection = url.openConnection
+    connection.setDoInput(true)
+    connection.setDoOutput(true)
+    val writer = new PrintWriter(connection.getOutputStream)
+    try {
+      writer.print(param)
+      writer.flush()
+    } catch {
+      case cause: Throwable => {
+        logError("Post http request error: ", cause)
+      }
+    } finally {
+      writer.close()
+    }
+    val reader = new BufferedReader(new InputStreamReader(connection.getInputStream))
+    try {
+      val it = reader.lines().iterator()
+      var lines: String = ""
+      while (it.hasNext()) {
+        lines += it.next()
+      }
+      logTrace("Http request post result: " + lines)
+    } catch {
+      case cause: Throwable => {
+        logError("Read http result error: ", cause)
+      }
+    } finally {
+      reader.close()
+    }
+  }
+}
+
+trait HttpDataFormat{
+  def format(data: Seq[Any]): String
+}
+
+class HttpDataToStringDefault extends HttpDataFormat {
+  def format(data: Seq[Any]) : String = {
+    return data.mkString(", ")
+  }
+}
+
+class HttpStreamSink extends StreamSinkProvider with DataSourceRegister{
+  def createSink(
+      sqlContext: SQLContext,
+      parameters: Map[String, String],
+      partitionColumns: Seq[String],
+      outputMode: OutputMode): Sink = {
+    if (!parameters.contains("url")) {
+      throw new AnalysisException("Http url should be set: .option(\"url\", \"...\").")
+    }
+    new HttpSink(parameters)
+  }
+
+  def shortName(): String = "http"
+}
+

From d046e85fb6d0d68259abdf123032c304926d159f Mon Sep 17 00:00:00 2001
From: zhangxinyu1 <342689740@qq.com>
Date: Mon, 26 Sep 2016 18:54:51 +0800
Subject: [PATCH 2/3] 1. Remove trait HttpDataFormat; verify instead that the
 output DataFrame has a single string column.
 2. Add HttpStreamSinkSuite for testing

---
 sql/core/pom.xml                              |  6 ++
 .../execution/streaming/HttpStreamSink.scala  | 90 +++++++++----------
 .../streaming/HttpStreamSinkSuite.scala       | 38 ++++++++
 3 files changed, 85 insertions(+), 49 deletions(-)
 create mode 100644 sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HttpStreamSinkSuite.scala

diff --git a/sql/core/pom.xml b/sql/core/pom.xml
index b2752638bebd5..7ccae20f993d9 100644
--- a/sql/core/pom.xml
+++ b/sql/core/pom.xml
@@ -126,6 +126,12 @@
     <dependency>
      <groupId>org.apache.xbean</groupId>
      <artifactId>xbean-asm5-shaded</artifactId>
      <scope>test</scope>
    </dependency>
+    <dependency>
+      <groupId>com.sparkjava</groupId>
+      <artifactId>spark-core</artifactId>
+      <version>2.5</version>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HttpStreamSink.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HttpStreamSink.scala
index 8d8fdc75541bc..d9e8f642a2149 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HttpStreamSink.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HttpStreamSink.scala
@@ -18,45 +18,65 @@
 package org.apache.spark.sql.execution.streaming
 
 import java.io.{BufferedReader, InputStreamReader, PrintWriter}
-import java.net.{UnknownHostException, URL, URLConnection}
+import java.net.{URL, URLConnection}
 
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql._
 import org.apache.spark.sql.sources.{DataSourceRegister, StreamSinkProvider}
 import org.apache.spark.sql.streaming.OutputMode
-import org.apache.spark.util.Utils
+import org.apache.spark.sql.types.{StringType, StructType}
+
+class HttpStreamSink extends StreamSinkProvider with DataSourceRegister{
+  override def createSink(
+      sqlContext: SQLContext,
+      parameters: Map[String, String],
+      partitionColumns: Seq[String],
+      outputMode: OutputMode): Sink = {
+    if (!parameters.contains("url")) {
+      throw new AnalysisException("Http url should be set: .option(\"url\", \"...\").")
+    }
+    new HttpSink(parameters)
+  }
+
+  override def shortName(): String = "http"
+}
+
+/**
+ * A sink that outputs streaming query results by sending HTTP POST requests. Each [[Row]]
+ * in a batch is posted to the configured HTTP URL.
+ * Each [[Row]] in the batch must have exactly one column, and that column's type must be
+ * [[StringType]].
+ */
 class HttpSink(options: Map[String, String]) extends Sink with Logging {
   override def addBatch(batchId: Long, data: DataFrame): Unit = synchronized {
-    val dataFormat: HttpDataFormat = {
-      val className = options.getOrElse("format.class",
-        "org.apache.spark.sql.execution.streaming.HttpDataToStringDefault")
-      createObject[HttpDataFormat](className)
-    }
+    verifySchema(data.schema)
     data.collect().foreach(dataSeq => {
-      post(dataFormat.format(dataSeq.toSeq))
+      post(dataSeq.get(0).toString)
     })
   }
-
-  private def createObject[T<:AnyRef](className: String, args: AnyRef*): T = {
-    val klass = Utils.classForName(className).asInstanceOf[Class[T]]
-    val constructor = klass.getConstructor(args.map(_.getClass): _*)
-    constructor.newInstance(args: _*)
+  private def verifySchema(schema: StructType): Unit = {
+    if (schema.size != 1) {
+      throw new AnalysisException(
+        s"Http data sink supports only a single column, and you have ${schema.size} columns.")
+    }
+    val tpe = schema(0).dataType
+    if (tpe != StringType) {
+      throw new AnalysisException(
+        s"Http data sink supports only a string column, but you have ${tpe.simpleString}.")
+    }
   }
-
-  private def post(param: String): Unit = {
+  private def post(data: String): Unit = {
     val url: URL = new URL(options.get("url").get)
     val connection: URLConnection = url.openConnection
     connection.setDoInput(true)
     connection.setDoOutput(true)
     val writer = new PrintWriter(connection.getOutputStream)
     try {
-      writer.print(param)
+      writer.print(data)
       writer.flush()
     } catch {
-      case cause: Throwable => {
-        logError("Post http request error: ", cause)
-      }
+      case cause: Throwable => logError("Post http request error: ", cause)
     } finally {
       writer.close()
     }
@@ -67,39 +87,11 @@ class HttpSink(options: Map[String, String]) extends Sink with Logging {
       while (it.hasNext()) {
         lines += it.next()
       }
-      logTrace("Http request post result: " + lines)
+      logTrace(s"Http request post result: ${lines}.")
     } catch {
-      case cause: Throwable => {
-        logError("Read http result error: ", cause)
-      }
+      case cause: Throwable => logError("Read http result error: ", cause)
     } finally {
       reader.close()
     }
   }
 }
-
-trait HttpDataFormat{
-  def format(data: Seq[Any]): String
-}
-
-class HttpDataToStringDefault extends HttpDataFormat {
-  def format(data: Seq[Any]) : String = {
-    return data.mkString(", ")
-  }
-}
-
-class HttpStreamSink extends StreamSinkProvider with DataSourceRegister{
-  def createSink(
-      sqlContext: SQLContext,
-      parameters: Map[String, String],
-      partitionColumns: Seq[String],
-      outputMode: OutputMode): Sink = {
-    if (!parameters.contains("url")) {
-      throw new AnalysisException("Http url should be set: .option(\"url\", \"...\").")
-    }
-    new HttpSink(parameters)
-  }
-
-  def shortName(): String = "http"
-}
-
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HttpStreamSinkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HttpStreamSinkSuite.scala
new file mode 100644
index 0000000000000..edd21f4cdcc8b
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HttpStreamSinkSuite.scala
@@ -0,0 +1,38 @@
+package org.apache.spark.sql.execution.streaming
+
+import java.util
+
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.types.{StructField, StructType, StringType}
+import spark.{Route, Spark, Request, Response}
+import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
+import org.apache.spark.sql.streaming.StreamTest
+import org.apache.spark.sql.test.SharedSQLContext
+import org.scalatest.BeforeAndAfter
+
+class HttpStreamSinkSuite extends StreamTest with SharedSQLContext with BeforeAndAfter {
+  import testImplicits._
+  after {
+    sqlContext.streams.active.foreach(_.stop())
+  }
+  test("http sink") {
+    var output: String = ""
+    Spark.port(3775)
+    Spark.post("/welcome", new Route {
+      override def handle(req: Request, resp: Response): Object = {
+        output = req.body()
+        resp.status(200)
+        "welcome"
+      }
+    })
+    val input = MemoryStream[String]
+    val query = input.toDF().writeStream
+      .outputMode("append")
+      .format("http")
+      .option("url", "http://localhost:3775/welcome")
+      .start()
+    input.addData("Jerry")
+    query.processAllAvailable()
+    assert(output === "Jerry")
+  }
+}

From be748a1f74cae7464462c52594bc00ed14d37f98 Mon Sep 17 00:00:00 2001
From: zhangxinyu1 <342689740@qq.com>
Date: Mon, 26 Sep 2016 19:05:27 +0800
Subject: [PATCH 3/3] Modify code style of HttpStreamSinkSuite

---
 .../spark/sql/execution/streaming/HttpStreamSinkSuite.scala | 6 +-----
 1 file changed, 1 insertion(+), 5 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HttpStreamSinkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HttpStreamSinkSuite.scala
index edd21f4cdcc8b..90ceca7d441a7 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HttpStreamSinkSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HttpStreamSinkSuite.scala
@@ -1,11 +1,7 @@
 package org.apache.spark.sql.execution.streaming
 
-import java.util
+import spark.{Response, Request, Route, Spark}
 
-import org.apache.spark.sql.Row
-import org.apache.spark.sql.types.{StructField, StructType, StringType}
-import spark.{Route, Spark, Request, Response}
-import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
 import org.apache.spark.sql.streaming.StreamTest
 import org.apache.spark.sql.test.SharedSQLContext
 import org.scalatest.BeforeAndAfter
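
Usage note: a minimal sketch of how a streaming query might drive this sink,
assuming a Spark 2.0-style SparkSession and the options introduced in these
patches. The socket source, host/port, checkpoint path, and the endpoint URL
http://localhost:3775/welcome are illustrative placeholders, not values taken
from the patches themselves.

    import org.apache.spark.sql.SparkSession

    object HttpSinkExample {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder()
          .master("local[2]")
          .appName("http-sink-example")
          .getOrCreate()

        // Any streaming source works; the built-in socket source keeps this small.
        val lines = spark.readStream
          .format("socket")
          .option("host", "localhost")
          .option("port", "9999")
          .load()

        // HttpSink.addBatch verifies the schema: exactly one column of StringType.
        // Collapse the frame to a single string column before writing.
        val payload = lines.selectExpr("CAST(value AS STRING) AS payload")

        val query = payload.writeStream
          .outputMode("append")
          .format("http") // resolved through DataSourceRegister to HttpStreamSink
          .option("url", "http://localhost:3775/welcome") // required; rows are POSTed here
          .option("checkpointLocation", "/tmp/http-sink-checkpoint")
          .start()

        query.awaitTermination()
      }
    }

Note that addBatch collects each micro-batch to the driver and issues one HTTP
POST per row, so this sink is best suited to low-volume result streams.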