From 2e08c659f72f1b70cd38e5212cefbe960a4e94c8 Mon Sep 17 00:00:00 2001
From: Josh Rosen
Date: Wed, 26 Aug 2015 00:31:04 -0700
Subject: [PATCH 1/5] First cut at supporting configurable VARCHAR column lengths

---
 .../redshift/RedshiftIntegrationSuite.scala  | 39 ++++++++++++++++++-
 .../spark/redshift/RedshiftJDBCWrapper.scala |  8 +++-
 2 files changed, 45 insertions(+), 2 deletions(-)

diff --git a/src/it/scala/com/databricks/spark/redshift/RedshiftIntegrationSuite.scala b/src/it/scala/com/databricks/spark/redshift/RedshiftIntegrationSuite.scala
index fdc1129c..f5d269ff 100644
--- a/src/it/scala/com/databricks/spark/redshift/RedshiftIntegrationSuite.scala
+++ b/src/it/scala/com/databricks/spark/redshift/RedshiftIntegrationSuite.scala
@@ -17,7 +17,7 @@ package com.databricks.spark.redshift
 
 import java.net.URI
-import java.sql.Connection
+import java.sql.{SQLException, Connection}
 import java.util.Properties
 
 import scala.util.Random
@@ -373,6 +373,43 @@ class RedshiftIntegrationSuite
     }
   }
 
+  test("configuring maxlength on string columns") {
+    val tableName = s"configuring_maxlength_on_string_column_$randomSuffix"
+    try {
+      val metadata = new MetadataBuilder().putLong("maxlength", 512).build()
+      val schema = StructType(
+        StructField("x", StringType, metadata = metadata) :: Nil)
+      sqlContext.createDataFrame(sc.parallelize(Seq(Row("a" * 512))), schema).write
+        .format("com.databricks.spark.redshift")
+        .option("url", jdbcUrl)
+        .option("dbtable", tableName)
+        .option("tempdir", tempDir)
+        .mode(SaveMode.ErrorIfExists)
+        .save()
+      assert(DefaultJDBCWrapper.tableExists(conn, tableName))
+      val loadedDf = sqlContext.read
+        .format("com.databricks.spark.redshift")
+        .option("url", jdbcUrl)
+        .option("dbtable", tableName)
+        .option("tempdir", tempDir)
+        .load()
+      checkAnswer(loadedDf, Seq(Row("a" * 512)))
+      // This append should fail due to the string being longer than the maxlength
+      intercept[SQLException] {
+        sqlContext.createDataFrame(sc.parallelize(Seq(Row("a" * 513))), schema).write
+          .format("com.databricks.spark.redshift")
+          .option("url", jdbcUrl)
+          .option("dbtable", tableName)
+          .option("tempdir", tempDir)
+          .mode(SaveMode.Append)
+          .save()
+      }
+    } finally {
+      conn.prepareStatement(s"drop table if exists $tableName").executeUpdate()
+      conn.commit()
+    }
+  }
+
   test("SaveMode.Overwrite with non-existent table") {
     val tableName = s"overwrite_non_existent_table$randomSuffix"
     try {
diff --git a/src/main/scala/com/databricks/spark/redshift/RedshiftJDBCWrapper.scala b/src/main/scala/com/databricks/spark/redshift/RedshiftJDBCWrapper.scala
index b3ffc267..0ab9b8d9 100644
--- a/src/main/scala/com/databricks/spark/redshift/RedshiftJDBCWrapper.scala
+++ b/src/main/scala/com/databricks/spark/redshift/RedshiftJDBCWrapper.scala
@@ -141,7 +141,13 @@ private[redshift] class JDBCWrapper extends Logging {
       case ShortType => "INTEGER"
       case ByteType => "SMALLINT" // Redshift does not support the BYTE type.
       case BooleanType => "BOOLEAN"
-      case StringType => "TEXT"
+      case StringType =>
+        val maxlength = if (field.metadata.contains("maxlength")) {
+          field.metadata.getLong("maxlength")
+        } else {
+          255 // TODO: make this default configurable?
+        }
+        s"VARCHAR($maxlength)"
       case BinaryType => "BLOB"
       case TimestampType => "TIMESTAMP"
       case DateType => "DATE"

From 326a4c75229dd92f281830c0a366ba0867d7d515 Mon Sep 17 00:00:00 2001
From: Josh Rosen
Date: Wed, 26 Aug 2015 15:36:52 -0700
Subject: [PATCH 2/5] Add more missing keys

---
 .../spark/redshift/RedshiftIntegrationSuite.scala | 6 ++++++
 1 file changed, 6 insertions(+)

diff --git a/src/it/scala/com/databricks/spark/redshift/RedshiftIntegrationSuite.scala b/src/it/scala/com/databricks/spark/redshift/RedshiftIntegrationSuite.scala
index 9986b396..2e7094fc 100644
--- a/src/it/scala/com/databricks/spark/redshift/RedshiftIntegrationSuite.scala
+++ b/src/it/scala/com/databricks/spark/redshift/RedshiftIntegrationSuite.scala
@@ -404,6 +404,8 @@ class RedshiftIntegrationSuite
         .option("url", jdbcUrl)
         .option("dbtable", tableName)
         .option("tempdir", tempDir)
+        .option("aws_access_key_id", AWS_ACCESS_KEY_ID)
+        .option("aws_secret_access_key", AWS_SECRET_ACCESS_KEY)
         .mode(SaveMode.ErrorIfExists)
         .save()
       assert(DefaultJDBCWrapper.tableExists(conn, tableName))
@@ -412,6 +414,8 @@ class RedshiftIntegrationSuite
         .option("url", jdbcUrl)
         .option("dbtable", tableName)
         .option("tempdir", tempDir)
+        .option("aws_access_key_id", AWS_ACCESS_KEY_ID)
+        .option("aws_secret_access_key", AWS_SECRET_ACCESS_KEY)
         .load()
       checkAnswer(loadedDf, Seq(Row("a" * 512)))
       // This append should fail due to the string being longer than the maxlength
@@ -421,6 +425,8 @@ class RedshiftIntegrationSuite
           .option("url", jdbcUrl)
           .option("dbtable", tableName)
           .option("tempdir", tempDir)
+          .option("aws_access_key_id", AWS_ACCESS_KEY_ID)
+          .option("aws_secret_access_key", AWS_SECRET_ACCESS_KEY)
           .mode(SaveMode.Append)
           .save()
       }

From e9772d5f04c50b323f6bfe2c6b537c2084dfe813 Mon Sep 17 00:00:00 2001
From: Josh Rosen
Date: Wed, 26 Aug 2015 15:42:47 -0700
Subject: [PATCH 3/5] Use TEXT as default

---
 .../databricks/spark/redshift/RedshiftJDBCWrapper.scala | 7 +++----
 1 file changed, 3 insertions(+), 4 deletions(-)

diff --git a/src/main/scala/com/databricks/spark/redshift/RedshiftJDBCWrapper.scala b/src/main/scala/com/databricks/spark/redshift/RedshiftJDBCWrapper.scala
index 0ab9b8d9..31b90bc7 100644
--- a/src/main/scala/com/databricks/spark/redshift/RedshiftJDBCWrapper.scala
+++ b/src/main/scala/com/databricks/spark/redshift/RedshiftJDBCWrapper.scala
@@ -142,12 +142,11 @@ private[redshift] class JDBCWrapper extends Logging {
       case ByteType => "SMALLINT" // Redshift does not support the BYTE type.
       case BooleanType => "BOOLEAN"
       case StringType =>
-        val maxlength = if (field.metadata.contains("maxlength")) {
-          field.metadata.getLong("maxlength")
+        if (field.metadata.contains("maxlength")) {
+          s"VARCHAR(${field.metadata.getLong("maxlength")})"
         } else {
-          255 // TODO: make this default configurable?
+          "TEXT"
         }
-        s"VARCHAR($maxlength)"
       case BinaryType => "BLOB"
       case TimestampType => "TIMESTAMP"
       case DateType => "DATE"

From 88927de72571c9acfd08ccd9f1258a352e7c1e21 Mon Sep 17 00:00:00 2001
From: Josh Rosen
Date: Wed, 26 Aug 2015 15:59:50 -0700
Subject: [PATCH 4/5] Add unit test for create table command

---
 .../spark/redshift/RedshiftJDBCWrapper.scala |  2 +-
 .../spark/redshift/RedshiftWriter.scala      |  5 +++--
 .../spark/redshift/RedshiftSourceSuite.scala | 19 +++++++++++++++++++
 3 files changed, 23 insertions(+), 3 deletions(-)

diff --git a/src/main/scala/com/databricks/spark/redshift/RedshiftJDBCWrapper.scala b/src/main/scala/com/databricks/spark/redshift/RedshiftJDBCWrapper.scala
index 31b90bc7..92a7bb69 100644
--- a/src/main/scala/com/databricks/spark/redshift/RedshiftJDBCWrapper.scala
+++ b/src/main/scala/com/databricks/spark/redshift/RedshiftJDBCWrapper.scala
@@ -154,7 +154,7 @@ private[redshift] class JDBCWrapper extends Logging {
       case _ => throw new IllegalArgumentException(s"Don't know how to save $field to JDBC")
     }
     val nullable = if (field.nullable) "" else "NOT NULL"
-    sb.append(s", $name $typ $nullable")
+    sb.append(s", $name $typ $nullable".trim)
   }}
   if (sb.length < 2) "" else sb.substring(2)
 }

diff --git a/src/main/scala/com/databricks/spark/redshift/RedshiftWriter.scala b/src/main/scala/com/databricks/spark/redshift/RedshiftWriter.scala
index f3e753a2..a8814832 100644
--- a/src/main/scala/com/databricks/spark/redshift/RedshiftWriter.scala
+++ b/src/main/scala/com/databricks/spark/redshift/RedshiftWriter.scala
@@ -32,12 +32,13 @@ import org.apache.spark.sql.types._
 /**
  * Functions to write data to Redshift with intermediate Avro serialisation into S3.
  */
-class RedshiftWriter(jdbcWrapper: JDBCWrapper) extends Logging {
+private[redshift] class RedshiftWriter(jdbcWrapper: JDBCWrapper) extends Logging {
 
   /**
    * Generate CREATE TABLE statement for Redshift
    */
-  private def createTableSql(data: DataFrame, params: MergedParameters): String = {
+  // Visible for testing.
+  private[redshift] def createTableSql(data: DataFrame, params: MergedParameters): String = {
     val schemaSql = jdbcWrapper.schemaString(data.schema)
     val distStyleDef = params.distStyle match {
       case Some(style) => s"DISTSTYLE $style"
diff --git a/src/test/scala/com/databricks/spark/redshift/RedshiftSourceSuite.scala b/src/test/scala/com/databricks/spark/redshift/RedshiftSourceSuite.scala
index 279c4921..03992714 100644
--- a/src/test/scala/com/databricks/spark/redshift/RedshiftSourceSuite.scala
+++ b/src/test/scala/com/databricks/spark/redshift/RedshiftSourceSuite.scala
@@ -33,6 +33,8 @@ import org.apache.spark.sql.sources._
 import org.apache.spark.sql._
 import org.apache.spark.sql.types._
 
+import com.databricks.spark.redshift.Parameters.MergedParameters
+
 private class TestContext extends SparkContext("local", "RedshiftSourceSuite") {
 
   /**
@@ -441,6 +443,23 @@ class RedshiftSourceSuite
     checkAnswer(written, TestUtils.expectedDataWithConvertedTimesAndDates)
   }
 
+  test("configuring maxlength on string columns") {
+    val longStrMetadata = new MetadataBuilder().putLong("maxlength", 512).build()
+    val shortStrMetadata = new MetadataBuilder().putLong("maxlength", 10).build()
+    val schema = StructType(
+      StructField("long_str", StringType, metadata = longStrMetadata) ::
+      StructField("short_str", StringType, metadata = shortStrMetadata) ::
+      StructField("default_str", StringType) ::
+      Nil)
+    val df = testSqlContext.createDataFrame(sc.emptyRDD[Row], schema)
+    val createTableCommand =
+      DefaultRedshiftWriter.createTableSql(df, MergedParameters.apply(defaultParams)).trim
+    val expectedCreateTableCommand =
+      "CREATE TABLE IF NOT EXISTS test_table (long_str VARCHAR(512), short_str VARCHAR(10), " +
+        "default_str TEXT)"
+    assert(createTableCommand === expectedCreateTableCommand)
+  }
+
   test("Respect SaveMode.ErrorIfExists when table exists") {
     val errIfExistsWrapper = mockJdbcWrapper(defaultParams("url"), Seq.empty[Regex])
 

From b92e689444d4a6913d0e87294b6450ef95abcb59 Mon Sep 17 00:00:00 2001
From: Josh Rosen
Date: Wed, 26 Aug 2015 16:52:23 -0700
Subject: [PATCH 5/5] Add user-facing documentation on how to use column metadata

---
 README.md | 18 ++++++++++++++++++
 1 file changed, 18 insertions(+)

diff --git a/README.md b/README.md
index c1533944..63cc6b13 100644
--- a/README.md
+++ b/README.md
@@ -303,6 +303,24 @@ table, the changes will be reverted and the backup table restored if post action
 
+## Additional configuration options
+
+### Configuring the maximum size of string columns
+
+When creating Redshift tables, `spark-redshift`'s default behavior is to create `TEXT` columns for string columns. Redshift stores `TEXT` columns as `VARCHAR(256)`, so these columns have a maximum size of 256 characters ([source](http://docs.aws.amazon.com/redshift/latest/dg/r_Character_types.html)).
+
+To support larger columns, you can use the `maxlength` column metadata field to specify the maximum length of individual string columns. This can also be used as a space-saving performance optimization in order to declare columns with a smaller maximum length than the default.
+
+Here is an example of updating a column's metadata field in Scala:
+
+```
+import org.apache.spark.sql.types.MetadataBuilder
+val metadata = new MetadataBuilder().putLong("maxlength", 10).build()
+df.withColumn("colName", col("colName").as("colName", metadata))
+```
+
+Column metadata modification is unsupported in the Python, SQL, and R language APIs.
+
 ## AWS Credentials
 
 Note that you can provide AWS credentials in the parameters above, with Hadoop `fs.*` configuration settings,
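
For reference, here is a minimal end-to-end sketch of the feature introduced by this series, pieced together from the integration test in PATCH 1/5. The JDBC URL, table name, and `tempdir` location are placeholders, and `sc` / `sqlContext` are assumed to be an existing SparkContext and SQLContext; AWS credentials can also be supplied through the `aws_access_key_id` and `aws_secret_access_key` options, as the test does.

```
import org.apache.spark.sql.{Row, SaveMode}
import org.apache.spark.sql.types.{MetadataBuilder, StringType, StructField, StructType}

// Declare a string column that should become VARCHAR(512) instead of the default TEXT.
val metadata = new MetadataBuilder().putLong("maxlength", 512).build()
val schema = StructType(StructField("x", StringType, metadata = metadata) :: Nil)
val df = sqlContext.createDataFrame(sc.parallelize(Seq(Row("a" * 512))), schema)

// Writing the DataFrame creates the table with column `x VARCHAR(512)`;
// appending strings longer than 512 characters will fail with a SQLException.
df.write
  .format("com.databricks.spark.redshift")
  .option("url", "jdbc:redshift://<host>:5439/<db>?user=<user>&password=<pass>") // placeholder
  .option("dbtable", "my_table")             // placeholder table name
  .option("tempdir", "s3n://my-bucket/tmp/") // placeholder S3 staging directory
  .mode(SaveMode.ErrorIfExists)
  .save()
```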