Skip to content

Commit 4d7963e

Browse files
committed
remove muanlly serialization
1 parent 6d26b03 commit 4d7963e

File tree

3 files changed

+12
-708
lines changed

3 files changed

+12
-708
lines changed

mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala

Lines changed: 0 additions & 177 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818
package org.apache.spark.mllib.api.python
1919

2020
import java.io.OutputStream
21-
import java.nio.{ByteBuffer, ByteOrder}
2221

2322
import scala.collection.JavaConverters._
2423

@@ -467,10 +466,6 @@ class PythonMLLibAPI extends Serializable {
467466
* SerDe utility functions for PythonMLLibAPI.
468467
*/
469468
private[spark] object SerDe extends Serializable {
470-
private val DENSE_VECTOR_MAGIC: Byte = 1
471-
private val SPARSE_VECTOR_MAGIC: Byte = 2
472-
private val DENSE_MATRIX_MAGIC: Byte = 3
473-
private val LABELED_POINT_MAGIC: Byte = 4
474469

475470
private[python] def reduce_object(out: OutputStream, pickler: Pickler,
476471
module: String, name: String, objects: Object*) = {
@@ -595,178 +590,6 @@ private[spark] object SerDe extends Serializable {
595590
new Unpickler().loads(bytes)
596591
}
597592

598-
private[python] def deserializeDoubleVector(bytes: Array[Byte], offset: Int = 0): Vector = {
599-
require(bytes.length - offset >= 5, "Byte array too short")
600-
val magic = bytes(offset)
601-
if (magic == DENSE_VECTOR_MAGIC) {
602-
deserializeDenseVector(bytes, offset)
603-
} else if (magic == SPARSE_VECTOR_MAGIC) {
604-
deserializeSparseVector(bytes, offset)
605-
} else {
606-
throw new IllegalArgumentException("Magic " + magic + " is wrong.")
607-
}
608-
}
609-
610-
private[python] def deserializeDouble(bytes: Array[Byte], offset: Int = 0): Double = {
611-
require(bytes.length - offset == 8, "Wrong size byte array for Double")
612-
val bb = ByteBuffer.wrap(bytes, offset, bytes.length - offset)
613-
bb.order(ByteOrder.nativeOrder())
614-
bb.getDouble
615-
}
616-
617-
private[python] def deserializeDenseVector(bytes: Array[Byte], offset: Int = 0): Vector = {
618-
val packetLength = bytes.length - offset
619-
require(packetLength >= 5, "Byte array too short")
620-
val bb = ByteBuffer.wrap(bytes, offset, bytes.length - offset)
621-
bb.order(ByteOrder.nativeOrder())
622-
val magic = bb.get()
623-
require(magic == DENSE_VECTOR_MAGIC, "Invalid magic: " + magic)
624-
val length = bb.getInt()
625-
require (packetLength == 5 + 8 * length, "Invalid packet length: " + packetLength)
626-
val db = bb.asDoubleBuffer()
627-
val ans = new Array[Double](length.toInt)
628-
db.get(ans)
629-
Vectors.dense(ans)
630-
}
631-
632-
private[python] def deserializeSparseVector(bytes: Array[Byte], offset: Int = 0): Vector = {
633-
val packetLength = bytes.length - offset
634-
require(packetLength >= 9, "Byte array too short")
635-
val bb = ByteBuffer.wrap(bytes, offset, bytes.length - offset)
636-
bb.order(ByteOrder.nativeOrder())
637-
val magic = bb.get()
638-
require(magic == SPARSE_VECTOR_MAGIC, "Invalid magic: " + magic)
639-
val size = bb.getInt()
640-
val nonZeros = bb.getInt()
641-
require (packetLength == 9 + 12 * nonZeros, "Invalid packet length: " + packetLength)
642-
val ib = bb.asIntBuffer()
643-
val indices = new Array[Int](nonZeros)
644-
ib.get(indices)
645-
bb.position(bb.position() + 4 * nonZeros)
646-
val db = bb.asDoubleBuffer()
647-
val values = new Array[Double](nonZeros)
648-
db.get(values)
649-
Vectors.sparse(size, indices, values)
650-
}
651-
652-
/**
653-
* Returns an 8-byte array for the input Double.
654-
*
655-
* Note: we currently do not use a magic byte for double for storage efficiency.
656-
* This should be reconsidered when we add Ser/De for other 8-byte types (e.g. Long), for safety.
657-
* The corresponding deserializer, deserializeDouble, needs to be modified as well if the
658-
* serialization scheme changes.
659-
*/
660-
private[python] def serializeDouble(double: Double): Array[Byte] = {
661-
val bytes = new Array[Byte](8)
662-
val bb = ByteBuffer.wrap(bytes)
663-
bb.order(ByteOrder.nativeOrder())
664-
bb.putDouble(double)
665-
bytes
666-
}
667-
668-
private[python] def serializeDenseVector(doubles: Array[Double]): Array[Byte] = {
669-
val len = doubles.length
670-
val bytes = new Array[Byte](5 + 8 * len)
671-
val bb = ByteBuffer.wrap(bytes)
672-
bb.order(ByteOrder.nativeOrder())
673-
bb.put(DENSE_VECTOR_MAGIC)
674-
bb.putInt(len)
675-
val db = bb.asDoubleBuffer()
676-
db.put(doubles)
677-
bytes
678-
}
679-
680-
private[python] def serializeSparseVector(vector: SparseVector): Array[Byte] = {
681-
val nonZeros = vector.indices.length
682-
val bytes = new Array[Byte](9 + 12 * nonZeros)
683-
val bb = ByteBuffer.wrap(bytes)
684-
bb.order(ByteOrder.nativeOrder())
685-
bb.put(SPARSE_VECTOR_MAGIC)
686-
bb.putInt(vector.size)
687-
bb.putInt(nonZeros)
688-
val ib = bb.asIntBuffer()
689-
ib.put(vector.indices)
690-
bb.position(bb.position() + 4 * nonZeros)
691-
val db = bb.asDoubleBuffer()
692-
db.put(vector.values)
693-
bytes
694-
}
695-
696-
private[python] def serializeDoubleVector(vector: Vector): Array[Byte] = vector match {
697-
case s: SparseVector =>
698-
serializeSparseVector(s)
699-
case _ =>
700-
serializeDenseVector(vector.toArray)
701-
}
702-
703-
private[python] def deserializeDoubleMatrix(bytes: Array[Byte]): Array[Array[Double]] = {
704-
val packetLength = bytes.length
705-
if (packetLength < 9) {
706-
throw new IllegalArgumentException("Byte array too short.")
707-
}
708-
val bb = ByteBuffer.wrap(bytes)
709-
bb.order(ByteOrder.nativeOrder())
710-
val magic = bb.get()
711-
if (magic != DENSE_MATRIX_MAGIC) {
712-
throw new IllegalArgumentException("Magic " + magic + " is wrong.")
713-
}
714-
val rows = bb.getInt()
715-
val cols = bb.getInt()
716-
if (packetLength != 9 + 8 * rows * cols) {
717-
throw new IllegalArgumentException("Size " + rows + "x" + cols + " is wrong.")
718-
}
719-
val db = bb.asDoubleBuffer()
720-
val ans = new Array[Array[Double]](rows.toInt)
721-
for (i <- 0 until rows.toInt) {
722-
ans(i) = new Array[Double](cols.toInt)
723-
db.get(ans(i))
724-
}
725-
ans
726-
}
727-
728-
private[python] def serializeDoubleMatrix(doubles: Array[Array[Double]]): Array[Byte] = {
729-
val rows = doubles.length
730-
var cols = 0
731-
if (rows > 0) {
732-
cols = doubles(0).length
733-
}
734-
val bytes = new Array[Byte](9 + 8 * rows * cols)
735-
val bb = ByteBuffer.wrap(bytes)
736-
bb.order(ByteOrder.nativeOrder())
737-
bb.put(DENSE_MATRIX_MAGIC)
738-
bb.putInt(rows)
739-
bb.putInt(cols)
740-
val db = bb.asDoubleBuffer()
741-
for (i <- 0 until rows) {
742-
db.put(doubles(i))
743-
}
744-
bytes
745-
}
746-
747-
private[python] def serializeLabeledPoint(p: LabeledPoint): Array[Byte] = {
748-
val fb = serializeDoubleVector(p.features)
749-
val bytes = new Array[Byte](1 + 8 + fb.length)
750-
val bb = ByteBuffer.wrap(bytes)
751-
bb.order(ByteOrder.nativeOrder())
752-
bb.put(LABELED_POINT_MAGIC)
753-
bb.putDouble(p.label)
754-
bb.put(fb)
755-
bytes
756-
}
757-
758-
private[python] def deserializeLabeledPoint(bytes: Array[Byte]): LabeledPoint = {
759-
require(bytes.length >= 9, "Byte array too short")
760-
val magic = bytes(0)
761-
if (magic != LABELED_POINT_MAGIC) {
762-
throw new IllegalArgumentException("Magic " + magic + " is wrong.")
763-
}
764-
val labelBytes = ByteBuffer.wrap(bytes, 1, 8)
765-
labelBytes.order(ByteOrder.nativeOrder())
766-
val label = labelBytes.asDoubleBuffer().get(0)
767-
LabeledPoint(label, deserializeDoubleVector(bytes, 9))
768-
}
769-
770593
// Reformat a Matrix into Array[Array[Double]] for serialization
771594
private[python] def to2dArray(matrix: Matrix): Array[Array[Double]] = {
772595
val values = matrix.toArray

0 commit comments

Comments
 (0)