@@ -18,7 +18,7 @@
 package org.apache.spark.api.r
 
 import java.io.{DataInputStream, DataOutputStream}
-import java.sql.{Date, Time}
+import java.sql.{Timestamp, Date, Time}
 
 import scala.collection.JavaConversions._
 
@@ -107,9 +107,12 @@ private[spark] object SerDe {
     Date.valueOf(readString(in))
   }
 
-  def readTime(in: DataInputStream): Time = {
-    val t = in.readDouble()
-    new Time((t * 1000L).toLong)
+  def readTime(in: DataInputStream): Timestamp = {
+    val seconds = in.readDouble()
+    val sec = Math.floor(seconds).toLong
+    val t = new Timestamp(sec * 1000L)
+    t.setNanos(((seconds - sec) * 1e9).toInt)
+    t
   }
 
   def readBytesArr(in: DataInputStream): Array[Array[Byte]] = {
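For context, a minimal standalone sketch of the decoding arithmetic in the new readTime above: the fractional-seconds double is split into whole seconds (converted to milliseconds for the Timestamp constructor) and a nanosecond remainder. The object name and the sample value are illustrative, not part of the patch.

```scala
import java.sql.Timestamp

object ReadTimeSketch {
  // Same arithmetic as the new readTime, minus the DataInputStream plumbing.
  def fromSeconds(seconds: Double): Timestamp = {
    val sec = Math.floor(seconds).toLong       // whole seconds since the epoch
    val t = new Timestamp(sec * 1000L)         // Timestamp constructor takes milliseconds
    t.setNanos(((seconds - sec) * 1e9).toInt)  // fractional part becomes nanoseconds
    t
  }

  def main(args: Array[String]): Unit = {
    // 1.5 seconds after the epoch: expect 1970-01-01 00:00:01.5 (rendered in the local time zone).
    println(fromSeconds(1.5))
  }
}
```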
@@ -227,6 +230,9 @@ private[spark] object SerDe {
        case "java.sql.Time" =>
          writeType(dos, "time")
          writeTime(dos, value.asInstanceOf[Time])
+       case "java.sql.Timestamp" =>
+         writeType(dos, "time")
+         writeTime(dos, value.asInstanceOf[Timestamp])
        case "[B" =>
          writeType(dos, "raw")
          writeBytes(dos, value.asInstanceOf[Array[Byte]])
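The new case above reuses the same "time" tag for java.sql.Timestamp that java.sql.Time already uses, so the R side sees a single encoding for both. A hedged sketch of that dispatch is below; writeTag and the object name are stand-ins, since writeType's actual wire framing is not shown in this hunk.

```scala
import java.io.DataOutputStream
import java.sql.{Time, Timestamp}

object TimeDispatchSketch {
  // Stand-in for SerDe.writeType; the real tag framing is outside this diff.
  def writeTag(dos: DataOutputStream, tag: String): Unit = dos.writeUTF(tag)

  // Dispatch on the runtime class name, as writeObject does, and emit
  // epoch seconds as a double for either time-like type.
  def writeTimeLike(dos: DataOutputStream, value: Object): Unit = value.getClass.getName match {
    case "java.sql.Timestamp" =>
      writeTag(dos, "time")
      val ts = value.asInstanceOf[Timestamp]
      dos.writeDouble((ts.getTime / 1000).toDouble + ts.getNanos.toDouble / 1e9)
    case "java.sql.Time" =>
      writeTag(dos, "time")
      dos.writeDouble(value.asInstanceOf[Time].getTime.toDouble / 1000.0)
    case other =>
      throw new IllegalArgumentException(s"Not a time-like value: $other")
  }
}
```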
@@ -289,6 +295,9 @@ private[spark] object SerDe {
     out.writeDouble(value.getTime.toDouble / 1000.0)
   }
 
+  def writeTime(out: DataOutputStream, value: Timestamp): Unit = {
+    out.writeDouble((value.getTime / 1000).toDouble + value.getNanos.toDouble / 1e9)
+  }
 
   // NOTE: Only works for ASCII right now
   def writeString(out: DataOutputStream, value: String): Unit = {
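Putting the two halves together, a small round trip through in-memory streams shows the encoding the new writeTime/readTime pair agrees on: epoch seconds as a big-endian double, with the fraction carrying sub-second precision. Note that spending a 64-bit double on roughly ten digits of epoch seconds leaves about microsecond, not nanosecond, resolution. The object name and sample timestamp are illustrative, not part of the patch.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}
import java.sql.Timestamp

object TimeRoundTripSketch {
  def main(args: Array[String]): Unit = {
    val original = Timestamp.valueOf("2015-06-01 12:34:56.789")

    // Write side: same arithmetic as the Timestamp overload of writeTime.
    val bytes = new ByteArrayOutputStream()
    val out = new DataOutputStream(bytes)
    out.writeDouble((original.getTime / 1000).toDouble + original.getNanos.toDouble / 1e9)
    out.flush()

    // Read side: same arithmetic as the new readTime.
    val in = new DataInputStream(new ByteArrayInputStream(bytes.toByteArray))
    val seconds = in.readDouble()
    val sec = Math.floor(seconds).toLong
    val decoded = new Timestamp(sec * 1000L)
    decoded.setNanos(((seconds - sec) * 1e9).toInt)

    println(s"$original -> $decoded")  // expected to agree at least to millisecond precision
  }
}
```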