Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ object Encoders {
* - boxed types: Boolean, Integer, Double, etc.
* - String
* - java.math.BigDecimal, java.math.BigInteger
* - time related: java.sql.Date, java.sql.Timestamp
* - time related: java.sql.Date, java.sql.Timestamp, java.time.LocalDate, java.time.Instant
* - collection types: only array and java.util.List currently, map support is in progress
* - nested java bean.
*
Expand Down
14 changes: 14 additions & 0 deletions sql/catalyst/src/main/scala/org/apache/spark/sql/Row.scala
Original file line number Diff line number Diff line change
Expand Up @@ -269,13 +269,27 @@ trait Row extends Serializable {
*/
def getDate(i: Int): java.sql.Date = getAs[java.sql.Date](i)

/**
 * Returns the value at position i of date type as java.time.LocalDate.
 *
 * NOTE(review): rows carry java.time.LocalDate (rather than java.sql.Date) only when
 * the Java 8 datetime API conf (SQLConf.DATETIME_JAVA8API_ENABLED) is on — confirm
 * against RowEncoder before relying on this getter in other configurations.
 *
 * @throws ClassCastException when data type does not match.
 */
def getLocalDate(i: Int): java.time.LocalDate = getAs[java.time.LocalDate](i)

/**
 * Returns the value at position i of timestamp type as java.sql.Timestamp.
 *
 * @throws ClassCastException when data type does not match.
 */
def getTimestamp(i: Int): java.sql.Timestamp = getAs[java.sql.Timestamp](i)

/**
 * Returns the value at position i of timestamp type as java.time.Instant.
 *
 * NOTE(review): rows carry java.time.Instant (rather than java.sql.Timestamp) only when
 * the Java 8 datetime API conf (SQLConf.DATETIME_JAVA8API_ENABLED) is on — confirm
 * against RowEncoder before relying on this getter in other configurations.
 *
 * @throws ClassCastException when data type does not match.
 */
def getInstant(i: Int): java.time.Instant = getAs[java.time.Instant](i)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are getLocalDate and getInstant added just for test? If so, seems to be overkill.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It was added for consistency with other supported java classes.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

They are not native classes for native Spark SQL datatypes. Not necessarily to have native getters for them. If you're like to add them, I'm fine with it.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, I just checked and found you added them as external types in RowEncoder recently. Then it makes sense.


/**
* Returns the value at position i of array type as a Scala Seq.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,9 @@ object JavaTypeInference {

case c: Class[_] if c == classOf[java.math.BigDecimal] => (DecimalType.SYSTEM_DEFAULT, true)
case c: Class[_] if c == classOf[java.math.BigInteger] => (DecimalType.BigIntDecimal, true)
case c: Class[_] if c == classOf[java.time.LocalDate] => (DateType, true)
case c: Class[_] if c == classOf[java.sql.Date] => (DateType, true)
case c: Class[_] if c == classOf[java.time.Instant] => (TimestampType, true)
case c: Class[_] if c == classOf[java.sql.Timestamp] => (TimestampType, true)

case _ if typeToken.isArray =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,15 @@
package test.org.apache.spark.sql;

import java.io.Serializable;
import java.time.Instant;
import java.time.LocalDate;
import java.util.*;

import org.apache.spark.sql.*;
import org.apache.spark.sql.catalyst.expressions.GenericRow;
import org.apache.spark.sql.catalyst.util.DateTimeUtils;
import org.apache.spark.sql.catalyst.util.TimestampFormatter;
import org.apache.spark.sql.internal.SQLConf;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;
import org.junit.*;
Expand Down Expand Up @@ -509,4 +514,95 @@ public void setId(Integer id) {
this.id = id;
}
}

@Test
public void testBeanWithLocalDateAndInstant() {
  // The Java 8 datetime API must be enabled so rows expose LocalDate/Instant
  // values; remember the caller's setting and restore it afterwards.
  String savedConf = spark.conf().get(SQLConf.DATETIME_JAVA8API_ENABLED().key());
  try {
    spark.conf().set(SQLConf.DATETIME_JAVA8API_ENABLED().key(), "true");

    // Build the input rows and, in lock-step, the records we expect back
    // after round-tripping through the bean encoder.
    List<Row> rows = new ArrayList<>();
    List<LocalDateInstantRecord> expected = new ArrayList<>();
    for (long i = 0; i < 5; i++) {
      Row row = createLocalDateInstantRow(i);
      rows.add(row);
      expected.add(createLocalDateInstantRecord(row));
    }

    StructType schema = new StructType()
        .add("localDateField", DataTypes.DateType)
        .add("instantField", DataTypes.TimestampType);
    Encoder<LocalDateInstantRecord> beanEncoder =
        Encoders.bean(LocalDateInstantRecord.class);

    // DateType/TimestampType columns should decode into the bean's String fields.
    List<LocalDateInstantRecord> actual =
        spark.createDataFrame(rows, schema).as(beanEncoder).collectAsList();

    Assert.assertEquals(expected, actual);
  } finally {
    spark.conf().set(SQLConf.DATETIME_JAVA8API_ENABLED().key(), savedConf);
  }
}

public static final class LocalDateInstantRecord {
private String localDateField;
private String instantField;

public LocalDateInstantRecord() { }

public String getLocalDateField() {
return localDateField;
}

public void setLocalDateField(String localDateField) {
this.localDateField = localDateField;
}

public String getInstantField() {
return instantField;
}

public void setInstantField(String instantField) {
this.instantField = instantField;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
LocalDateInstantRecord that = (LocalDateInstantRecord) o;
return Objects.equals(localDateField, that.localDateField) &&
Objects.equals(instantField, that.instantField);
}

@Override
public int hashCode() {
return Objects.hash(localDateField, instantField);
}

@Override
public String toString() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to bother implementing toString here?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In regular case, toString should not be called but it is called when the assert fails.

return com.google.common.base.Objects.toStringHelper(this)
.add("localDateField", localDateField)
.add("instantField", instantField)
.toString();
}
}

private static Row createLocalDateInstantRow(Long index) {
Object[] values = new Object[] { LocalDate.ofEpochDay(42), Instant.ofEpochSecond(42) };
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Total nit, but I think you can omit "new Object[]"

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You are right it can be omitted. Should I remove it?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not unless you're otherwise changing the code. It doesn't matter. Just a style convention I like to use

return new GenericRow(values);
}

// Converts a Row into the record the encoder is expected to produce: the date
// via LocalDate.toString, the timestamp via Spark's fraction formatter in the
// session-local time zone.
private static LocalDateInstantRecord createLocalDateInstantRecord(Row row) {
  TimestampFormatter fmt = TimestampFormatter.getFractionFormatter(
    DateTimeUtils.getZoneId(SQLConf.get().sessionLocalTimeZone()));
  LocalDateInstantRecord rec = new LocalDateInstantRecord();
  rec.setLocalDateField(String.valueOf(row.getLocalDate(0)));
  rec.setInstantField(fmt.format(DateTimeUtils.instantToMicros(row.getInstant(1))));
  return rec;
}
}