
Conversation

@JoshRosen
Contributor

This patch refactors the internals of the JDBC data source in order to allow some of its code to be re-used in an automated comparison testing harness. Here are the key changes:

  • Move the JDBC ResultSetMetaData-to-StructType conversion logic from JDBCRDD.resolveTable() to the JdbcUtils object (as a new getSchema(ResultSet, JdbcDialect) method), allowing it to be applied to ResultSets that are created elsewhere.
  • Move the ResultSet to InternalRow conversion methods from JDBCRDD to JdbcUtils:
    • It makes sense to move the JDBCValueGetter type and makeGetter functions here given that their write-path counterparts (JDBCValueSetter) are already in JdbcUtils.
    • Add an internal resultSetToSparkInternalRows method which takes a ResultSet and schema and returns an Iterator[InternalRow]. This effectively extracts the main loop of JDBCRDD into its own method.
    • Add a public resultSetToRows method to JdbcUtils, which wraps the minimal machinery around resultSetToSparkInternalRows in order to allow it to be called from outside of a Spark job.
  • Make JdbcDialects.get into a DeveloperApi (JdbcDialects itself is already a DeveloperApi).
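
In sketch form, the relocated helpers look roughly like this (signatures inferred from the bullets above; the committed code may differ in visibility and extra parameters):

import java.sql.ResultSet
import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.jdbc.JdbcDialect
import org.apache.spark.sql.types.StructType

object JdbcUtils {
  // Builds a Spark schema from the ResultSet's metadata, applying the dialect's type mappings.
  def getSchema(resultSet: ResultSet, dialect: JdbcDialect): StructType = ???

  // Internal helper: the main conversion loop extracted from JDBCRDD.compute().
  def resultSetToSparkInternalRows(resultSet: ResultSet, schema: StructType): Iterator[InternalRow] = ???

  // Public wrapper that lets the conversion run outside of a Spark job.
  def resultSetToRows(resultSet: ResultSet, schema: StructType): Iterator[Row] = ???
}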

Put together, these changes enable the following testing pattern:

val jdbcResultSet: ResultSet = conn.prepareStatement(query).executeQuery()
val resultSchema: StructType = JdbcUtils.getSchema(jdbcResultSet, JdbcDialects.get("jdbc:postgresql"))
val jdbcRows: Seq[Row] = JdbcUtils.resultSetToRows(jdbcResultSet, resultSchema).toSeq
checkAnswer(sparkResult, jdbcRows) // in a test case

// JDBCRDD diff context referenced by the comment below:
- override def compute(thePart: Partition, context: TaskContext): Iterator[InternalRow] =
-   new Iterator[InternalRow] {
+ override def compute(thePart: Partition, context: TaskContext): Iterator[InternalRow] = {
Contributor Author

While most of the changes in this block stem from moving the inner loop into a JdbcUtils method, there are one or two non-trivial changes that may impact cleanup in error situations. I'll comment on these changes below in order to help walk through them.
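
For orientation, here is a rough sketch of that delegation; stmt, close(), and schema stand in for the RDD's existing statement handling, cleanup hook, and resolved schema rather than the literal code in the diff:

override def compute(thePart: Partition, context: TaskContext): Iterator[InternalRow] = {
  // ... open the connection, prepare the statement, and execute the query as before ...
  val rs: ResultSet = stmt.executeQuery()
  // Cleanup stays registered in the RDD so resources are released even if the task fails.
  context.addTaskCompletionListener { _ => close() }
  // The per-row ResultSet-to-InternalRow loop now lives in JdbcUtils.
  JdbcUtils.resultSetToSparkInternalRows(rs, schema)
}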

@SparkQA

SparkQA commented Sep 1, 2016

Test build #64745 has finished for PR 14907 at commit 43cbef6.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@JoshRosen
Contributor Author

Please merge #14911 ahead of this so that I can bring this up-to-date with that change. Merging in this order reduces the amount of work to backport #14911.

@JoshRosen
Contributor Author

I've merged #14911 so this should now be ready for review.

case _ => throw new IllegalArgumentException(s"Unsupported type ${dt.simpleString}")
}

private def nullSafeConvert[T](input: T, f: T => Any): Any = {
Contributor

@srinathshankar commented Sep 2, 2016

Can this be
private def nullSafeConvert[T, U](input: T, f: T => U): U

Contributor Author

This is carried over from the old code. Good catch.

Changing this signature might actually improve performance in the ArrayType case because Scala should be able to statically determine that it can allocate primitive arrays once it knows the return type of nullSafeConvert.

Contributor Author

Actually, I don't think we can do this: there's no upper type bound that says U must be a nullable object, so you'd have to do a dangerous null.asInstanceOf cast. And since we're not working with primitives here, there are no savings on boxing, etc. Therefore, I'd prefer to leave this unchanged since it's a carryover from the old code.

Contributor

You can use the U >: Null bound, right?

Contributor Author

Aha, forgot about using lower type bounds for this.
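
For reference, a minimal self-contained sketch of the lower-bounded signature being discussed (the enclosing object name is just a placeholder):

object NullSafeConvertSketch {
  // U >: Null lets the compiler accept null as a valid value of U,
  // avoiding the null.asInstanceOf cast an unconstrained U would require.
  def nullSafeConvert[T, U >: Null](input: T, f: T => U): U = {
    if (input == null) null else f(input)
  }
}

Note that U must still be a reference type (Null is only a subtype of reference types), so this does not change the boxing situation discussed above.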

@SparkQA

SparkQA commented Sep 2, 2016

Test build #64816 has finished for PR 14907 at commit f09174b.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@hvanhovell
Contributor

LGTM. Merging to master. Thanks!

@asfgit closed this in 6bcbf9b on Sep 2, 2016
@JoshRosen deleted the modularize-jdbc-internals branch on September 2, 2016 at 17:39
@gatorsmile
Member

We exposed JdbcUtils.resultSetToRows, but forgot to add a test case.
