Commit 7683982

hotou authored and srowen committed
[SPARK-5860][CORE] JdbcRDD: overflow on large range with high number of partitions

Fix an overflow bug in JdbcRDD when calculating partitions for large BIGINT ids.

Author: Evan Yu <[email protected]>

Closes apache#4701 from hotou/SPARK-5860 and squashes the following commits:

9e038d1 [Evan Yu] [SPARK-5860][CORE] Prevent overflowing at the length level
7883ad9 [Evan Yu] [SPARK-5860][CORE] Prevent overflowing at the length level
c88755a [Evan Yu] [SPARK-5860][CORE] switch to BigInt instead of BigDecimal
4e9ff4f [Evan Yu] [SPARK-5860][CORE] JdbcRDD overflow on large range with high number of partitions
1 parent 7138816 commit 7683982
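To see the failure mode concretely, here is a minimal sketch (not part of the commit; it borrows the bounds from the new test added below). With an id range this wide, the intermediate product i * length no longer fits in a Long once i reaches 17, and the multiplication silently wraps negative:

    // The old code computed length as a Long; for the test's bounds it is ~5.67e17.
    val length = 1L + 567279358897692673L - 1131544775L
    // 17 * length is ~9.64e18, past Long.MaxValue (~9.22e18), so it wraps.
    val wrapped = 17L * length
    println(wrapped)  // -8802994991685037333: a garbage partition boundary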

2 files changed (+50, -18 lines)

core/src/main/scala/org/apache/spark/rdd/JdbcRDD.scala (4 additions, 4 deletions)

@@ -62,11 +62,11 @@ class JdbcRDD[T: ClassTag](

   override def getPartitions: Array[Partition] = {
     // bounds are inclusive, hence the + 1 here and - 1 on end
-    val length = 1 + upperBound - lowerBound
+    val length = BigInt(1) + upperBound - lowerBound
     (0 until numPartitions).map(i => {
-      val start = lowerBound + ((i * length) / numPartitions).toLong
-      val end = lowerBound + (((i + 1) * length) / numPartitions).toLong - 1
-      new JdbcPartition(i, start, end)
+      val start = lowerBound + ((i * length) / numPartitions)
+      val end = lowerBound + (((i + 1) * length) / numPartitions) - 1
+      new JdbcPartition(i, start.toLong, end.toLong)
     }).toArray
   }
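As a sanity check on the fixed arithmetic, a small standalone sketch (again using the bounds from the new test) shows that the BigInt-based formula still tiles the inclusive range [lowerBound, upperBound] exactly, with no gaps, overlaps, or negative boundaries:

    val lowerBound = 1131544775L
    val upperBound = 567279358897692673L
    val numPartitions = 20

    // Same formula as the patched getPartitions: multiply as BigInt, narrow at the end.
    val length = BigInt(1) + upperBound - lowerBound
    val ranges = (0 until numPartitions).map { i =>
      val start = lowerBound + ((i * length) / numPartitions)
      val end = lowerBound + (((i + 1) * length) / numPartitions) - 1
      (start.toLong, end.toLong)
    }

    assert(ranges.head._1 == lowerBound)   // first partition starts at the lower bound
    assert(ranges.last._2 == upperBound)   // last partition ends at the upper bound
    ranges.sliding(2).foreach {
      case Seq((_, prevEnd), (nextStart, _)) => assert(nextStart == prevEnd + 1)  // adjacent, no gap or overlap
    }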

core/src/test/scala/org/apache/spark/rdd/JdbcRDDSuite.scala (46 additions, 14 deletions)

@@ -29,22 +29,42 @@ class JdbcRDDSuite extends FunSuite with BeforeAndAfter with LocalSparkContext {
     Class.forName("org.apache.derby.jdbc.EmbeddedDriver")
     val conn = DriverManager.getConnection("jdbc:derby:target/JdbcRDDSuiteDb;create=true")
     try {
-      val create = conn.createStatement
-      create.execute("""
-        CREATE TABLE FOO(
-          ID INTEGER NOT NULL GENERATED ALWAYS AS IDENTITY (START WITH 1, INCREMENT BY 1),
-          DATA INTEGER
-        )""")
-      create.close()
-      val insert = conn.prepareStatement("INSERT INTO FOO(DATA) VALUES(?)")
-      (1 to 100).foreach { i =>
-        insert.setInt(1, i * 2)
-        insert.executeUpdate
+
+      try {
+        val create = conn.createStatement
+        create.execute("""
+          CREATE TABLE FOO(
+            ID INTEGER NOT NULL GENERATED ALWAYS AS IDENTITY (START WITH 1, INCREMENT BY 1),
+            DATA INTEGER
+          )""")
+        create.close()
+        val insert = conn.prepareStatement("INSERT INTO FOO(DATA) VALUES(?)")
+        (1 to 100).foreach { i =>
+          insert.setInt(1, i * 2)
+          insert.executeUpdate
+        }
+        insert.close()
+      } catch {
+        case e: SQLException if e.getSQLState == "X0Y32" =>
+          // table exists
       }
-      insert.close()
-    } catch {
-      case e: SQLException if e.getSQLState == "X0Y32" =>
+
+      try {
+        val create = conn.createStatement
+        create.execute("CREATE TABLE BIGINT_TEST(ID BIGINT NOT NULL, DATA INTEGER)")
+        create.close()
+        val insert = conn.prepareStatement("INSERT INTO BIGINT_TEST VALUES(?,?)")
+        (1 to 100).foreach { i =>
+          insert.setLong(1, 100000000000000000L + 4000000000000000L * i)
+          insert.setInt(2, i)
+          insert.executeUpdate
+        }
+        insert.close()
+      } catch {
+        case e: SQLException if e.getSQLState == "X0Y32" =>
          // table exists
+      }
+
     } finally {
       conn.close()
     }

@@ -62,6 +82,18 @@ class JdbcRDDSuite extends FunSuite with BeforeAndAfter with LocalSparkContext {
     assert(rdd.count === 100)
     assert(rdd.reduce(_+_) === 10100)
   }
+
+  test("large id overflow") {
+    sc = new SparkContext("local", "test")
+    val rdd = new JdbcRDD(
+      sc,
+      () => { DriverManager.getConnection("jdbc:derby:target/JdbcRDDSuiteDb") },
+      "SELECT DATA FROM BIGINT_TEST WHERE ? <= ID AND ID <= ?",
+      1131544775L, 567279358897692673L, 20,
+      (r: ResultSet) => { r.getInt(1) } ).cache()
+    assert(rdd.count === 100)
+    assert(rdd.reduce(_+_) === 5050)
+  }

   after {
     try {
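For reference on the new test's expectations, a quick standalone check (same constants as the test) confirms that all 100 generated BIGINT ids fall inside the query bounds, so the count is 100, and that the DATA column sums to 1 + 2 + ... + 100 = 5050:

    val ids = (1 to 100).map(i => 100000000000000000L + 4000000000000000L * i)
    assert(ids.min == 104000000000000000L && ids.max == 500000000000000000L)
    assert(ids.forall(id => 1131544775L <= id && id <= 567279358897692673L))
    assert((1 to 100).sum == 5050)  // the expected rdd.reduce(_+_) result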
