README.md: 18 additions & 0 deletions
@@ -303,6 +303,24 @@ table, the changes will be reverted and the backup table restored if post action
</tr>
</table>

## Additional configuration options

### Configuring the maximum size of string columns

When creating Redshift tables, `spark-redshift`'s default behavior is to create `TEXT` columns for string columns. Redshift stores `TEXT` columns as `VARCHAR(256)`, so these columns have a maximum size of 256 characters ([source](http://docs.aws.amazon.com/redshift/latest/dg/r_Character_types.html)).

To support larger columns, you can use the `maxlength` column metadata field to specify the maximum length of individual string columns. This can also be used as a space-saving performance optimization, declaring columns with a smaller maximum length than the default.

Here is an example of updating a column's metadata field in Scala:

```scala
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.MetadataBuilder

val metadata = new MetadataBuilder().putLong("maxlength", 10).build()
val updatedDf = df.withColumn("colName", col("colName").as("colName", metadata))
```
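
With the metadata attached, writing the DataFrame creates the column as `VARCHAR(10)` instead of `TEXT`. A minimal write sketch, reusing `updatedDf` from above; the option names follow the tests in this change, while the URL, table name, and `tempdir` values are placeholders:

```scala
// Placeholders: substitute your own JDBC URL, target table, and S3 temp directory.
updatedDf.write
  .format("com.databricks.spark.redshift")
  .option("url", "jdbc:redshift://redshifthost:5439/database?user=username&password=pass")
  .option("dbtable", "my_table")
  .option("tempdir", "s3n://path/for/temp/data")
  .mode("error")
  .save()
```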

Column metadata modification is unsupported in the Python, SQL, and R language APIs.

## AWS Credentials

Note that you can provide AWS credentials in the parameters above, with Hadoop `fs.*` configuration settings,
@@ -393,6 +393,49 @@ class RedshiftIntegrationSuite
}
}

test("configuring maxlength on string columns") {
val tableName = s"configuring_maxlength_on_string_column_$randomSuffix"
try {
val metadata = new MetadataBuilder().putLong("maxlength", 512).build()
val schema = StructType(
StructField("x", StringType, metadata = metadata) :: Nil)
sqlContext.createDataFrame(sc.parallelize(Seq(Row("a" * 512))), schema).write
.format("com.databricks.spark.redshift")
.option("url", jdbcUrl)
.option("dbtable", tableName)
.option("tempdir", tempDir)
.option("aws_access_key_id", AWS_ACCESS_KEY_ID)
.option("aws_secret_access_key", AWS_SECRET_ACCESS_KEY)
.mode(SaveMode.ErrorIfExists)
.save()
assert(DefaultJDBCWrapper.tableExists(conn, tableName))
val loadedDf = sqlContext.read
.format("com.databricks.spark.redshift")
.option("url", jdbcUrl)
.option("dbtable", tableName)
.option("tempdir", tempDir)
.option("aws_access_key_id", AWS_ACCESS_KEY_ID)
.option("aws_secret_access_key", AWS_SECRET_ACCESS_KEY)
.load()
checkAnswer(loadedDf, Seq(Row("a" * 512)))
// This append should fail due to the string being longer than the maxlength
intercept[SQLException] {
sqlContext.createDataFrame(sc.parallelize(Seq(Row("a" * 513))), schema).write
.format("com.databricks.spark.redshift")
.option("url", jdbcUrl)
.option("dbtable", tableName)
.option("tempdir", tempDir)
.option("aws_access_key_id", AWS_ACCESS_KEY_ID)
.option("aws_secret_access_key", AWS_SECRET_ACCESS_KEY)
.mode(SaveMode.Append)
.save()
}
} finally {
conn.prepareStatement(s"drop table if exists $tableName").executeUpdate()
conn.commit()
}
}

test("informative error message when saving a table with string that is longer than max length") {
val tableName = s"error_message_when_string_too_long_$randomSuffix"
try {
@@ -141,15 +141,20 @@ private[redshift] class JDBCWrapper extends Logging {
case ShortType => "INTEGER"
case ByteType => "SMALLINT" // Redshift does not support the BYTE type.
case BooleanType => "BOOLEAN"
case StringType => "TEXT"
case StringType =>
if (field.metadata.contains("maxlength")) {
s"VARCHAR(${field.metadata.getLong("maxlength")})"
} else {
"TEXT"
}
case BinaryType => "BLOB"
case TimestampType => "TIMESTAMP"
case DateType => "DATE"
case t: DecimalType => s"DECIMAL(${t.precision},${t.scale})"
case _ => throw new IllegalArgumentException(s"Don't know how to save $field to JDBC")
}
val nullable = if (field.nullable) "" else "NOT NULL"
sb.append(s", $name $typ $nullable")
sb.append(s", $name $typ $nullable".trim)
}}
if (sb.length < 2) "" else sb.substring(2)
}
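
As an illustration of the mapping added above (not part of this diff; the helper and field names below are made up), the `StringType` branch resolves to `VARCHAR(n)` when `maxlength` metadata is present and falls back to `TEXT` otherwise:

```scala
import org.apache.spark.sql.types._

// Illustrative sketch mirroring the StringType case above; not part of the change itself.
def redshiftStringType(field: StructField): String =
  if (field.metadata.contains("maxlength")) {
    s"VARCHAR(${field.metadata.getLong("maxlength")})"
  } else {
    "TEXT"
  }

val bounded = StructField("comment", StringType,
  metadata = new MetadataBuilder().putLong("maxlength", 512).build())
val unbounded = StructField("note", StringType)

redshiftStringType(bounded)    // VARCHAR(512)
redshiftStringType(unbounded)  // TEXT
```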
@@ -32,12 +32,13 @@ import org.apache.spark.sql.types._
/**
* Functions to write data to Redshift with intermediate Avro serialisation into S3.
*/
class RedshiftWriter(jdbcWrapper: JDBCWrapper) extends Logging {
private[redshift] class RedshiftWriter(jdbcWrapper: JDBCWrapper) extends Logging {

/**
* Generate CREATE TABLE statement for Redshift
*/
private def createTableSql(data: DataFrame, params: MergedParameters): String = {
// Visible for testing.
private[redshift] def createTableSql(data: DataFrame, params: MergedParameters): String = {
val schemaSql = jdbcWrapper.schemaString(data.schema)
val distStyleDef = params.distStyle match {
case Some(style) => s"DISTSTYLE $style"
@@ -33,6 +33,8 @@ import org.apache.spark.sql.sources._
import org.apache.spark.sql._
import org.apache.spark.sql.types._

import com.databricks.spark.redshift.Parameters.MergedParameters

private class TestContext extends SparkContext("local", "RedshiftSourceSuite") {

/**
@@ -441,6 +443,23 @@
checkAnswer(written, TestUtils.expectedDataWithConvertedTimesAndDates)
}

test("configuring maxlength on string columns") {
val longStrMetadata = new MetadataBuilder().putLong("maxlength", 512).build()
val shortStrMetadata = new MetadataBuilder().putLong("maxlength", 10).build()
val schema = StructType(
StructField("long_str", StringType, metadata = longStrMetadata) ::
StructField("short_str", StringType, metadata = shortStrMetadata) ::
StructField("default_str", StringType) ::
Nil)
val df = testSqlContext.createDataFrame(sc.emptyRDD[Row], schema)
val createTableCommand =
DefaultRedshiftWriter.createTableSql(df, MergedParameters.apply(defaultParams)).trim
val expectedCreateTableCommand =
"CREATE TABLE IF NOT EXISTS test_table (long_str VARCHAR(512), short_str VARCHAR(10), " +
"default_str TEXT)"
assert(createTableCommand === expectedCreateTableCommand)
}

test("Respect SaveMode.ErrorIfExists when table exists") {
val errIfExistsWrapper = mockJdbcWrapper(defaultParams("url"), Seq.empty[Regex])
