Skip to content

Commit f471651

Browse files
committed
tuplesPerSecond -> rowsPerSecond
1 parent 240c27b commit f471651

File tree

2 files changed

+39
-39
lines changed

2 files changed

+39
-39
lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/RateSourceProvider.scala

Lines changed: 18 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -38,12 +38,12 @@ import org.apache.spark.util.{ManualClock, SystemClock}
3838
* with 0L.
3939
*
4040
* This source supports the following options:
41-
* - `tuplesPerSecond` (e.g. 100, default: 1): How many tuples should be generated per second.
41+
* - `rowsPerSecond` (e.g. 100, default: 1): How many rows should be generated per second.
4242
* - `rampUpTime` (e.g. 5s, default: 0s): How long to ramp up before the generating speed
43-
* becomes `tuplesPerSecond`. Using finer granularities than seconds will be truncated to integer
43+
* becomes `rowsPerSecond`. Using finer granularities than seconds will be truncated to integer
4444
* seconds.
4545
* - `numPartitions` (e.g. 10, default: Spark's default parallelism): The partition number for the
46-
* generated tuples. The source will try its best to reach `tuplesPerSecond`, but the query may
46+
* generated rows. The source will try its best to reach `rowsPerSecond`, but the query may
4747
* be resource constrained, and `numPartitions` can be tweaked to help reach the desired speed.
4848
*/
4949
class RateSourceProvider extends StreamSourceProvider with DataSourceRegister {
@@ -63,10 +63,10 @@ class RateSourceProvider extends StreamSourceProvider with DataSourceRegister {
6363
parameters: Map[String, String]): Source = {
6464
val params = CaseInsensitiveMap(parameters)
6565

66-
val tuplesPerSecond = params.get("tuplesPerSecond").map(_.toLong).getOrElse(1L)
67-
if (tuplesPerSecond <= 0) {
66+
val rowsPerSecond = params.get("rowsPerSecond").map(_.toLong).getOrElse(1L)
67+
if (rowsPerSecond <= 0) {
6868
throw new IllegalArgumentException(
69-
s"Invalid value '${params("tuplesPerSecond")}'. The option 'tuplesPerSecond' " +
69+
s"Invalid value '${params("rowsPerSecond")}'. The option 'rowsPerSecond' " +
7070
"must be positive")
7171
}
7272

@@ -89,7 +89,7 @@ class RateSourceProvider extends StreamSourceProvider with DataSourceRegister {
8989
new RateStreamSource(
9090
sqlContext,
9191
metadataPath,
92-
tuplesPerSecond,
92+
rowsPerSecond,
9393
rampUpTimeSeconds,
9494
numPartitions,
9595
params.get("useManualClock").map(_.toBoolean).getOrElse(false) // Only for testing
@@ -108,7 +108,7 @@ object RateSourceProvider {
108108
class RateStreamSource(
109109
sqlContext: SQLContext,
110110
metadataPath: String,
111-
tuplesPerSecond: Long,
111+
rowsPerSecond: Long,
112112
rampUpTimeSeconds: Long,
113113
numPartitions: Int,
114114
useManualClock: Boolean) extends Source with Logging {
@@ -118,11 +118,11 @@ class RateStreamSource(
118118

119119
val clock = if (useManualClock) new ManualClock else new SystemClock
120120

121-
private val maxSeconds = Long.MaxValue / tuplesPerSecond
121+
private val maxSeconds = Long.MaxValue / rowsPerSecond
122122

123123
if (rampUpTimeSeconds > maxSeconds) {
124124
throw new ArithmeticException(
125-
s"Integer overflow. Max offset with $tuplesPerSecond tuplesPerSecond" +
125+
s"Integer overflow. Max offset with $rowsPerSecond rowsPerSecond" +
126126
s" is $maxSeconds, but 'rampUpTimeSeconds' is $rampUpTimeSeconds.")
127127
}
128128

@@ -183,14 +183,14 @@ class RateStreamSource(
183183
assert(startSeconds <= endSeconds, s"startSeconds($startSeconds) > endSeconds($endSeconds)")
184184
if (endSeconds > maxSeconds) {
185185
throw new ArithmeticException("Integer overflow. Max offset with " +
186-
s"$tuplesPerSecond tuplesPerSecond is $maxSeconds, but it's $endSeconds now.")
186+
s"$rowsPerSecond rowsPerSecond is $maxSeconds, but it's $endSeconds now.")
187187
}
188188
// Fix "lastTimeMs" for recovery
189189
if (lastTimeMs < TimeUnit.SECONDS.toMillis(endSeconds) + startTimeMs) {
190190
lastTimeMs = TimeUnit.SECONDS.toMillis(endSeconds) + startTimeMs
191191
}
192-
val rangeStart = valueAtSecond(startSeconds, tuplesPerSecond, rampUpTimeSeconds)
193-
val rangeEnd = valueAtSecond(endSeconds, tuplesPerSecond, rampUpTimeSeconds)
192+
val rangeStart = valueAtSecond(startSeconds, rowsPerSecond, rampUpTimeSeconds)
193+
val rangeEnd = valueAtSecond(endSeconds, rowsPerSecond, rampUpTimeSeconds)
194194
logDebug(s"startSeconds: $startSeconds, endSeconds: $endSeconds, " +
195195
s"rangeStart: $rangeStart, rangeEnd: $rangeEnd")
196196

@@ -254,14 +254,14 @@ class RateStreamSource(
254254
object RateStreamSource {
255255

256256
/** Calculate the end value we will emit at the time `seconds`. */
257-
def valueAtSecond(seconds: Long, tuplesPerSecond: Long, rampUpTimeSeconds: Long): Long = {
258-
// E.g., rampUpTimeSeconds = 4, tuplesPerSecond = 10
257+
def valueAtSecond(seconds: Long, rowsPerSecond: Long, rampUpTimeSeconds: Long): Long = {
258+
// E.g., rampUpTimeSeconds = 4, rowsPerSecond = 10
259259
// Then speedDeltaPerSecond = 2
260260
//
261261
// seconds = 0 1 2 3 4 5 6
262262
// speed = 0 2 4 6 8 10 10 (speedDeltaPerSecond * seconds)
263263
// end value = 0 2 6 12 20 30 40 (0 + speedDeltaPerSecond * seconds) * (seconds + 1) / 2
264-
val speedDeltaPerSecond = tuplesPerSecond / (rampUpTimeSeconds + 1)
264+
val speedDeltaPerSecond = rowsPerSecond / (rampUpTimeSeconds + 1)
265265
if (seconds <= rampUpTimeSeconds) {
266266
// Calculate "(0 + speedDeltaPerSecond * seconds) * (seconds + 1) / 2" in a special way to
267267
// avoid overflow
@@ -272,8 +272,8 @@ object RateStreamSource {
272272
}
273273
} else {
274274
// rampUpPart is just a special case of the above formula: rampUpTimeSeconds == seconds
275-
val rampUpPart = valueAtSecond(rampUpTimeSeconds, tuplesPerSecond, rampUpTimeSeconds)
276-
rampUpPart + (seconds - rampUpTimeSeconds) * tuplesPerSecond
275+
val rampUpPart = valueAtSecond(rampUpTimeSeconds, rowsPerSecond, rampUpTimeSeconds)
276+
rampUpPart + (seconds - rampUpTimeSeconds) * rowsPerSecond
277277
}
278278
}
279279
}

sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/RateSourceSuite.scala

Lines changed: 21 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ class RateSourceSuite extends StreamTest {
4242
test("basic") {
4343
val input = spark.readStream
4444
.format("rate")
45-
.option("tuplesPerSecond", "10")
45+
.option("rowsPerSecond", "10")
4646
.option("useManualClock", "true")
4747
.load()
4848
testStream(input)(
@@ -56,10 +56,10 @@ class RateSourceSuite extends StreamTest {
5656
)
5757
}
5858

59-
test("uniform distribution of event timestamps: tuplesPerSecond > 1000") {
59+
test("uniform distribution of event timestamps: rowsPerSecond > 1000") {
6060
val input = spark.readStream
6161
.format("rate")
62-
.option("tuplesPerSecond", "1500")
62+
.option("rowsPerSecond", "1500")
6363
.option("useManualClock", "true")
6464
.load()
6565
.as[(java.sql.Timestamp, Long)]
@@ -73,10 +73,10 @@ class RateSourceSuite extends StreamTest {
7373
)
7474
}
7575

76-
test("uniform distribution of event timestamps: tuplesPerSecond < 1000") {
76+
test("uniform distribution of event timestamps: rowsPerSecond < 1000") {
7777
val input = spark.readStream
7878
.format("rate")
79-
.option("tuplesPerSecond", "400")
79+
.option("rowsPerSecond", "400")
8080
.option("useManualClock", "true")
8181
.load()
8282
.as[(java.sql.Timestamp, Long)]
@@ -92,23 +92,23 @@ class RateSourceSuite extends StreamTest {
9292
test("valueAtSecond") {
9393
import RateStreamSource._
9494

95-
assert(valueAtSecond(seconds = 0, tuplesPerSecond = 5, rampUpTimeSeconds = 2) === 0)
96-
assert(valueAtSecond(seconds = 1, tuplesPerSecond = 5, rampUpTimeSeconds = 2) === 1)
97-
assert(valueAtSecond(seconds = 2, tuplesPerSecond = 5, rampUpTimeSeconds = 2) === 3)
98-
assert(valueAtSecond(seconds = 3, tuplesPerSecond = 5, rampUpTimeSeconds = 2) === 8)
99-
100-
assert(valueAtSecond(seconds = 0, tuplesPerSecond = 10, rampUpTimeSeconds = 4) === 0)
101-
assert(valueAtSecond(seconds = 1, tuplesPerSecond = 10, rampUpTimeSeconds = 4) === 2)
102-
assert(valueAtSecond(seconds = 2, tuplesPerSecond = 10, rampUpTimeSeconds = 4) === 6)
103-
assert(valueAtSecond(seconds = 3, tuplesPerSecond = 10, rampUpTimeSeconds = 4) === 12)
104-
assert(valueAtSecond(seconds = 4, tuplesPerSecond = 10, rampUpTimeSeconds = 4) === 20)
105-
assert(valueAtSecond(seconds = 5, tuplesPerSecond = 10, rampUpTimeSeconds = 4) === 30)
95+
assert(valueAtSecond(seconds = 0, rowsPerSecond = 5, rampUpTimeSeconds = 2) === 0)
96+
assert(valueAtSecond(seconds = 1, rowsPerSecond = 5, rampUpTimeSeconds = 2) === 1)
97+
assert(valueAtSecond(seconds = 2, rowsPerSecond = 5, rampUpTimeSeconds = 2) === 3)
98+
assert(valueAtSecond(seconds = 3, rowsPerSecond = 5, rampUpTimeSeconds = 2) === 8)
99+
100+
assert(valueAtSecond(seconds = 0, rowsPerSecond = 10, rampUpTimeSeconds = 4) === 0)
101+
assert(valueAtSecond(seconds = 1, rowsPerSecond = 10, rampUpTimeSeconds = 4) === 2)
102+
assert(valueAtSecond(seconds = 2, rowsPerSecond = 10, rampUpTimeSeconds = 4) === 6)
103+
assert(valueAtSecond(seconds = 3, rowsPerSecond = 10, rampUpTimeSeconds = 4) === 12)
104+
assert(valueAtSecond(seconds = 4, rowsPerSecond = 10, rampUpTimeSeconds = 4) === 20)
105+
assert(valueAtSecond(seconds = 5, rowsPerSecond = 10, rampUpTimeSeconds = 4) === 30)
106106
}
107107

108108
test("rampUpTime") {
109109
val input = spark.readStream
110110
.format("rate")
111-
.option("tuplesPerSecond", "10")
111+
.option("rowsPerSecond", "10")
112112
.option("rampUpTime", "4s")
113113
.option("useManualClock", "true")
114114
.load()
@@ -138,7 +138,7 @@ class RateSourceSuite extends StreamTest {
138138
test("numPartitions") {
139139
val input = spark.readStream
140140
.format("rate")
141-
.option("tuplesPerSecond", "10")
141+
.option("rowsPerSecond", "10")
142142
.option("numPartitions", "6")
143143
.option("useManualClock", "true")
144144
.load()
@@ -153,15 +153,15 @@ class RateSourceSuite extends StreamTest {
153153
testQuietly("overflow") {
154154
val input = spark.readStream
155155
.format("rate")
156-
.option("tuplesPerSecond", Long.MaxValue.toString)
156+
.option("rowsPerSecond", Long.MaxValue.toString)
157157
.option("useManualClock", "true")
158158
.load()
159159
.select(spark_partition_id())
160160
.distinct()
161161
testStream(input)(
162162
AdvanceRateManualClock(2),
163163
ExpectFailure[ArithmeticException](t => {
164-
Seq("overflow", "tuplesPerSecond").foreach { msg =>
164+
Seq("overflow", "rowsPerSecond").foreach { msg =>
165165
assert(t.getMessage.contains(msg))
166166
}
167167
})
@@ -189,7 +189,7 @@ class RateSourceSuite extends StreamTest {
189189
}
190190
}
191191

192-
testIllegalOptionValue("tuplesPerSecond", "-1", Seq("-1", "tuplesPerSecond", "positive"))
192+
testIllegalOptionValue("rowsPerSecond", "-1", Seq("-1", "rowsPerSecond", "positive"))
193193
testIllegalOptionValue("numPartitions", "-1", Seq("-1", "numPartitions", "positive"))
194194
}
195195
}

0 commit comments

Comments
 (0)