Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,8 @@ private[feature] trait CountVectorizerParams extends Params with HasInputCol wit

/**
 * Validates and transforms the input schema.
 *
 * The input column must be an array of strings. Both `ArrayType(StringType, true)` and
 * `ArrayType(StringType, false)` are accepted, so the check does not depend on the
 * second `ArrayType` argument.
 *
 * @param schema input schema
 * @return the result of appending the output column (a `VectorUDT` column) to `schema`
 */
protected def validateAndTransformSchema(schema: StructType): StructType = {
  // Only the multi-type check is kept: a preceding single-type check against
  // ArrayType(StringType, true) would reject ArrayType(StringType, false) inputs
  // before this relaxed check ever ran, defeating its purpose.
  val typeCandidates = List(ArrayType(StringType, true), ArrayType(StringType, false))
  SchemaUtils.checkColumnTypes(schema, $(inputCol), typeCandidates)
  SchemaUtils.appendColumn(schema, $(outputCol), new VectorUDT)
}

Expand Down
17 changes: 17 additions & 0 deletions mllib/src/main/scala/org/apache/spark/ml/util/SchemaUtils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,23 @@ private[spark] object SchemaUtils {
s"Column $colName must be of type $dataType but was actually $actualDataType.$message")
}

/**
 * Check whether the given schema contains a column whose data type is equal to one of
 * the required data types.
 *
 * @param schema input schema
 * @param colName column name
 * @param dataTypes required column data types; the column's type must equal one of them
 * @param msg optional extra text appended to the error message; ignored when null or blank
 */
def checkColumnTypes(
    schema: StructType,
    colName: String,
    dataTypes: Seq[DataType],
    msg: String = ""): Unit = {
  val actualDataType = schema(colName).dataType
  // A null or whitespace-only msg contributes nothing; otherwise it is appended
  // verbatim (untrimmed) after a single separating space.
  val suffix = Option(msg).filter(_.trim.nonEmpty).fold("")(text => " " + text)
  val isAccepted = dataTypes.exists(candidate => actualDataType.equals(candidate))

  // Built lazily: `require` only evaluates its message when the check fails.
  def failureMessage: String = {
    val expected = dataTypes.mkString("[", ", ", "]")
    s"Column $colName must be of type equal to one of the following types: " +
      s"$expected but was actually of type $actualDataType.$suffix"
  }

  require(isAccepted, failureMessage)
}

/**
* Appends a new column to the input schema. This fails if the given output column already exists.
* @param schema input schema
Expand Down