/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.execution.streaming

import org.apache.spark.internal.Logging
import org.apache.spark.sql.{DataFrame, SQLContext, SparkSession}
import org.apache.spark.sql.sources.{DataSourceRegister, StreamSourceProvider}
import org.apache.spark.sql.types._

class TimeStreamProvider extends StreamSourceProvider with DataSourceRegister {
  /** Returns the name and schema of the source that can be used to continually read data. */
  override def sourceSchema(
      sqlContext: SQLContext,
      schema: Option[StructType],
      providerName: String,
      parameters: Map[String, String]): (String, StructType) =
    ("time", StructType(StructField("timestamp", LongType) ::
      StructField("value", LongType) :: Nil))

  override def createSource(
      sqlContext: SQLContext,
      metadataPath: String,
      schema: Option[StructType],
      providerName: String,
      parameters: Map[String, String]): Source = {
    new TimeStreamSource(
      sqlContext.sparkSession,
      parameters.get("tuplesPerSecond").map(_.toLong).getOrElse(1L),
      parameters.get("rampUpTimeSeconds").map(_.toLong).getOrElse(10L),
      parameters.get("wiggleRatio").map(_.toDouble).getOrElse(0.01))
  }

  /**
   * The string that represents the format that this data source provider uses. This is
   * overridden by children to provide a nice alias for the data source. For example:
   *
   * {{{
   *   override def shortName(): String = "parquet"
   * }}}
   *
   * @since 1.5.0
   */
  override def shortName(): String = "time"
}
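
// Usage sketch (assuming the provider is registered through
// META-INF/services/org.apache.spark.sql.sources.DataSourceRegister, or is referenced by its
// fully-qualified class name): the short name "time" then resolves to this provider, e.g.
//
//   spark.readStream
//     .format("time")
//     .option("tuplesPerSecond", "100")    // defaults to 1
//     .option("rampUpTimeSeconds", "30")   // defaults to 10
//     .option("wiggleRatio", "0.05")       // defaults to 0.01
//     .load()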

class TimeStreamSource(
    spark: SparkSession,
    tuplesPerSecond: Long,
    rampUpTimeSeconds: Long,
    wiggleRatio: Double) extends Source with Logging {
  // Wall-clock time at source creation; offsets are whole seconds elapsed since this instant.
  val startTime = System.currentTimeMillis()
  var lastTime = startTime
  import spark.implicits._

  /** Returns the schema of the data from this source */
  override def schema: StructType = StructType(StructField("timestamp", LongType) ::
    StructField("value", LongType) :: Nil)

  /** Returns the maximum available offset for this source. */
  override def getOffset: Option[Offset] = {
    val curTime = System.currentTimeMillis()
    if (curTime > lastTime) {
      lastTime = curTime
    }
    // Offsets are whole seconds elapsed since the source was created.
    Some(LongOffset((curTime - startTime) / 1000))
  }

  /**
   * Perturbs a cumulative tuple count (despite the name, callers pass counts, not seconds) by
   * a small deterministic "wiggle" so the generated rate is not perfectly flat: the wiggle
   * term cycles through -1, 0, +1 and +2 times `tuplesPerSecond * wiggleRatio`.
   */
  private def addRateWiggle(second: Long): Long = {
    var wiggled: Long = (((second % 4) - 1) * tuplesPerSecond * wiggleRatio + second).toLong
    if (wiggled < 0) {
      wiggled = second
    }
    logError(s"\n\nwiggleBefore: $second - wiggleAfter: $wiggled\n\n")
    wiggled
  }
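
  // Worked example (the numbers follow directly from the formula above): with
  // tuplesPerSecond = 100 and wiggleRatio = 0.01 the wiggle term is -1.0, 0.0, +1.0 or +2.0,
  // so addRateWiggle(400) == 399 and addRateWiggle(402) == 403.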

  /**
   * Returns the data that is between the offsets (`start`, `end`]. When `start` is `None` then
   * the batch should begin with the first available record. This method must always return the
   * same data for a particular `start` and `end` pair.
   */
  override def getBatch(start: Option[Offset], end: Offset): DataFrame = {
    val startSeconds = start.map(_.asInstanceOf[LongOffset].offset).getOrElse(0L)
    val endSeconds = end.asInstanceOf[LongOffset].offset
    // Map second offsets to cumulative tuple counts. While ramping up, the per-second rate
    // scales linearly with elapsed time, so the cumulative count grows quadratically; once
    // ramp-up is over, it grows linearly at tuplesPerSecond, plus the wiggle.
    val (rangeStart, rangeEnd) = if (rampUpTimeSeconds > endSeconds) {
      (math.rint(tuplesPerSecond * (startSeconds * 1.0 / rampUpTimeSeconds)).toLong * startSeconds,
        math.rint(tuplesPerSecond * (endSeconds * 1.0 / rampUpTimeSeconds)).toLong * endSeconds)
    } else if (startSeconds < rampUpTimeSeconds) {
      (math.rint(tuplesPerSecond * (startSeconds * 1.0 / rampUpTimeSeconds)).toLong * startSeconds,
        addRateWiggle(endSeconds * tuplesPerSecond))
    } else {
      (addRateWiggle(startSeconds * tuplesPerSecond),
        addRateWiggle(endSeconds * tuplesPerSecond))
    }
    logError(s"\n\nrangeStart: $rangeStart - rangeEnd: $rangeEnd -\n\n")
    // Copy fields into local vals so the map closure below does not capture `this`.
    val localStartTime = startTime
    val localPerSecond = tuplesPerSecond

    spark.range(rangeStart, rangeEnd, 1, 1).map { v =>
      val relative = v + (1 << (v % 4))
      (relative / localPerSecond + localStartTime / 1000, relative)
    }.toDF("timestamp", "value")
  }
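
  // Worked example of the offset-to-count mapping (numbers follow from the formulas above):
  // with tuplesPerSecond = 100 and rampUpTimeSeconds = 10, the ramp-up batch (3, 5] covers
  // counts rint(100 * 3/10) * 3 = 90 until rint(100 * 5/10) * 5 = 250, i.e. 160 rows, while
  // the steady-state batch (10, 11] covers 999 until 1099, i.e. exactly 100 rows.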

  /** Stop this source and free any resources it has allocated. */
  override def stop(): Unit = {}
}
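
// Minimal end-to-end sketch, assuming an existing SparkSession named `spark`: stream the
// generated (timestamp, value) rows to the console sink.
//
//   val query = spark.readStream
//     .format("org.apache.spark.sql.execution.streaming.TimeStreamProvider")
//     .option("tuplesPerSecond", "10")
//     .load()
//     .writeStream
//     .format("console")
//     .outputMode("append")
//     .start()
//   query.awaitTermination()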