diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala index f25591794abd..d4e72ffd5ca8 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala @@ -24,12 +24,14 @@ import java.util.{Map => JavaMap} import javax.annotation.Nullable import scala.language.existentials +import scala.reflect.ClassTag import org.apache.spark.sql.Row import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.util.DateTimeUtils import org.apache.spark.sql.types._ import org.apache.spark.unsafe.types.UTF8String +import org.apache.spark.util.Utils /** * Functions to convert Scala types to Catalyst types and vice versa. @@ -39,6 +41,8 @@ object CatalystTypeConverters { // Since the map values can be mutable, we explicitly import scala.collection.Map at here. import scala.collection.Map + lazy val universe = ScalaReflection.universe + private def isPrimitive(dataType: DataType): Boolean = { dataType match { case BooleanType => true @@ -454,4 +458,166 @@ object CatalystTypeConverters { def convertToScala(catalystValue: Any, dataType: DataType): Any = { createToScalaConverter(dataType)(catalystValue) } + + /** + * Like createToScalaConverter(DataType), creates a function that converts a Catalyst object to a + * Scala object; however, in this case, the Scala object is an instance of a subtype of Product + * (e.g. a case class). + * + * If the given Scala type is not compatible with the given structType, this method ultimately + * throws a ClassCastException when the converter is invoked. + * + * Typical use case would be converting a collection of rows that have the same schema. You will + * call this function once to get a converter, and apply it to every row. + */ + private[sql] def createToProductConverter[T <: Product]( + structType: StructType)(implicit classTag: ClassTag[T]): InternalRow => T = { + + // Use ScalaReflectionLock, to avoid reflection thread safety issues in 2.10. + // https://issues.scala-lang.org/browse/SI-6240 + // http://docs.scala-lang.org/overviews/reflection/thread-safety.html + ScalaReflectionLock.synchronized { createToProductConverter(classTag, structType) } + } + + private[sql] def createToProductConverter[T <: Product]( + classTag: ClassTag[T], structType: StructType): InternalRow => T = { + + import universe._ + + val constructorMirror = { + val mirror = runtimeMirror(Utils.getContextOrSparkClassLoader) + val classSymbol = mirror.classSymbol(classTag.runtimeClass) + val classMirror = mirror.reflectClass(classSymbol) + val constructorSymbol = { + // Adapted from ScalaReflection to find primary constructor. 
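+        // (A sketch-level note, not from the original comment: the primary constructor's
+        // parameter types are zipped with the schema's fields further down to build one
+        // converter per constructor argument.)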
+ // https://issues.apache.org/jira/browse/SPARK-4791 + val symbol = classSymbol.toType.declaration(nme.CONSTRUCTOR) + if (symbol.isMethod) { + symbol.asMethod + } else { + val candidateSymbol = + symbol.asTerm.alternatives.find { s => s.isMethod && s.asMethod.isPrimaryConstructor } + if (candidateSymbol.isDefined) { + candidateSymbol.get.asMethod + } else { + throw new IllegalArgumentException(s"No primary constructor for ${symbol.name}") + } + } + } + classMirror.reflectConstructor(constructorSymbol) + } + + val params = constructorMirror.symbol.paramss.head.toSeq + val paramTypes = params.map { _.asTerm.typeSignature } + val fields = structType.fields + val dataTypes = fields.map { _.dataType } + val converters: Seq[Any => Any] = + paramTypes.zip(dataTypes).map { case (pt, dt) => createToScalaConverter(pt, dt) } + + (row: InternalRow) => if (row == null) { + null.asInstanceOf[T] + } else { + val convertedArgs = + converters.zip(row.toSeq(dataTypes)).map { case (converter, arg) => converter(arg) } + try { + constructorMirror.apply(convertedArgs: _*).asInstanceOf[T] + } catch { + case e: IllegalArgumentException => // argument type mismatch + val message = + s"""|Error constructing ${classTag.runtimeClass.getName}: ${e.getMessage}; + |paramTypes: ${paramTypes}, dataTypes: ${dataTypes}, + |convertedArgs: ${convertedArgs}""".stripMargin.replace("\n", " ") + throw new ClassCastException(message) + } + } + } + + /** + * Like createToScalaConverter(DataType), but with a Scala type hint. + * + * Please keep in sync with createToScalaConverter(DataType) and ScalaReflection.schemaFor[T]. + */ + private[sql] def createToScalaConverter( + universeType: universe.Type, dataType: DataType): Any => Any = { + + import universe._ + + (universeType, dataType) match { + case (t, dt) if t <:< typeOf[Option[_]] => + val TypeRef(_, _, Seq(elementType)) = t + val converter: Any => Any = createToScalaConverter(elementType, dt) + (catalystValue: Any) => Option(converter(catalystValue)) + + case (t, udt: UserDefinedType[_]) => + (catalystValue: Any) => if (catalystValue == null) null else udt.deserialize(catalystValue) + + case (t, bt: BinaryType) => identity + + case (t, at: ArrayType) if t <:< typeOf[Array[_]] => + throw new UnsupportedOperationException("Array[_] is not supported; try using Seq instead.") + + case (t, at: ArrayType) if t <:< typeOf[Seq[_]] => + val TypeRef(_, _, Seq(elementType)) = t + val converter: Any => Any = createToScalaConverter(elementType, at.elementType) + (catalystValue: Any) => catalystValue match { + case arrayData: ArrayData => arrayData.toArray[Any](at.elementType).map(converter).toSeq + case o => o + } + + case (t, mt: MapType) if t <:< typeOf[Map[_, _]] => + val TypeRef(_, _, Seq(keyType, valueType)) = t + val keyConverter: Any => Any = createToScalaConverter(keyType, mt.keyType) + val valueConverter: Any => Any = createToScalaConverter(valueType, mt.valueType) + (catalystValue: Any) => catalystValue match { + case mapData: MapData => + val keys = mapData.keyArray().toArray[Any](mt.keyType) + val values = mapData.valueArray().toArray[Any](mt.valueType) + keys.map(keyConverter).zip(values.map(valueConverter)).toMap + case o => o + } + + case (t, st: StructType) if t <:< typeOf[Product] => + val className = t.erasure.typeSymbol.asClass.fullName + val classTag = if (Utils.classIsLoadable(className)) { + scala.reflect.ClassTag(Utils.classForName(className)) + } else { + throw new IllegalArgumentException(s"$className is not loadable") + } + createToProductConverter(classTag, 
st).asInstanceOf[Any => Any] + + case (t, StringType) if t <:< typeOf[String] => + (catalystValue: Any) => catalystValue match { + case utf8: UTF8String => utf8.toString + case o => o + } + + case (t, DateType) if t <:< typeOf[Date] => + (catalystValue: Any) => catalystValue match { + case i: Int => DateTimeUtils.toJavaDate(i) + case o => o + } + + case (t, TimestampType) if t <:< typeOf[Timestamp] => + (catalystValue: Any) => catalystValue match { + case x: Long => DateTimeUtils.toJavaTimestamp(x) + case o => o + } + + case (t, _: DecimalType) if t <:< typeOf[BigDecimal] => + (catalystValue: Any) => catalystValue match { + case d: Decimal => d.toBigDecimal + case o => o + } + + case (t, _: DecimalType) if t <:< typeOf[java.math.BigDecimal] => + (catalystValue: Any) => catalystValue match { + case d: Decimal => d.toJavaBigDecimal + case o => o + } + + // Pass non-string primitives through. (Strings are converted from UTF8Strings above.) + // For everything else, hope for the best. + case (t, o) => identity + } + } } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/CatalystTypeConvertersSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/CatalystTypeConvertersSuite.scala index 03bb102c67fe..8b34bea702e3 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/CatalystTypeConvertersSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/CatalystTypeConvertersSuite.scala @@ -17,8 +17,12 @@ package org.apache.spark.sql.catalyst +import scala.reflect.ClassTag +import scala.reflect.runtime.universe.TypeTag + import org.apache.spark.SparkFunSuite import org.apache.spark.sql.Row +import org.apache.spark.sql.catalyst.util.DateTimeUtils import org.apache.spark.sql.types._ class CatalystTypeConvertersSuite extends SparkFunSuite { @@ -61,4 +65,202 @@ class CatalystTypeConvertersSuite extends SparkFunSuite { test("option handling in createToCatalystConverter") { assert(CatalystTypeConverters.createToCatalystConverter(IntegerType)(Some(123)) === 123) } + + import ScalaReflection._ + + test("createToProductConverter[T] with a case class") { + val a = A(1) + assert(a === rebuildWithProductConverter(a)) + } + + test("createToProductConverter[T] with a case class and null value") { + val a = A(null.asInstanceOf[Int]) + assert(a === rebuildWithProductConverter(a)) + } + + test("createToProductConverter[T] with a case class dependent on another case class") { + val b = B(A(1), 2.0) + assert(b === rebuildWithProductConverter(b)) + } + + test("createToProductConverter[T] with a case class dep. on case class dep. 
on case class") { + val c = C(B(A(1), 2.0), "hi everybody") + assert(c === rebuildWithProductConverter(c)) + } + + test("createToProductConverter[T] with T having Seqs") { + val d = D(B(A(1), 2.0), Seq(A(1), A(5)), Seq(3, 8), 10L) + assert(d === rebuildWithProductConverter(d)) + } + + test("createToProductConverter[T] with T having Maps") { + val e = E(B(A(1), 2.0), + Map(A(1) -> A(5), A(11) -> A(15)), + Map(A(1) -> 5, A(11) -> 15), + Map(1 -> A(5), 11 -> A(15)), + Map(1 -> 5, 11 -> 15)) + assert(e === rebuildWithProductConverter(e)) + } + + test("createToProductConverter[T] with T having multiple constructors") { + val f = new F(1) + assert(F(1, "hi everybody") === rebuildWithProductConverter(f)) + } + + test("createToProductConverter[T] with T having String") { + val g = new G("hi everybody!") + assert(g === rebuildWithProductConverter(g)) + } + + test("createToProductConverter[T] with an incompatible case class fails at conversion") { + val a = A(1) + val dataType = schemaFor[A].dataType + val row = CatalystTypeConverters.createToCatalystConverter(dataType)(a) + val converter = + CatalystTypeConverters.createToProductConverter[G](dataType.asInstanceOf[StructType]) + intercept[ClassCastException] { converter(row.asInstanceOf[InternalRow]) } + } + + test("createToProductConverter[T] with T having Some[U]") { + val h = H(Some(1)) + assert(h === rebuildWithProductConverter(h)) + } + + test("createToProductConverter[T] with T having None") { + val h = H(None) + assert(h === rebuildWithProductConverter(h)) + } + + test("createToProductConverter[T] with T having Option[Option[U]]") { + val i = I(Some(H(Some(1)))) + assert(i === rebuildWithProductConverter(i)) + } + + test("createToProductConverter[T] with T having Option[Seq[U]]") { + val j = J(Some(Seq(A(1), A(2)))) + assert(j === rebuildWithProductConverter(j)) + } + + test("createToProductConverter[T] with T having BigDecimal") { + val k = K(BigDecimal("123.0")) + assert(k === rebuildWithProductConverter(k)) + } + + test("createToProductConverter[T] with T having java.math.BigDecimal") { + // NOTE: As currently implemented, Decimal.apply(java.math.BigDecimal) triggers the implicit + // BigDecimal.javaBigDecimal2bigDecimal, creating a Scala BigDecimal with + // BigDecimal.defaultMathContext. So a given java.math.BigDecimal ends up taking maximum + // precision, and the scale is truncated to Decimal.MAX_LONG_DIGITS when converting back. This + // unit test accounts for that current behavior. + val l = L(new java.math.BigDecimal(new java.math.BigInteger("123"), Decimal.MAX_LONG_DIGITS)) + assert(l === rebuildWithProductConverter(l)) + } + + test("createToProductConverter[T] with T having java.sql.Date") { + val m = M(DateTimeUtils.toJavaDate(daysSinceEpoch = 16000)) + assert(m === rebuildWithProductConverter(m)) + } + + test("createToProductConverter[T] with T having java.sql.Timestamp") { + val n = N(DateTimeUtils.toJavaTimestamp(System.currentTimeMillis * 1000L)) + assert(n === rebuildWithProductConverter(n)) + } + + test("createToProductConverter[T] with T having Array[Byte] (treated as BinaryType)") { + // Use different objects to simulate not having the same byte array ref. 
+ val p0 = P("hi everybody!".getBytes) + val p1 = P("hi everybody!".getBytes) + val dataType = schemaFor[P].dataType + val row = CatalystTypeConverters.createToCatalystConverter(dataType)(p1) + val converter = + CatalystTypeConverters.createToProductConverter[P](dataType.asInstanceOf[StructType]) + val rebuildWithProductConverterP1 = converter(row.asInstanceOf[InternalRow]) + assert(p0 === rebuildWithProductConverterP1) + } + + test("createToProductConverter[T] with T having Array[_ != Byte] isn't supported") { + intercept[UnsupportedOperationException] { + val q = Q(Array(1, 2, 3)) + val dataType = schemaFor[Q].dataType + val row = CatalystTypeConverters.createToCatalystConverter(dataType)(q) + val converter = + CatalystTypeConverters.createToProductConverter[Q](dataType.asInstanceOf[StructType]) + converter(row.asInstanceOf[InternalRow]) + } + } + + test("createToProductConverter[T] with T having UDT") { + val r = R(Point(3.0, 8.0)) + assert(r === rebuildWithProductConverter(r)) + } + + def rebuildWithProductConverter[T <: Product : TypeTag : ClassTag](obj: T): T = { + val dataType = schemaFor[T].dataType + val row = CatalystTypeConverters.createToCatalystConverter(dataType)(obj) + val converter = + CatalystTypeConverters.createToProductConverter[T](dataType.asInstanceOf[StructType]) + converter(row.asInstanceOf[InternalRow]) + } +} + +case class A(x: Int) +case class B(a: A, y: Double) +case class C(b: B, z: String) +case class D(b: B, d: Seq[A], p: Seq[Int], q: Long) +case class E(b: B, r: Map[A, A], s: Map[A, Int], t: Map[Int, A], u: Map[Int, Int]) + +case class F(f: Int, v: String) { + def this(f: Int) = this(f, "hi everybody") +} + +case class G(g: String) +case class H(x: Option[Int]) +case class I(h: Option[H]) +case class J(h: Option[Seq[A]]) +case class K(bd: BigDecimal) +case class L(bd: java.math.BigDecimal) +case class M(d: java.sql.Date) +case class N(t: java.sql.Timestamp) + +case class P(ba: Array[Byte]) { + override def equals(o: Any): Boolean = o match { + case p: P => java.util.Arrays.equals(ba, p.ba) + case _ => false + } + + override def hashCode(): Int = java.util.Arrays.hashCode(ba) +} + +case class Q(a: Array[Int]) + +case class R(p: Point) + +// UDT-related test classes. They are case classes to make testing equality easier. + +@SQLUserDefinedType(udt = classOf[PointUDT]) +case class Point(val x: Double, val y: Double) + +case class PointUDT() extends UserDefinedType[Point] { + + override def sqlType: DataType = ArrayType(DoubleType, false) + + override def serialize(obj: Any): Seq[Double] = { + obj match { + case p: Point => Seq(p.x, p.y) + case _ => throw new IllegalArgumentException(s"${obj} not serializable") + } + } + + override def deserialize(datum: Any): Point = { + datum match { + case values: Seq[_] => + val xy = values.asInstanceOf[Seq[Double]] + assert(xy.length == 2) + new Point(xy(0), xy(1)) + } + } + + override def userClass: Class[Point] = classOf[Point] + + override def asNullable: PointUDT = this } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala index f9995da3a85e..20f7eb5a5671 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala @@ -1522,6 +1522,71 @@ class DataFrame private[sql]( } } + /** + * :: Experimental :: + * + * Returns the content of the [[DataFrame]] as an [[RDD]] of the given type `T`, where `T` is a + * subtype of [[scala.Product]] (typically a case class). 
+ * + * For example, given a case class `Food` + * + * {{{ + * case class Food(name: String, count: Int) + * }}} + * + * the following example shows how a [[DataFrame]] derived from an `RDD[Food]` can be + * reconstituted into another `RDD[Food]` with the same elements: + * + * {{{ + * val rdd0 = sc.parallelize(Seq(Food("apple", 1), Food("banana", 2), Food("cherry", 3))) + * val df0 = rdd0.toDF() + * df0.save("foods.parquet") + * + * val df1 = sqlContext.load("foods.parquet") + * val rdd1 = df1.toTypedRDD[Food]() + * // rdd0 and rdd1 should have the same elements + * }}} + * + * This method makes a best effort to validate, up front, i.e. before the RDD is materialized, + * that `T` is compatible with this DataFrame's [[schema]] and will throw an + * `IllegalArgumentException` if it isn't. Any other problems with the schema or conversion should + * manifest as exceptions when materializing the RDD. + * + * `toTypedRDD` can reconstruct most but not all `T`. For example, if `T` has a field of type + * `Array` whose corresponding Catalyst type is `ArrayType`, `toTypedRDD` cannot rebuild the array + * because of limitations with reflection. (`toTypedRDD` can only build `Seq` fields from Catalyst + * values of `ArrayType`.) + * + * This method cannot reconstruct classes defined in the Spark shell. Before using the shell, you + * should compile any classes you want to use with `toTypedRDD`. + * + * @group rdd + */ + @Experimental + def toTypedRDD[T <: Product : TypeTag]()(implicit classTag: ClassTag[T]): RDD[T] = { + // NOTE: Similar to implementation of rdd. + + // Use a local variable to make sure the map closure doesn't capture the whole DataFrame. + val schema = this.schema.asInstanceOf[StructType] + + // Validate the schema. + // NOTE: Nullability appears to be lost for primitives when an RDD is converted to a DataFrame. + // To give the DataFrame a chance to be converted back to an RDD, we ignore nullability. + val schemaForT = ScalaReflection.schemaFor[T].dataType.asInstanceOf[StructType] + if (schema.asNullable != schemaForT.asNullable) { + throw new IllegalArgumentException( + s"""|Even after ignoring nullable, schemas are incompatible: + |DataFrame ${schema.asNullable} vs. + |${classTag.runtimeClass.getName} ${schemaForT.asNullable}")""" + .stripMargin.replaceAll("\n", " ")) + } else { + queryExecution.executedPlan.execute().mapPartitions { rows => + val converter = CatalystTypeConverters.createToProductConverter[T](schema) + rows.map(converter) + } + } + } + /** * Returns the content of the [[DataFrame]] as a [[JavaRDD]] of [[Row]]s. * @group rdd diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameToRDDSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameToRDDSuite.scala new file mode 100644 index 000000000000..de8a1decef9b --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameToRDDSuite.scala @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql + +import org.apache.spark.sql.test.SharedSQLContext + +// For SPARK-7160: toTypedRDD[T]. +class DataFrameToRDDSuite extends QueryTest with SharedSQLContext { + import testImplicits._ + + test("toTypedRDD[T] works with simple case class") { + import org.scalatest.Matchers._ + val oldRDD = sqlContext.sparkContext.parallelize( + Seq(A("apple", 1), A("banana", 2), A("cherry", 3))) + val df = oldRDD.toDF() + val newRDD = df.toTypedRDD[A]() + newRDD.collect() should contain theSameElementsAs oldRDD.collect() + } + + test("toTypedRDD[T] works with case class dependent on another case class") { + import org.scalatest.Matchers._ + val oldRDD = sqlContext.sparkContext.parallelize( + Seq(B(A("apple", 1), 1.0), B(A("banana", 2), 2.0), B(A("cherry", 3), 3.0))) + val df = oldRDD.toDF() + val newRDD = df.toTypedRDD[B]() + newRDD.collect() should contain theSameElementsAs oldRDD.collect() + } + + test("toTypedRDD[T] works with case class having a Seq") { + import org.scalatest.Matchers._ + val oldRDD = sqlContext.sparkContext.parallelize( + Seq( + C("fruits", Seq(A("apple", 1), A("banana", 2), A("cherry", 3))), + C("vegetables", Seq(A("eggplant", 4), A("spinach", 5), A("zucchini", 6))))) + val df = oldRDD.toDF() + val newRDD = df.toTypedRDD[C]() + newRDD.collect() should contain theSameElementsAs oldRDD.collect() + } + + test("toTypedRDD[T] works with case class having a Map") { + import org.scalatest.Matchers._ + val oldRDD = sqlContext.sparkContext.parallelize( + Seq( + D("fruits", Map(1 -> A("apple", 1), 2 -> A("banana", 2), 3 -> A("cherry", 3))), + D("vegetables", Map(4 -> A("eggplant", 4), 5 -> A("spinach", 5), 6 -> A("zucchini", 6))))) + val df = oldRDD.toDF() + val newRDD = df.toTypedRDD[D]() + newRDD.collect() should contain theSameElementsAs oldRDD.collect() + } + + test("toTypedRDD[T] works with case class having an Option") { + import org.scalatest.Matchers._ + val oldRDD = sqlContext.sparkContext.parallelize( + Seq(E(Some(A("apple", 1))), E(Some(A("banana", 2))), E(Some(A("cherry", 3))), E(None))) + val df = oldRDD.toDF() + val newRDD = df.toTypedRDD[E]() + newRDD.collect() should contain theSameElementsAs oldRDD.collect() + } + + test("toTypedRDD[T] works with case class having a (Scala) BigDecimal") { + import org.scalatest.Matchers._ + val oldRDD = sqlContext.sparkContext.parallelize( + Seq(F(BigDecimal(1.0)), F(BigDecimal(2.0)), F(BigDecimal(3.0)))) + val df = oldRDD.toDF() + val newRDD = df.toTypedRDD[F]() + newRDD.collect() should contain theSameElementsAs oldRDD.collect() + } + + test("toTypedRDD[T] fails with an incompatible case class") { + intercept[IllegalArgumentException] { + val oldRDD = sqlContext.sparkContext.parallelize(Seq(A("apple", 1))) + val df = oldRDD.toDF() + val newRDD = df.toTypedRDD[B]() + newRDD.collect() + } + } + + test("toTypedRDD[T] can be used to reload an RDD saved to Parquet") { + import java.io.File + import org.apache.spark.util.Utils + import org.scalatest.Matchers._ + + val tempDir = Utils.createTempDir() + val filePath = new File(tempDir, "testParquet").getCanonicalPath + + val rdd0 = + 
sqlContext.sparkContext.parallelize(Seq(A("apple", 1), A("banana", 2), A("cherry", 3))) + val df0 = rdd0.toDF() + df0.write.format("parquet").save(filePath) + + val df1 = sqlContext.read.format("parquet").load(filePath) + val rdd1 = df1.toTypedRDD[A]() + rdd1.collect() should contain theSameElementsAs rdd0.collect() + } +} + +case class A(x: String, y: Int) +case class B(a: A, z: Double) +case class C(x: String, a: Seq[A]) +case class D(x: String, a: Map[Int, A]) +case class E(o: Option[A]) +case class F(bd: BigDecimal)
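As a rough illustration of how the API added above fits together, the sketch below combines the `Food` example from the `toTypedRDD` scaladoc with the Parquet round trip exercised in `DataFrameToRDDSuite`. It is not part of the patch itself and makes several assumptions: a `SparkContext` named `sc`, a `SQLContext` named `sqlContext` with its implicits in scope, a writable `foods.parquet` path, and a `Food` case class compiled into the application rather than defined in the shell.

import org.apache.spark.rdd.RDD
import sqlContext.implicits._

// Defined at the top level of the application: per the scaladoc above, toTypedRDD
// cannot reconstruct classes defined in the Spark shell.
case class Food(name: String, count: Int)

// Write typed data out through a DataFrame.
val rdd0: RDD[Food] =
  sc.parallelize(Seq(Food("apple", 1), Food("banana", 2), Food("cherry", 3)))
rdd0.toDF().write.format("parquet").save("foods.parquet")

// Read the data back as an untyped DataFrame, then recover an RDD[Food].
// toTypedRDD validates the schema before the RDD is materialized and throws
// IllegalArgumentException on a mismatch; per-row conversion failures surface
// later as ClassCastException when the RDD is computed.
val df = sqlContext.read.format("parquet").load("foods.parquet")
val rdd1: RDD[Food] = df.toTypedRDD[Food]()

assert(rdd1.collect().toSet == rdd0.collect().toSet)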