@@ -36,20 +36,39 @@ object Selector {
 // ----------------------------------------------------------------------------
 
 trait Dataset[T] {
-  def select[A](c: Column[T, A]): Dataset[A] =
-    this.asInstanceOf[Dataset[A]] // Use c.label to do an untyped select on actual Spark Dataset, and
+
+  def select[A](c: Column[T, A]): Dataset[A] = new SelectedDataset[T, A](this, c)
+  // Use c.label to do an untyped select on actual Spark Dataset, and
   // cast the result to TypedDataset[A]
 
   def col[S <: String, A](s: S)(implicit unused ev: Exists[T, s.type, A]) =
-    new Column[T, A](s) // ev is only here to check than this is safe, it's
-                        // never used at runtime!
+    new Column[T, A](s) // ev is only here to check that this is safe, it's never used at runtime!
+
+  def collect(): Vector[T]
+}
+
+class SelectedDataset[T, A](ds: Dataset[T], val col: Column[T, A]) extends Dataset[A] {
+  def collect(): Vector[A] = {
+    // This would use collect of the underlying Spark structure plus a cast
+    ds match { // Dummy implementation
+      case SeqDataset(data) =>
+        println(s"selecting `${col.label}` from $data")
+        col.label match {
+          case "a" => data.map(_.asInstanceOf[X4[A,_,_,_]].a).toVector
+          case "b" => data.map(_.asInstanceOf[X4[_,A,_,_]].b).toVector
+          case "c" => data.map(_.asInstanceOf[X4[_,_,A,_]].c).toVector
+          case "d" => data.map(_.asInstanceOf[X4[_,_,_,A]].d).toVector
+        }
+    }
+  }
+}
 
-  def collect(): Vector[T] =
-    Vector.empty[T] // Uses collect of the underlying Spark structure plus a cast
+case class SeqDataset[T](data: Seq[T]) extends Dataset[T] {
+  override def collect(): Vector[T] = data.toVector
 }
 
 object Dataset {
-  def create[T](values: Seq[T]): Dataset[T] = new Dataset[T] { }
+  def create[T](values: Seq[T]): Dataset[T] = new SeqDataset[T](values)
 }
 
 /** Expression used in `select`-like constructions.
@@ -115,15 +134,13 @@ object Test {
     println("unused")
     val unusedD = ds.col("d")
     val outSpark1: Vector[Boolean] = ds.select(unusedD).collect()
-    // FIXME implement Dataset operations
-    // assert(outSpark1 == outColl)
+    assert(outSpark1 == outColl)
   }
 
   println("used")
   val usedD = ds.col("d")
   val outSpark2: Vector[Boolean] = ds.select(usedD).collect()
-  // FIXME implement Dataset operations
-  // assert(outSpark2 == outColl)
+  assert(outSpark2 == outColl)
 
   println("end")
 }
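
For reference, below is a minimal, self-contained Scala sketch of the select path this diff introduces; it is not code from the commit. It assumes the usual four-field fixture X4 (case class X4[A, B, C, D](a: A, b: B, c: C, d: D)) and replaces the Exists-guarded `col` with a hand-built Column, so the compile-time evidence step is skipped and only the label-dispatching `collect` of SelectedDataset is exercised.

object SelectSketch {
  final case class X4[A, B, C, D](a: A, b: B, c: C, d: D)

  // `label` is the only piece of Column that collect needs at runtime.
  final class Column[T, A](val label: String)

  trait Dataset[T] {
    def select[A](c: Column[T, A]): Dataset[A] = new SelectedDataset[T, A](this, c)
    def collect(): Vector[T]
  }

  final case class SeqDataset[T](data: Seq[T]) extends Dataset[T] {
    override def collect(): Vector[T] = data.toVector
  }

  // Untyped select: dispatch on the column label and cast, as in the commit.
  final class SelectedDataset[T, A](ds: Dataset[T], val col: Column[T, A]) extends Dataset[A] {
    def collect(): Vector[A] = ds match {
      case SeqDataset(data) =>
        col.label match {
          case "a" => data.map(_.asInstanceOf[X4[A, _, _, _]].a).toVector
          case "b" => data.map(_.asInstanceOf[X4[_, A, _, _]].b).toVector
          case "c" => data.map(_.asInstanceOf[X4[_, _, A, _]].c).toVector
          case "d" => data.map(_.asInstanceOf[X4[_, _, _, A]].d).toVector
        }
      case _ => sys.error("select over a select is not handled in this sketch")
    }
  }

  def main(args: Array[String]): Unit = {
    val ds: Dataset[X4[Int, String, Long, Boolean]] =
      SeqDataset(Seq(X4(1, "x", 10L, true), X4(2, "y", 20L, false)))
    // In the real test this column comes from ds.col("d"), guarded by Exists evidence.
    val d = new Column[X4[Int, String, Long, Boolean], Boolean]("d")
    val out: Vector[Boolean] = ds.select(d).collect()
    assert(out == Vector(true, false))
    println(out) // Vector(true, false)
  }
}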