diff --git a/sql/core/pom.xml b/sql/core/pom.xml
index b2752638bebd5..7ccae20f993d9 100644
--- a/sql/core/pom.xml
+++ b/sql/core/pom.xml
@@ -126,6 +126,12 @@
     <dependency>
       <groupId>org.apache.xbean</groupId>
       <artifactId>xbean-asm5-shaded</artifactId>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>com.sparkjava</groupId>
+      <artifactId>spark-core</artifactId>
+      <version>2.5</version>
+      <scope>test</scope>
+    </dependency>
     <dependency>
diff --git a/sql/core/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister b/sql/core/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
index 27d32b5dca431..48a61c3f6745d 100644
--- a/sql/core/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
+++ b/sql/core/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
@@ -5,3 +5,4 @@ org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
 org.apache.spark.sql.execution.datasources.text.TextFileFormat
 org.apache.spark.sql.execution.streaming.ConsoleSinkProvider
 org.apache.spark.sql.execution.streaming.TextSocketSourceProvider
+org.apache.spark.sql.execution.streaming.HttpStreamSink
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HttpStreamSink.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HttpStreamSink.scala
new file mode 100644
index 0000000000000..d9e8f642a2149
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HttpStreamSink.scala
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming
+
+import java.io.{BufferedReader, InputStreamReader, PrintWriter}
+import java.net.{URL, URLConnection}
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql._
+import org.apache.spark.sql.sources.{DataSourceRegister, StreamSinkProvider}
+import org.apache.spark.sql.streaming.OutputMode
+import org.apache.spark.sql.types.{StringType, StructType}
+
+class HttpStreamSink extends StreamSinkProvider with DataSourceRegister {
+
+  override def createSink(
+      sqlContext: SQLContext,
+      parameters: Map[String, String],
+      partitionColumns: Seq[String],
+      outputMode: OutputMode): Sink = {
+    if (!parameters.contains("url")) {
+      throw new AnalysisException("Http url should be set: .option(\"url\", \"...\").")
+    }
+    new HttpSink(parameters)
+  }
+
+  override def shortName(): String = "http"
+}
+
+/**
+ * A sink that outputs streaming query results by sending HTTP POST requests. Each [[Row]]
+ * in a batch is posted to the configured HTTP url.
+ * Each [[Row]] in a batch must have exactly one column, and that column must be of type
+ * [[StringType]].
+ */
+class HttpSink(options: Map[String, String]) extends Sink with Logging {
+
+  override def addBatch(batchId: Long, data: DataFrame): Unit = synchronized {
+    verifySchema(data.schema)
+    // Collect the batch to the driver and post each row's single string column.
+    data.collect().foreach(row => post(row.getString(0)))
+  }
+
+  private def verifySchema(schema: StructType): Unit = {
+    if (schema.size != 1) {
+      throw new AnalysisException(
+        s"Http data sink supports only a single column, and you have ${schema.size} columns.")
+    }
+    val tpe = schema(0).dataType
+    if (tpe != StringType) {
+      throw new AnalysisException(
+        s"Http data sink supports only a string column, but you have ${tpe.simpleString}.")
+    }
+  }
+
+  private def post(data: String): Unit = {
+    val url: URL = new URL(options("url"))
+    val connection: URLConnection = url.openConnection()
+    connection.setDoInput(true)
+    connection.setDoOutput(true)
+    // Writing to the connection's output stream turns the request into an HTTP POST.
+    val writer = new PrintWriter(connection.getOutputStream)
+    try {
+      writer.print(data)
+      writer.flush()
+    } catch {
+      case cause: Throwable => logError("Post http request error: ", cause)
+    } finally {
+      writer.close()
+    }
+    // Read the response body so the request completes, and log it at trace level.
+    val reader = new BufferedReader(new InputStreamReader(connection.getInputStream))
+    try {
+      val it = reader.lines().iterator()
+      var lines: String = ""
+      while (it.hasNext()) {
+        lines += it.next()
+      }
+      logTrace(s"Http request post result: ${lines}.")
+    } catch {
+      case cause: Throwable => logError("Read http result error: ", cause)
+    } finally {
+      reader.close()
+    }
+  }
+}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HttpStreamSinkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HttpStreamSinkSuite.scala
new file mode 100644
index 0000000000000..90ceca7d441a7
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HttpStreamSinkSuite.scala
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming
+
+import org.scalatest.BeforeAndAfter
+import spark.{Request, Response, Route, Spark}
+
+import org.apache.spark.sql.streaming.StreamTest
+import org.apache.spark.sql.test.SharedSQLContext
+
+class HttpStreamSinkSuite extends StreamTest with SharedSQLContext with BeforeAndAfter {
+  import testImplicits._
+
+  after {
+    sqlContext.streams.active.foreach(_.stop())
+    Spark.stop()
+  }
+
+  test("http sink") {
+    @volatile var output: String = ""
+    // Start an embedded sparkjava HTTP server that records the body of each POST
+    // the sink sends to /welcome.
+    Spark.port(3775)
+    Spark.post("/welcome", new Route {
+      override def handle(req: Request, resp: Response): Object = {
+        output = req.body()
+        s"welcome $output"
+      }
+    })
+    Spark.awaitInitialization()
+
+    val input = MemoryStream[String]
+    val query = input.toDF().writeStream
+      .outputMode("append")
+      .format("http")
+      .option("url", "http://localhost:3775/welcome")
+      .start()
+    input.addData("Jerry")
+    query.processAllAvailable()
+    assert(output === "Jerry")
+    query.stop()
+  }
+}
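For reference, a minimal usage sketch of the "http" sink added by this patch, from a user's point of view. The socket source, the endpoint URL, and the checkpoint path below are placeholders chosen for illustration and are not part of the patch; the only assumption taken from the change itself is that the sink is registered under the short name "http" and expects a single StringType column.

// Hypothetical usage sketch for the "http" sink (endpoint URL and paths are placeholders).
import org.apache.spark.sql.SparkSession

object HttpSinkUsageExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("HttpSinkUsageExample")
      .master("local[2]")
      .getOrCreate()

    // The socket source yields a single StringType column named "value",
    // which matches the one-string-column schema HttpSink requires.
    val lines = spark.readStream
      .format("socket")
      .option("host", "localhost")
      .option("port", "9999")
      .load()

    val query = lines.writeStream
      .format("http")                                            // resolved via DataSourceRegister shortName
      .option("url", "http://localhost:8080/ingest")             // placeholder endpoint
      .option("checkpointLocation", "/tmp/http-sink-checkpoint") // placeholder path
      .outputMode("append")
      .start()

    query.awaitTermination()
  }
}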