diff --git a/flink-formats/flink-parquet/pom.xml b/flink-formats/flink-parquet/pom.xml index 4b28e95b71cf5..524af4e7730fd 100644 --- a/flink-formats/flink-parquet/pom.xml +++ b/flink-formats/flink-parquet/pom.xml @@ -35,39 +35,17 @@ under the License. jar - - - org.apache.flink - flink-core - ${project.version} - provided - + org.apache.flink - flink-table-common + flink-core ${project.version} provided - - - org.apache.flink - flink-table-api-java-bridge_${scala.binary.version} - ${project.version} - provided - true - - - - org.apache.flink - flink-table-planner_${scala.binary.version} - ${project.version} - provided - true - org.apache.flink @@ -77,7 +55,7 @@ under the License. true - + org.apache.parquet @@ -156,7 +134,7 @@ under the License. - + org.apache.flink diff --git a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetInputFormat.java b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetInputFormat.java deleted file mode 100644 index 008dee8fe335e..0000000000000 --- a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetInputFormat.java +++ /dev/null @@ -1,323 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.formats.parquet; - -import org.apache.flink.annotation.VisibleForTesting; -import org.apache.flink.api.common.io.CheckpointableInputFormat; -import org.apache.flink.api.common.io.FileInputFormat; -import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.api.java.typeutils.RowTypeInfo; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.core.fs.FileInputSplit; -import org.apache.flink.core.fs.Path; -import org.apache.flink.formats.parquet.utils.ParquetRecordReader; -import org.apache.flink.formats.parquet.utils.ParquetSchemaConverter; -import org.apache.flink.formats.parquet.utils.RowReadSupport; -import org.apache.flink.metrics.Counter; -import org.apache.flink.types.Row; -import org.apache.flink.util.Preconditions; - -import org.apache.parquet.ParquetReadOptions; -import org.apache.parquet.filter2.compat.FilterCompat; -import org.apache.parquet.filter2.predicate.FilterPredicate; -import org.apache.parquet.hadoop.ParquetFileReader; -import org.apache.parquet.hadoop.util.HadoopInputFile; -import org.apache.parquet.io.InputFile; -import org.apache.parquet.schema.MessageType; -import org.apache.parquet.schema.Type; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; - -import static org.apache.flink.util.Preconditions.checkNotNull; - -/** - * The base InputFormat class to read from Parquet files. 
For specific return types the {@link - * #convert(Row)} method need to be implemented. - * - *
Using {@link ParquetRecordReader} to read files instead of {@link - * org.apache.flink.core.fs.FSDataInputStream}, we override {@link #open(FileInputSplit)} and {@link - * #close()} to change the behaviors. - * - * @param The type of record to read. - */ -public abstract class ParquetInputFormat extends FileInputFormat - implements CheckpointableInputFormat> { - - private static final long serialVersionUID = 1L; - - private static final Logger LOG = LoggerFactory.getLogger(ParquetInputFormat.class); - - /** The flag to specify whether to skip file splits with wrong schema. */ - private boolean skipWrongSchemaFileSplit = false; - - /** The flag to specify whether to skip corrupted record. */ - private boolean skipCorruptedRecord = false; - - /** The flag to track that the current split should be skipped. */ - private boolean skipThisSplit = false; - - private TypeInformation[] fieldTypes; - - private String[] fieldNames; - - private FilterPredicate filterPredicate; - - private transient Counter recordConsumed; - - private transient MessageType expectedFileSchema; - - private transient ParquetRecordReader parquetRecordReader; - - /** - * Read parquet files with given parquet file schema. - * - * @param path The path of the file to read. - * @param messageType schema of parquet file - */ - protected ParquetInputFormat(Path path, MessageType messageType) { - super(path); - this.expectedFileSchema = checkNotNull(messageType, "messageType"); - RowTypeInfo rowTypeInfo = - (RowTypeInfo) ParquetSchemaConverter.fromParquetType(expectedFileSchema); - this.fieldTypes = rowTypeInfo.getFieldTypes(); - this.fieldNames = rowTypeInfo.getFieldNames(); - // read whole parquet file as one file split - this.unsplittable = true; - } - - @Override - public void configure(Configuration parameters) { - super.configure(parameters); - - if (!this.skipWrongSchemaFileSplit) { - this.skipWrongSchemaFileSplit = - parameters.getBoolean(PARQUET_SKIP_WRONG_SCHEMA_SPLITS, false); - } - - if (this.skipCorruptedRecord) { - this.skipCorruptedRecord = parameters.getBoolean(PARQUET_SKIP_CORRUPTED_RECORD, false); - } - } - - /** - * Configures the fields to be read and returned by the ParquetInputFormat. Selected fields must - * be present in the configured schema. - * - * @param fieldNames Names of all selected fields. 
- */ - public void selectFields(String[] fieldNames) { - checkNotNull(fieldNames, "fieldNames"); - this.fieldNames = fieldNames; - RowTypeInfo rowTypeInfo = - (RowTypeInfo) ParquetSchemaConverter.fromParquetType(expectedFileSchema); - TypeInformation[] selectFieldTypes = new TypeInformation[fieldNames.length]; - for (int i = 0; i < fieldNames.length; i++) { - try { - selectFieldTypes[i] = rowTypeInfo.getTypeAt(fieldNames[i]); - } catch (IndexOutOfBoundsException e) { - throw new IllegalArgumentException( - String.format( - "Fail to access Field %s , " - + "which is not contained in the file schema", - fieldNames[i]), - e); - } - } - this.fieldTypes = selectFieldTypes; - } - - public void setFilterPredicate(FilterPredicate filterPredicate) { - this.filterPredicate = filterPredicate; - } - - @Override - public Tuple2 getCurrentState() { - return parquetRecordReader.getCurrentReadPosition(); - } - - @Override - public void open(FileInputSplit split) throws IOException { - // reset the flag when open a new split - this.skipThisSplit = false; - org.apache.hadoop.conf.Configuration configuration = - new org.apache.hadoop.conf.Configuration(); - InputFile inputFile = - HadoopInputFile.fromPath( - new org.apache.hadoop.fs.Path(split.getPath().toUri()), configuration); - ParquetReadOptions options = ParquetReadOptions.builder().build(); - ParquetFileReader fileReader = new ParquetFileReader(inputFile, options); - MessageType fileSchema = fileReader.getFileMetaData().getSchema(); - MessageType readSchema = getReadSchema(fileSchema, split.getPath()); - if (skipThisSplit) { - LOG.warn( - String.format( - "Escaped the file split [%s] due to mismatch of file schema to expected result schema", - split.getPath().toString())); - } else { - this.parquetRecordReader = - new ParquetRecordReader<>( - new RowReadSupport(), - readSchema, - filterPredicate == null - ? FilterCompat.NOOP - : FilterCompat.get(filterPredicate)); - this.parquetRecordReader.initialize(fileReader, configuration); - this.parquetRecordReader.setSkipCorruptedRecord(this.skipCorruptedRecord); - - if (this.recordConsumed == null) { - this.recordConsumed = - getRuntimeContext().getMetricGroup().counter("parquet-records-consumed"); - } - - LOG.debug( - String.format( - "Open ParquetInputFormat with FileInputSplit [%s]", - split.getPath().toString())); - } - } - - @Override - public void reopen(FileInputSplit split, Tuple2 state) throws IOException { - Preconditions.checkNotNull(split, "reopen() cannot be called on a null split."); - Preconditions.checkNotNull(state, "reopen() cannot be called with a null initial state."); - this.open(split); - // seek to the read position in the split that we were at when the checkpoint was taken. - parquetRecordReader.seek(state.f0, state.f1); - } - - /** - * Get field names of read result. - * - * @return field names array - */ - protected String[] getFieldNames() { - return fieldNames; - } - - /** - * Get field types of read result. 
- * - * @return field types array - */ - protected TypeInformation[] getFieldTypes() { - return fieldTypes; - } - - @VisibleForTesting - protected FilterPredicate getPredicate() { - return this.filterPredicate; - } - - @Override - public void close() throws IOException { - if (parquetRecordReader != null) { - parquetRecordReader.close(); - } - } - - @Override - public boolean reachedEnd() throws IOException { - if (skipThisSplit) { - return true; - } - - return parquetRecordReader.reachEnd(); - } - - @Override - public E nextRecord(E e) throws IOException { - if (reachedEnd()) { - return null; - } - - recordConsumed.inc(); - return convert(parquetRecordReader.nextRecord()); - } - - /** - * This ParquetInputFormat read parquet record as Row by default. Sub classes of it can extend - * this method to further convert row to other types, such as POJO, Map or Tuple. - * - * @param row row read from parquet file - * @return E target result type - */ - protected abstract E convert(Row row); - - /** - * Generates and returns the read schema based on the projected fields for a given file. - * - * @param fileSchema The schema of the given file. - * @param filePath The path of the given file. - * @return The read schema based on the given file's schema and the projected fields. - */ - private MessageType getReadSchema(MessageType fileSchema, Path filePath) { - RowTypeInfo fileTypeInfo = (RowTypeInfo) ParquetSchemaConverter.fromParquetType(fileSchema); - List types = new ArrayList<>(); - for (int i = 0; i < fieldNames.length; ++i) { - String readFieldName = fieldNames[i]; - TypeInformation readFieldType = fieldTypes[i]; - if (fileTypeInfo.getFieldIndex(readFieldName) < 0) { - if (!skipWrongSchemaFileSplit) { - throw new IllegalArgumentException( - "Field " - + readFieldName - + " cannot be found in schema of " - + " Parquet file: " - + filePath - + "."); - } else { - this.skipThisSplit = true; - return fileSchema; - } - } - - if (!readFieldType.equals(fileTypeInfo.getTypeAt(readFieldName))) { - if (!skipWrongSchemaFileSplit) { - throw new IllegalArgumentException( - "Expecting type " - + readFieldType - + " for field " - + readFieldName - + " but found type " - + fileTypeInfo.getTypeAt(readFieldName) - + " in Parquet file: " - + filePath - + "."); - } else { - this.skipThisSplit = true; - return fileSchema; - } - } - types.add(fileSchema.getType(readFieldName)); - } - - return new MessageType(fileSchema.getName(), types); - } - - /** The config parameter which defines whether to skip file split with wrong schema. */ - public static final String PARQUET_SKIP_WRONG_SCHEMA_SPLITS = "skip.splits.wrong.schema"; - - /** The config parameter which defines whether to skip corrupted record. */ - public static final String PARQUET_SKIP_CORRUPTED_RECORD = "skip.corrupted.record"; -} diff --git a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetMapInputFormat.java b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetMapInputFormat.java deleted file mode 100644 index 9e1110c123f6d..0000000000000 --- a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetMapInputFormat.java +++ /dev/null @@ -1,143 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.formats.parquet; - -import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo; -import org.apache.flink.api.common.typeinfo.BasicTypeInfo; -import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo; -import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.java.typeutils.MapTypeInfo; -import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo; -import org.apache.flink.api.java.typeutils.RowTypeInfo; -import org.apache.flink.core.fs.Path; -import org.apache.flink.types.Row; - -import org.apache.parquet.schema.MessageType; - -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -/** - * An implementation of {@link ParquetInputFormat} to read {@link Map} records from Parquet files. - */ -public class ParquetMapInputFormat extends ParquetInputFormat { - - public ParquetMapInputFormat(Path path, MessageType messageType) { - super(path, messageType); - } - - @Override - protected Map convert(Row row) { - Map map = new HashMap<>(); - convert(map, row, getFieldTypes(), getFieldNames()); - return map; - } - - @SuppressWarnings("unchecked") - private void convert( - Map map, - Row row, - TypeInformation[] fieldTypes, - String[] fieldNames) { - for (int i = 0; i < fieldNames.length; i++) { - if (row.getField(i) != null) { - if (fieldTypes[i] instanceof BasicTypeInfo - || fieldTypes[i] instanceof PrimitiveArrayTypeInfo - || fieldTypes[i] instanceof BasicArrayTypeInfo) { - map.put(fieldNames[i], row.getField(i)); - } else if (fieldTypes[i] instanceof RowTypeInfo) { - Map nestedRow = new HashMap<>(); - RowTypeInfo nestedRowTypeInfo = (RowTypeInfo) fieldTypes[i]; - convert( - nestedRow, - (Row) row.getField(i), - nestedRowTypeInfo.getFieldTypes(), - nestedRowTypeInfo.getFieldNames()); - map.put(fieldNames[i], nestedRow); - } else if (fieldTypes[i] instanceof MapTypeInfo) { - Map nestedMap = new HashMap<>(); - MapTypeInfo mapTypeInfo = (MapTypeInfo) fieldTypes[i]; - convert(nestedMap, (Map) row.getField(i), mapTypeInfo); - map.put(fieldNames[i], nestedMap); - } else if (fieldTypes[i] instanceof ObjectArrayTypeInfo) { - List nestedObjectList = new ArrayList<>(); - ObjectArrayTypeInfo objectArrayTypeInfo = (ObjectArrayTypeInfo) fieldTypes[i]; - convert(nestedObjectList, (Row[]) row.getField(i), objectArrayTypeInfo); - map.put(fieldNames[i], nestedObjectList); - } - } - } - } - - @SuppressWarnings("unchecked") - private void convert( - Map target, Map source, MapTypeInfo mapTypeInfo) { - TypeInformation valueTypeInfp = mapTypeInfo.getValueTypeInfo(); - - for (Map.Entry entry : source.entrySet()) { - String key = entry.getKey(); - Object value = entry.getValue(); - if (valueTypeInfp instanceof RowTypeInfo) { - Map nestedRow = new HashMap<>(); - convert( - nestedRow, - (Row) value, - ((RowTypeInfo) valueTypeInfp).getFieldTypes(), - ((RowTypeInfo) valueTypeInfp).getFieldNames()); - target.put(key, 
nestedRow); - } else if (valueTypeInfp instanceof MapTypeInfo) { - Map nestedMap = new HashMap<>(); - convert(nestedMap, (Map) value, (MapTypeInfo) valueTypeInfp); - target.put(key, nestedMap); - } else if (valueTypeInfp instanceof ObjectArrayTypeInfo) { - List nestedObjectList = new ArrayList<>(); - convert(nestedObjectList, (Object[]) value, (ObjectArrayTypeInfo) valueTypeInfp); - target.put(key, nestedObjectList); - } - } - } - - @SuppressWarnings("unchecked") - private void convert( - List target, Object[] source, ObjectArrayTypeInfo objectArrayTypeInfo) { - TypeInformation itemType = objectArrayTypeInfo.getComponentInfo(); - for (Object field : source) { - if (itemType instanceof RowTypeInfo) { - Map nestedRow = new HashMap<>(); - convert( - nestedRow, - (Row) field, - ((RowTypeInfo) itemType).getFieldTypes(), - ((RowTypeInfo) itemType).getFieldNames()); - target.add(nestedRow); - } else if (itemType instanceof MapTypeInfo) { - Map nestedMap = new HashMap<>(); - MapTypeInfo mapTypeInfo = (MapTypeInfo) itemType; - convert(nestedMap, (Map) field, mapTypeInfo); - target.add(nestedMap); - } else if (itemType instanceof ObjectArrayTypeInfo) { - List nestedObjectList = new ArrayList<>(); - convert(nestedObjectList, (Row[]) field, (ObjectArrayTypeInfo) itemType); - target.add(nestedObjectList); - } - } - } -} diff --git a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetPojoInputFormat.java b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetPojoInputFormat.java deleted file mode 100644 index a2a340e4f0596..0000000000000 --- a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetPojoInputFormat.java +++ /dev/null @@ -1,138 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.formats.parquet; - -import org.apache.flink.api.common.ExecutionConfig; -import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.api.java.typeutils.PojoTypeInfo; -import org.apache.flink.core.fs.FileInputSplit; -import org.apache.flink.core.fs.Path; -import org.apache.flink.types.Row; -import org.apache.flink.util.Preconditions; - -import org.apache.parquet.schema.MessageType; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.lang.reflect.Field; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -/** An implementation of {@link ParquetInputFormat} to read POJO records from Parquet files. 
*/ -public class ParquetPojoInputFormat extends ParquetInputFormat { - - private static final Logger LOG = LoggerFactory.getLogger(ParquetPojoInputFormat.class); - private final Class pojoTypeClass; - private final TypeSerializer typeSerializer; - private transient Field[] pojoFields; - - public ParquetPojoInputFormat( - Path filePath, MessageType messageType, PojoTypeInfo pojoTypeInfo) { - super(filePath, messageType); - this.pojoTypeClass = pojoTypeInfo.getTypeClass(); - this.typeSerializer = pojoTypeInfo.createSerializer(new ExecutionConfig()); - final Map fieldMap = new HashMap<>(); - findAllFields(pojoTypeClass, fieldMap); - selectFields(fieldMap.keySet().toArray(new String[0])); - } - - @Override - public void open(FileInputSplit split) throws IOException { - super.open(split); - pojoFields = new Field[getFieldNames().length]; - LOG.error("Fields number is {}.", getFieldNames().length); - final Map fieldMap = new HashMap<>(); - findAllFields(pojoTypeClass, fieldMap); - - for (int i = 0; i < getFieldNames().length; ++i) { - String fieldName = getFieldNames()[i]; - pojoFields[i] = fieldMap.get(fieldName); - - if (pojoFields[i] != null) { - pojoFields[i].setAccessible(true); - } else { - throw new RuntimeException( - String.format( - "There is no field called %s in %s", - fieldName, pojoTypeClass.getName())); - } - } - } - - private void findAllFields(Class clazz, Map fieldMap) { - - for (Field field : clazz.getDeclaredFields()) { - fieldMap.put(field.getName(), field); - } - - if (clazz.getSuperclass() != null) { - findAllFields(clazz.getSuperclass(), fieldMap); - } - } - - @Override - protected E convert(Row row) { - E result = typeSerializer.createInstance(); - for (int i = 0; i < row.getArity(); ++i) { - try { - if (pojoFields[i].getType().isAssignableFrom(List.class)) { - pojoFields[i].set(result, Collections.singletonList(row.getField(i))); - } else { - pojoFields[i].set(result, row.getField(i)); - } - } catch (IllegalAccessException e) { - throw new RuntimeException( - String.format( - "Parsed value could not be set in POJO field %s", - getFieldNames()[i])); - } - } - - return result; - } - - /** - * Extracts the {@link TypeInformation}s from {@link PojoTypeInfo} according to the given field - * name. 
- */ - private static TypeInformation[] extractTypeInfos( - PojoTypeInfo pojoTypeInfo, String[] fieldNames) { - Preconditions.checkNotNull(pojoTypeInfo); - Preconditions.checkNotNull(fieldNames); - Preconditions.checkArgument(pojoTypeInfo.getArity() >= fieldNames.length); - TypeInformation[] fieldTypes = new TypeInformation[fieldNames.length]; - for (int i = 0; i < fieldNames.length; ++i) { - String fieldName = fieldNames[i]; - Preconditions.checkNotNull(fieldName, "The field can't be null"); - int fieldPos = pojoTypeInfo.getFieldIndex(fieldName); - Preconditions.checkArgument( - fieldPos >= 0, - String.format( - "Field %s is not a member of POJO type %s", - fieldName, pojoTypeInfo.getTypeClass().getName())); - fieldTypes[i] = pojoTypeInfo.getTypeAt(fieldPos); - } - - return fieldTypes; - } -} diff --git a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetRowInputFormat.java b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetRowInputFormat.java deleted file mode 100644 index b55b87739e269..0000000000000 --- a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetRowInputFormat.java +++ /dev/null @@ -1,49 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.formats.parquet; - -import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.java.typeutils.ResultTypeQueryable; -import org.apache.flink.api.java.typeutils.RowTypeInfo; -import org.apache.flink.core.fs.Path; -import org.apache.flink.types.Row; - -import org.apache.parquet.schema.MessageType; - -/** - * An implementation of {@link ParquetInputFormat} to read {@link Row} records from Parquet files. - */ -public class ParquetRowInputFormat extends ParquetInputFormat - implements ResultTypeQueryable { - private static final long serialVersionUID = 11L; - - public ParquetRowInputFormat(Path path, MessageType messageType) { - super(path, messageType); - } - - @Override - public TypeInformation getProducedType() { - return new RowTypeInfo(getFieldTypes(), getFieldNames()); - } - - @Override - protected Row convert(Row row) { - return row; - } -} diff --git a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetTableSource.java b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetTableSource.java deleted file mode 100644 index 9be7cb70dcbc8..0000000000000 --- a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetTableSource.java +++ /dev/null @@ -1,616 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.formats.parquet; - -import org.apache.flink.api.common.typeinfo.BasicTypeInfo; -import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.java.DataSet; -import org.apache.flink.api.java.ExecutionEnvironment; -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.api.java.typeutils.RowTypeInfo; -import org.apache.flink.core.fs.Path; -import org.apache.flink.formats.parquet.utils.ParquetSchemaConverter; -import org.apache.flink.table.api.TableSchema; -import org.apache.flink.table.expressions.And; -import org.apache.flink.table.expressions.Attribute; -import org.apache.flink.table.expressions.BinaryComparison; -import org.apache.flink.table.expressions.BinaryExpression; -import org.apache.flink.table.expressions.EqualTo; -import org.apache.flink.table.expressions.Expression; -import org.apache.flink.table.expressions.GreaterThan; -import org.apache.flink.table.expressions.GreaterThanOrEqual; -import org.apache.flink.table.expressions.LessThan; -import org.apache.flink.table.expressions.LessThanOrEqual; -import org.apache.flink.table.expressions.Literal; -import org.apache.flink.table.expressions.Not; -import org.apache.flink.table.expressions.NotEqualTo; -import org.apache.flink.table.expressions.Or; -import org.apache.flink.table.sources.BatchTableSource; -import org.apache.flink.table.sources.FilterableTableSource; -import org.apache.flink.table.sources.ProjectableTableSource; -import org.apache.flink.table.sources.TableSource; -import org.apache.flink.types.Row; -import org.apache.flink.util.Preconditions; - -import org.apache.hadoop.conf.Configuration; -import org.apache.parquet.filter2.predicate.FilterApi; -import org.apache.parquet.filter2.predicate.FilterPredicate; -import org.apache.parquet.filter2.predicate.Operators.BinaryColumn; -import org.apache.parquet.filter2.predicate.Operators.BooleanColumn; -import org.apache.parquet.filter2.predicate.Operators.Column; -import org.apache.parquet.filter2.predicate.Operators.DoubleColumn; -import org.apache.parquet.filter2.predicate.Operators.FloatColumn; -import org.apache.parquet.filter2.predicate.Operators.IntColumn; -import org.apache.parquet.filter2.predicate.Operators.LongColumn; -import org.apache.parquet.hadoop.metadata.ColumnPath; -import org.apache.parquet.io.InvalidRecordException; -import org.apache.parquet.io.api.Binary; -import org.apache.parquet.schema.MessageType; -import org.apache.parquet.schema.Type; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import javax.annotation.Nullable; - -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; - -/** - * A TableSource to read Parquet files. - * - *
<p>The {@link ParquetTableSource} supports projection and filter push-down.
- *
- * <p>An {@link ParquetTableSource} is used as shown in the example below.
- *
- * <pre>
- * {@code
- * ParquetTableSource parquetSrc = ParquetTableSource.builder()
- *   .path("file:///my/data/file.parquet")
- *   .schema(messageType)
- *   .build();
- *
- * tEnv.registerTableSourceInternal("parquetTable", parquetSrc);
- * Table res = tableEnv.sqlQuery("SELECT * FROM parquetTable");
- * }
- * </pre>
- */ -public class ParquetTableSource - implements BatchTableSource, FilterableTableSource, ProjectableTableSource { - - private static final Logger LOG = LoggerFactory.getLogger(ParquetTableSource.class); - - // path to read Parquet files from - private final String path; - // schema of the Parquet file - private final MessageType parquetSchema; - // the schema of table - private final TableSchema tableSchema; - // the configuration to read the file - private final Configuration parquetConfig; - // type information of the data returned by the InputFormat - private final RowTypeInfo typeInfo; - // list of selected Parquet fields to return - @Nullable private final int[] selectedFields; - // predicate expression to apply - @Nullable private final FilterPredicate predicate; - // flag whether a path is recursively enumerated - private final boolean recursiveEnumeration; - - private boolean isFilterPushedDown; - - private ParquetTableSource( - String path, - MessageType parquetSchema, - Configuration configuration, - boolean recursiveEnumeration) { - this(path, parquetSchema, configuration, recursiveEnumeration, null, null); - } - - private ParquetTableSource( - String path, - MessageType parquetSchema, - Configuration configuration, - boolean recursiveEnumeration, - @Nullable int[] selectedFields, - @Nullable FilterPredicate predicate) { - Preconditions.checkNotNull(path, "Path must not be null."); - Preconditions.checkNotNull(parquetSchema, "ParquetSchema must not be null."); - Preconditions.checkNotNull(configuration, "Configuration must not be null"); - this.path = path; - this.parquetSchema = parquetSchema; - this.parquetConfig = configuration; - this.selectedFields = selectedFields; - this.predicate = predicate; - this.recursiveEnumeration = recursiveEnumeration; - - if (predicate != null) { - this.isFilterPushedDown = true; - } - // determine the type information from the Parquet schema - RowTypeInfo typeInfoFromSchema = - (RowTypeInfo) ParquetSchemaConverter.fromParquetType(parquetSchema); - - // set return type info - if (selectedFields == null) { - this.typeInfo = typeInfoFromSchema; - } else { - this.typeInfo = RowTypeInfo.projectFields(typeInfoFromSchema, selectedFields); - } - - // create a TableSchema that corresponds to the Parquet schema - this.tableSchema = - new TableSchema( - typeInfoFromSchema.getFieldNames(), typeInfoFromSchema.getFieldTypes()); - } - - @Override - public TableSource projectFields(int[] fields) { - return new ParquetTableSource( - path, parquetSchema, parquetConfig, recursiveEnumeration, fields, predicate); - } - - @Override - public DataSet getDataSet(ExecutionEnvironment executionEnvironment) { - ParquetRowInputFormat parquetRowInputFormat = - new ParquetRowInputFormat(new Path(path), parquetSchema); - parquetRowInputFormat.setNestedFileEnumeration(recursiveEnumeration); - if (selectedFields != null) { - parquetRowInputFormat.selectFields(typeInfo.getFieldNames()); - } - - if (predicate != null) { - parquetRowInputFormat.setFilterPredicate(predicate); - } - - return executionEnvironment.createInput(parquetRowInputFormat).name(explainSource()); - } - - @Override - public TableSource applyPredicate(List predicates) { - - // try to convert Flink filter expressions to Parquet FilterPredicates - List convertedPredicates = new ArrayList<>(predicates.size()); - List unsupportedExpressions = new ArrayList<>(predicates.size()); - - for (Expression toConvert : predicates) { - FilterPredicate convertedPredicate = toParquetPredicate(toConvert); - if 
(convertedPredicate != null) { - convertedPredicates.add(convertedPredicate); - } else { - unsupportedExpressions.add(toConvert); - } - } - - // update list of Flink expressions to unsupported expressions - predicates.clear(); - predicates.addAll(unsupportedExpressions); - - // construct single Parquet FilterPredicate - FilterPredicate parquetPredicate = null; - if (!convertedPredicates.isEmpty()) { - // concat converted predicates with AND - parquetPredicate = convertedPredicates.get(0); - - for (FilterPredicate converted : - convertedPredicates.subList(1, convertedPredicates.size())) { - parquetPredicate = FilterApi.and(parquetPredicate, converted); - } - } - - // create and return a new ParquetTableSource with Parquet FilterPredicate - return new ParquetTableSource( - path, - parquetSchema, - this.parquetConfig, - recursiveEnumeration, - selectedFields, - parquetPredicate); - } - - @Override - public boolean isFilterPushedDown() { - return isFilterPushedDown; - } - - @Override - public TypeInformation getReturnType() { - return typeInfo; - } - - @Override - public TableSchema getTableSchema() { - return tableSchema; - } - - @Override - public String explainSource() { - return "ParquetFile[path=" - + path - + ", schema=" - + parquetSchema - + ", filter=" - + predicateString() - + ", typeInfo=" - + typeInfo - + ", selectedFields=" - + Arrays.toString(selectedFields) - + ", pushDownStatus=" - + isFilterPushedDown - + "]"; - } - - private String predicateString() { - if (predicate != null) { - return predicate.toString(); - } else { - return "TRUE"; - } - } - - /** Converts Flink Expression to Parquet FilterPredicate. */ - @Nullable - private FilterPredicate toParquetPredicate(Expression exp) { - if (exp instanceof Not) { - FilterPredicate c = toParquetPredicate(((Not) exp).child()); - if (c == null) { - return null; - } else { - return FilterApi.not(c); - } - } else if (exp instanceof BinaryComparison) { - BinaryComparison binComp = (BinaryComparison) exp; - - if (!isValid(binComp)) { - // unsupported literal Type - LOG.debug("Unsupported predict [{}] cannot be pushed to ParquetTableSource.", exp); - return null; - } - - boolean onRight = literalOnRight(binComp); - Tuple2 columnPair = extractColumnAndLiteral(binComp); - - if (columnPair != null) { - if (exp instanceof EqualTo) { - if (columnPair.f0 instanceof IntColumn) { - return FilterApi.eq((IntColumn) columnPair.f0, (Integer) columnPair.f1); - } else if (columnPair.f0 instanceof LongColumn) { - return FilterApi.eq((LongColumn) columnPair.f0, (Long) columnPair.f1); - } else if (columnPair.f0 instanceof DoubleColumn) { - return FilterApi.eq((DoubleColumn) columnPair.f0, (Double) columnPair.f1); - } else if (columnPair.f0 instanceof FloatColumn) { - return FilterApi.eq((FloatColumn) columnPair.f0, (Float) columnPair.f1); - } else if (columnPair.f0 instanceof BooleanColumn) { - return FilterApi.eq((BooleanColumn) columnPair.f0, (Boolean) columnPair.f1); - } else if (columnPair.f0 instanceof BinaryColumn) { - return FilterApi.eq((BinaryColumn) columnPair.f0, (Binary) columnPair.f1); - } - } else if (exp instanceof NotEqualTo) { - if (columnPair.f0 instanceof IntColumn) { - return FilterApi.notEq((IntColumn) columnPair.f0, (Integer) columnPair.f1); - } else if (columnPair.f0 instanceof LongColumn) { - return FilterApi.notEq((LongColumn) columnPair.f0, (Long) columnPair.f1); - } else if (columnPair.f0 instanceof DoubleColumn) { - return FilterApi.notEq( - (DoubleColumn) columnPair.f0, (Double) columnPair.f1); - } else if (columnPair.f0 
instanceof FloatColumn) { - return FilterApi.notEq((FloatColumn) columnPair.f0, (Float) columnPair.f1); - } else if (columnPair.f0 instanceof BooleanColumn) { - return FilterApi.notEq( - (BooleanColumn) columnPair.f0, (Boolean) columnPair.f1); - } else if (columnPair.f0 instanceof BinaryColumn) { - return FilterApi.notEq( - (BinaryColumn) columnPair.f0, (Binary) columnPair.f1); - } - } else if (exp instanceof GreaterThan) { - if (onRight) { - return greaterThan(exp, columnPair); - } else { - lessThan(exp, columnPair); - } - } else if (exp instanceof GreaterThanOrEqual) { - if (onRight) { - return greaterThanOrEqual(exp, columnPair); - } else { - return lessThanOrEqual(exp, columnPair); - } - } else if (exp instanceof LessThan) { - if (onRight) { - return lessThan(exp, columnPair); - } else { - return greaterThan(exp, columnPair); - } - } else if (exp instanceof LessThanOrEqual) { - if (onRight) { - return lessThanOrEqual(exp, columnPair); - } else { - return greaterThanOrEqual(exp, columnPair); - } - } else { - // Unsupported Predicate - LOG.debug( - "Unsupported predicate [{}] cannot be pushed into ParquetTableSource.", - exp); - return null; - } - } - } else if (exp instanceof BinaryExpression) { - if (exp instanceof And) { - LOG.debug( - "All of the predicates should be in CNF. Found an AND expression: {}.", - exp); - } else if (exp instanceof Or) { - FilterPredicate c1 = toParquetPredicate(((Or) exp).left()); - FilterPredicate c2 = toParquetPredicate(((Or) exp).right()); - - if (c1 == null || c2 == null) { - return null; - } else { - return FilterApi.or(c1, c2); - } - } else { - // Unsupported Predicate - LOG.debug( - "Unsupported predicate [{}] cannot be pushed into ParquetTableSource.", - exp); - return null; - } - } - - return null; - } - - @Nullable - private FilterPredicate greaterThan(Expression exp, Tuple2 columnPair) { - Preconditions.checkArgument(exp instanceof GreaterThan, "exp has to be GreaterThan"); - if (columnPair.f0 instanceof IntColumn) { - return FilterApi.gt((IntColumn) columnPair.f0, (Integer) columnPair.f1); - } else if (columnPair.f0 instanceof LongColumn) { - return FilterApi.gt((LongColumn) columnPair.f0, (Long) columnPair.f1); - } else if (columnPair.f0 instanceof DoubleColumn) { - return FilterApi.gt((DoubleColumn) columnPair.f0, (Double) columnPair.f1); - } else if (columnPair.f0 instanceof FloatColumn) { - return FilterApi.gt((FloatColumn) columnPair.f0, (Float) columnPair.f1); - } - - return null; - } - - @Nullable - private FilterPredicate lessThan(Expression exp, Tuple2 columnPair) { - Preconditions.checkArgument(exp instanceof LessThan, "exp has to be LessThan"); - - if (columnPair.f0 instanceof IntColumn) { - return FilterApi.lt((IntColumn) columnPair.f0, (Integer) columnPair.f1); - } else if (columnPair.f0 instanceof LongColumn) { - return FilterApi.lt((LongColumn) columnPair.f0, (Long) columnPair.f1); - } else if (columnPair.f0 instanceof DoubleColumn) { - return FilterApi.lt((DoubleColumn) columnPair.f0, (Double) columnPair.f1); - } else if (columnPair.f0 instanceof FloatColumn) { - return FilterApi.lt((FloatColumn) columnPair.f0, (Float) columnPair.f1); - } - - return null; - } - - @Nullable - private FilterPredicate greaterThanOrEqual( - Expression exp, Tuple2 columnPair) { - Preconditions.checkArgument( - exp instanceof GreaterThanOrEqual, "exp has to be GreaterThanOrEqual"); - if (columnPair.f0 instanceof IntColumn) { - return FilterApi.gtEq((IntColumn) columnPair.f0, (Integer) columnPair.f1); - } else if (columnPair.f0 instanceof LongColumn) { 
- return FilterApi.gtEq((LongColumn) columnPair.f0, (Long) columnPair.f1); - } else if (columnPair.f0 instanceof DoubleColumn) { - return FilterApi.gtEq((DoubleColumn) columnPair.f0, (Double) columnPair.f1); - } else if (columnPair.f0 instanceof FloatColumn) { - return FilterApi.gtEq((FloatColumn) columnPair.f0, (Float) columnPair.f1); - } - - return null; - } - - @Nullable - private FilterPredicate lessThanOrEqual(Expression exp, Tuple2 columnPair) { - Preconditions.checkArgument( - exp instanceof LessThanOrEqual, "exp has to be LessThanOrEqual"); - if (columnPair.f0 instanceof IntColumn) { - return FilterApi.ltEq((IntColumn) columnPair.f0, (Integer) columnPair.f1); - } else if (columnPair.f0 instanceof LongColumn) { - return FilterApi.ltEq((LongColumn) columnPair.f0, (Long) columnPair.f1); - } else if (columnPair.f0 instanceof DoubleColumn) { - return FilterApi.ltEq((DoubleColumn) columnPair.f0, (Double) columnPair.f1); - } else if (columnPair.f0 instanceof FloatColumn) { - return FilterApi.ltEq((FloatColumn) columnPair.f0, (Float) columnPair.f1); - } - - return null; - } - - private boolean isValid(BinaryComparison comp) { - return (comp.left() instanceof Literal && comp.right() instanceof Attribute) - || (comp.left() instanceof Attribute && comp.right() instanceof Literal); - } - - private boolean literalOnRight(BinaryComparison comp) { - if (comp.left() instanceof Literal && comp.right() instanceof Attribute) { - return false; - } else if (comp.left() instanceof Attribute && comp.right() instanceof Literal) { - return true; - } else { - throw new RuntimeException("Invalid binary comparison."); - } - } - - private TypeInformation getLiteralType(BinaryComparison comp) { - if (literalOnRight(comp)) { - return ((Literal) comp.right()).resultType(); - } else { - return ((Literal) comp.left()).resultType(); - } - } - - private Object getLiteral(BinaryComparison comp) { - if (literalOnRight(comp)) { - return ((Literal) comp.right()).value(); - } else { - return ((Literal) comp.left()).value(); - } - } - - private String getColumnName(BinaryComparison comp) { - if (literalOnRight(comp)) { - return ((Attribute) comp.left()).name(); - } else { - return ((Attribute) comp.right()).name(); - } - } - - @Nullable - private Tuple2 extractColumnAndLiteral(BinaryComparison comp) { - String columnName = getColumnName(comp); - ColumnPath columnPath = ColumnPath.fromDotString(columnName); - TypeInformation typeInfo = null; - try { - Type type = parquetSchema.getType(columnPath.toArray()); - typeInfo = ParquetSchemaConverter.convertParquetTypeToTypeInfo(type); - } catch (InvalidRecordException e) { - LOG.error("Pushed predicate on undefined field name {} in schema", columnName); - return null; - } - - // fetch literal and ensure it is comparable - Object value = getLiteral(comp); - // validate that literal is comparable - if (!(value instanceof Comparable)) { - LOG.warn( - "Encountered a non-comparable literal of type {}." - + "Cannot push predicate [{}] into ParquetTablesource." 
- + "This is a bug and should be reported.", - value.getClass().getCanonicalName(), - comp); - return null; - } - - if (typeInfo == BasicTypeInfo.BYTE_TYPE_INFO - || typeInfo == BasicTypeInfo.SHORT_TYPE_INFO - || typeInfo == BasicTypeInfo.INT_TYPE_INFO) { - return new Tuple2<>(FilterApi.intColumn(columnName), ((Number) value).intValue()); - } else if (typeInfo == BasicTypeInfo.LONG_TYPE_INFO) { - return new Tuple2<>(FilterApi.longColumn(columnName), ((Number) value).longValue()); - } else if (typeInfo == BasicTypeInfo.FLOAT_TYPE_INFO) { - return new Tuple2<>(FilterApi.floatColumn(columnName), ((Number) value).floatValue()); - } else if (typeInfo == BasicTypeInfo.BOOLEAN_TYPE_INFO) { - return new Tuple2<>(FilterApi.booleanColumn(columnName), (Boolean) value); - } else if (typeInfo == BasicTypeInfo.DOUBLE_TYPE_INFO) { - return new Tuple2<>(FilterApi.doubleColumn(columnName), ((Number) value).doubleValue()); - } else if (typeInfo == BasicTypeInfo.STRING_TYPE_INFO) { - return new Tuple2<>( - FilterApi.binaryColumn(columnName), Binary.fromString((String) value)); - } else { - // unsupported type - return null; - } - } - - // Builder - public static Builder builder() { - return new Builder(); - } - - /** Constructs an {@link ParquetTableSource}. */ - public static class Builder { - - private String path; - - private MessageType schema; - - private Configuration config; - - private boolean recursive = true; - - /** - * Sets the path of Parquet files. If the path is specifies a directory, it will be - * recursively enumerated. - * - * @param path the path of the Parquet files. - * @return The Builder - */ - public Builder path(String path) { - Preconditions.checkNotNull(path, "Path must not be null"); - Preconditions.checkArgument(!path.isEmpty(), "Path must not be empty"); - this.path = path; - return this; - } - - /** - * Sets the path of the Parquet files. - * - * @param path The path of the Parquet files - * @param recursive Flag whether to enumerate - * @return The Builder - */ - public Builder path(String path, boolean recursive) { - Preconditions.checkNotNull(path, "Path must not be null"); - Preconditions.checkArgument(!path.isEmpty(), "Path must not be empty"); - this.path = path; - this.recursive = recursive; - return this; - } - - /** - * Sets the Parquet schema of the files to read as a String. - * - * @param parquetSchema The parquet schema of the files to read as a String. - * @return The Builder - */ - public Builder forParquetSchema(MessageType parquetSchema) { - Preconditions.checkNotNull(parquetSchema, "Parquet schema must not be null"); - this.schema = parquetSchema; - return this; - } - - /** - * Sets a Hadoop {@link Configuration} for the Parquet Reader. If no configuration is - * configured, an empty configuration is used. - * - * @param config The Hadoop Configuration for the Parquet reader. - * @return The Builder - */ - public Builder withConfiguration(Configuration config) { - Preconditions.checkNotNull(config, "Configuration must not be null."); - this.config = config; - return this; - } - - /** - * Builds the ParquetTableSource for this builder. - * - * @return The ParquetTableSource for this builder. 
- */ - public ParquetTableSource build() { - Preconditions.checkNotNull(path, "Path must not be null"); - Preconditions.checkNotNull(schema, "Parquet schema must not be null"); - if (config == null) { - this.config = new Configuration(); - } - - return new ParquetTableSource(this.path, this.schema, this.config, this.recursive); - } - } -} diff --git a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/utils/ParentDataHolder.java b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/utils/ParentDataHolder.java deleted file mode 100644 index 301d9a63bfe54..0000000000000 --- a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/utils/ParentDataHolder.java +++ /dev/null @@ -1,25 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.formats.parquet.utils; - -/** Interface for {@link RowConverter} for extracting nested value from parquet record. */ -public interface ParentDataHolder { - - void add(int fieldIndex, Object object); -} diff --git a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/utils/ParquetRecordReader.java b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/utils/ParquetRecordReader.java deleted file mode 100644 index 11987e008f7a3..0000000000000 --- a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/utils/ParquetRecordReader.java +++ /dev/null @@ -1,311 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.formats.parquet.utils; - -import org.apache.flink.api.java.tuple.Tuple2; - -import org.apache.hadoop.conf.Configuration; -import org.apache.parquet.column.page.PageReadStore; -import org.apache.parquet.filter2.compat.FilterCompat; -import org.apache.parquet.filter2.compat.FilterCompat.Filter; -import org.apache.parquet.hadoop.ParquetFileReader; -import org.apache.parquet.hadoop.api.InitContext; -import org.apache.parquet.hadoop.api.ReadSupport; -import org.apache.parquet.hadoop.metadata.BlockMetaData; -import org.apache.parquet.hadoop.metadata.FileMetaData; -import org.apache.parquet.io.ColumnIOFactory; -import org.apache.parquet.io.MessageColumnIO; -import org.apache.parquet.io.RecordReader; -import org.apache.parquet.io.api.RecordMaterializer; -import org.apache.parquet.io.api.RecordMaterializer.RecordMaterializationException; -import org.apache.parquet.schema.MessageType; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import javax.annotation.CheckReturnValue; -import javax.annotation.meta.When; - -import java.io.IOException; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Set; - -import static org.apache.flink.util.Preconditions.checkNotNull; - -/** - * Customized {@link org.apache.parquet.hadoop.ParquetRecordReader} that support start read from - * particular position. - */ -public class ParquetRecordReader { - private static final Logger LOG = LoggerFactory.getLogger(ParquetRecordReader.class); - - private ColumnIOFactory columnIOFactory; - private Filter filter; - private MessageType readSchema; - private MessageType fileSchema; - private ReadSupport readSupport; - - private RecordMaterializer recordMaterializer; - private ParquetFileReader reader; - private RecordReader recordReader; - private boolean skipCorruptedRecord = true; - - private T readRecord; - private boolean readRecordReturned = true; - - // number of records in file - private long numTotalRecords; - // number of records that were read from file - private long numReadRecords = 0; - - // id of current block - private int currentBlock = -1; - private long numRecordsUpToPreviousBlock = 0; - private long numRecordsUpToCurrentBlock = 0; - - public ParquetRecordReader(ReadSupport readSupport, MessageType readSchema, Filter filter) { - this.filter = checkNotNull(filter, "filter"); - this.readSupport = checkNotNull(readSupport, "readSupport"); - this.readSchema = checkNotNull(readSchema, "readSchema"); - } - - public ParquetRecordReader(ReadSupport readSupport, MessageType readSchema) { - this(readSupport, readSchema, FilterCompat.NOOP); - } - - public void setSkipCorruptedRecord(boolean skipCorruptedRecord) { - this.skipCorruptedRecord = skipCorruptedRecord; - } - - public void initialize(ParquetFileReader reader, Configuration configuration) { - this.reader = reader; - FileMetaData parquetFileMetadata = reader.getFooter().getFileMetaData(); - // real schema of parquet file - this.fileSchema = parquetFileMetadata.getSchema(); - Map fileMetadata = parquetFileMetadata.getKeyValueMetaData(); - ReadSupport.ReadContext readContext = - readSupport.init( - new InitContext(configuration, toSetMultiMap(fileMetadata), readSchema)); - - this.columnIOFactory = new ColumnIOFactory(parquetFileMetadata.getCreatedBy()); - this.recordMaterializer = - readSupport.prepareForRead(configuration, fileMetadata, readSchema, readContext); - this.numTotalRecords = reader.getRecordCount(); - } - - private RecordReader 
createRecordReader(PageReadStore pages) throws IOException { - if (pages == null) { - throw new IOException( - "Expecting more rows but reached last block. Read " - + numReadRecords - + " out of " - + numTotalRecords); - } - MessageColumnIO columnIO = columnIOFactory.getColumnIO(readSchema, fileSchema, true); - return columnIO.getRecordReader(pages, recordMaterializer, filter); - } - - /** - * Moves the reading position to the given block and seeks to and reads the given record. - * - * @param block The block to seek to. - * @param recordInBlock The number of the record in the block to return next. - */ - public void seek(long block, long recordInBlock) throws IOException { - - List blockMetaData = reader.getRowGroups(); - - if (block == -1L && recordInBlock == -1L) { - // the split was fully consumed - currentBlock = blockMetaData.size() - 1; - numReadRecords = numTotalRecords; - numRecordsUpToCurrentBlock = numTotalRecords; - return; - } - - // init all counters for the start of the first block - currentBlock = 0; - numRecordsUpToPreviousBlock = 0; - numRecordsUpToCurrentBlock = blockMetaData.get(0).getRowCount(); - numReadRecords = 0; - - // seek to the given block - while (currentBlock < block) { - currentBlock++; - reader.skipNextRowGroup(); - numRecordsUpToPreviousBlock = numRecordsUpToCurrentBlock; - numRecordsUpToCurrentBlock += blockMetaData.get(currentBlock).getRowCount(); - numReadRecords = numRecordsUpToPreviousBlock; - } - - // seek to and read the given record - PageReadStore pages = reader.readNextRowGroup(); - recordReader = createRecordReader(pages); - for (int i = 0; i <= recordInBlock; i++) { - readNextRecord(); - } - } - - /** - * Returns the current read position in the split, i.e., the current block and the number of - * records that were returned from that block. - * - * @return The current read position in the split. - */ - public Tuple2 getCurrentReadPosition() { - - // compute number of returned records - long numRecordsReturned = numReadRecords; - if (!readRecordReturned && numReadRecords > 0) { - numRecordsReturned -= 1; - } - - if (numRecordsReturned == numTotalRecords) { - // all records of split returned. - return Tuple2.of(-1L, -1L); - } - - if (numRecordsReturned == numRecordsUpToCurrentBlock) { - // all records of block returned. Next record is in next block - return Tuple2.of(currentBlock + 1L, 0L); - } - - // compute number of returned records of this block - long numRecordsOfBlockReturned = numRecordsReturned - numRecordsUpToPreviousBlock; - return Tuple2.of((long) currentBlock, numRecordsOfBlockReturned); - } - - /** - * Checks if the record reader returned all records. This method must be called before a record - * can be returned. - * - * @return False if there are more records to be read. True if all records have been returned. - */ - public boolean reachEnd() throws IOException { - // check if we have a read row that was not returned yet - if (readRecord != null && !readRecordReturned) { - return false; - } - // check if there are more rows to be read - if (numReadRecords >= numTotalRecords) { - return true; - } - // try to read next row - return !readNextRecord(); - } - - /** - * Reads the next record. - * - * @return True if a record could be read, false otherwise. 
- */ - private boolean readNextRecord() throws IOException { - boolean recordFound = false; - while (!recordFound) { - // no more records left - if (numReadRecords >= numTotalRecords) { - return false; - } - - try { - if (numReadRecords == numRecordsUpToCurrentBlock) { - // advance to next block - PageReadStore pages = reader.readNextRowGroup(); - recordReader = createRecordReader(pages); - numRecordsUpToPreviousBlock = numRecordsUpToCurrentBlock; - numRecordsUpToCurrentBlock += pages.getRowCount(); - currentBlock++; - } - - numReadRecords++; - try { - readRecord = recordReader.read(); - readRecordReturned = false; - } catch (RecordMaterializationException e) { - String errorMessage = - String.format( - "skipping a corrupt record in block number [%d] record number [%s] of file %s", - currentBlock, - numReadRecords - numRecordsUpToPreviousBlock, - reader.getFile()); - - if (!skipCorruptedRecord) { - LOG.error(errorMessage); - throw e; - } else { - LOG.warn(errorMessage); - } - continue; - } - - if (readRecord == null) { - readRecordReturned = true; - numReadRecords = numRecordsUpToCurrentBlock; - LOG.debug("filtered record reader reached end of block"); - continue; - } - - recordFound = true; - LOG.debug("read value: {}", readRecord); - } catch (RecordMaterializationException e) { - LOG.error( - String.format( - "Can not read value at %d in block %d in file %s", - numReadRecords - numRecordsUpToPreviousBlock, - currentBlock, - reader.getFile()), - e); - if (!skipCorruptedRecord) { - throw e; - } - return false; - } - } - - return true; - } - - /** - * Returns the next record. Note that the reachedEnd() method must be called before. - * - * @return The next record. - */ - @CheckReturnValue(when = When.NEVER) - public T nextRecord() { - readRecordReturned = true; - return readRecord; - } - - public void close() throws IOException { - if (reader != null) { - reader.close(); - } - } - - private static Map> toSetMultiMap(Map map) { - Map> setMultiMap = new HashMap<>(); - for (Map.Entry entry : map.entrySet()) { - Set set = Collections.singleton(entry.getValue()); - setMultiMap.put(entry.getKey(), Collections.unmodifiableSet(set)); - } - return Collections.unmodifiableMap(setMultiMap); - } -} diff --git a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/utils/ParquetSchemaConverter.java b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/utils/ParquetSchemaConverter.java index 29bcd2f337388..37aa43f244e31 100644 --- a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/utils/ParquetSchemaConverter.java +++ b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/utils/ParquetSchemaConverter.java @@ -18,528 +18,18 @@ package org.apache.flink.formats.parquet.utils; -import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo; -import org.apache.flink.api.common.typeinfo.BasicTypeInfo; -import org.apache.flink.api.common.typeinfo.SqlTimeTypeInfo; -import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.java.typeutils.MapTypeInfo; -import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo; -import org.apache.flink.api.java.typeutils.RowTypeInfo; import org.apache.flink.table.types.logical.DecimalType; import org.apache.flink.table.types.logical.LogicalType; import org.apache.flink.table.types.logical.RowType; -import org.apache.parquet.schema.GroupType; import org.apache.parquet.schema.MessageType; import org.apache.parquet.schema.OriginalType; import 
org.apache.parquet.schema.PrimitiveType; import org.apache.parquet.schema.Type; import org.apache.parquet.schema.Types; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.lang.reflect.Array; -import java.util.ArrayList; -import java.util.List; /** Schema converter converts Parquet schema to and from Flink internal types. */ public class ParquetSchemaConverter { - private static final Logger LOGGER = LoggerFactory.getLogger(ParquetSchemaConverter.class); - public static final String MAP_VALUE = "value"; - public static final String LIST_ARRAY_TYPE = "array"; - public static final String LIST_ELEMENT = "element"; - public static final String LIST_GROUP_NAME = "list"; - public static final String MESSAGE_ROOT = "root"; - - /** - * Converts Parquet schema to Flink Internal Type. - * - * @param type Parquet schema - * @return Flink type information - */ - public static TypeInformation fromParquetType(MessageType type) { - return convertFields(type.getFields()); - } - - /** - * Converts Flink Internal Type to Parquet schema. - * - * @param typeInformation Flink type information - * @param legacyMode is standard LIST and MAP schema or back-compatible schema - * @return Parquet schema - */ - public static MessageType toParquetType( - TypeInformation typeInformation, boolean legacyMode) { - return (MessageType) - convertField(null, typeInformation, Type.Repetition.OPTIONAL, legacyMode); - } - - public static TypeInformation convertFields(List parquetFields) { - List> types = new ArrayList<>(); - List names = new ArrayList<>(); - for (Type field : parquetFields) { - TypeInformation subType = convertParquetTypeToTypeInfo(field); - if (subType != null) { - types.add(subType); - names.add(field.getName()); - } else { - LOGGER.error( - "Parquet field {} in schema type {} can not be converted to Flink Internal Type", - field.getName(), - field.getOriginalType().name()); - } - } - - return new RowTypeInfo( - types.toArray(new TypeInformation[0]), names.toArray(new String[0])); - } - - public static TypeInformation convertParquetTypeToTypeInfo(final Type fieldType) { - TypeInformation typeInfo; - if (fieldType.isPrimitive()) { - OriginalType originalType = fieldType.getOriginalType(); - PrimitiveType primitiveType = fieldType.asPrimitiveType(); - switch (primitiveType.getPrimitiveTypeName()) { - case BINARY: - if (originalType != null) { - switch (originalType) { - case DECIMAL: - typeInfo = BasicTypeInfo.BIG_DEC_TYPE_INFO; - break; - case UTF8: - case ENUM: - case JSON: - case BSON: - typeInfo = BasicTypeInfo.STRING_TYPE_INFO; - break; - default: - throw new UnsupportedOperationException( - "Unsupported original type : " - + originalType.name() - + " for primitive type BINARY"); - } - } else { - typeInfo = BasicTypeInfo.STRING_TYPE_INFO; - } - break; - case BOOLEAN: - typeInfo = BasicTypeInfo.BOOLEAN_TYPE_INFO; - break; - case INT32: - if (originalType != null) { - switch (originalType) { - case DECIMAL: // for 1 <= precision (number of digits before the decimal - // point) <= 9, the INT32 stores the unscaled value - typeInfo = BasicTypeInfo.INT_TYPE_INFO; - break; - case TIME_MICROS: - case TIME_MILLIS: - typeInfo = SqlTimeTypeInfo.TIME; - break; - case TIMESTAMP_MICROS: - case TIMESTAMP_MILLIS: - typeInfo = SqlTimeTypeInfo.TIMESTAMP; - break; - case DATE: - typeInfo = SqlTimeTypeInfo.DATE; - break; - case UINT_8: - case UINT_16: - case UINT_32: - typeInfo = BasicTypeInfo.INT_TYPE_INFO; - break; - case INT_8: - typeInfo = org.apache.flink.api.common.typeinfo.Types.BYTE; - break; - 
case INT_16: - typeInfo = org.apache.flink.api.common.typeinfo.Types.SHORT; - break; - case INT_32: - typeInfo = BasicTypeInfo.INT_TYPE_INFO; - break; - default: - throw new UnsupportedOperationException( - "Unsupported original type : " - + originalType.name() - + " for primitive type INT32"); - } - } else { - typeInfo = BasicTypeInfo.INT_TYPE_INFO; - } - break; - case INT64: - if (originalType != null) { - switch (originalType) { - case TIME_MICROS: - typeInfo = SqlTimeTypeInfo.TIME; - break; - case TIMESTAMP_MICROS: - case TIMESTAMP_MILLIS: - typeInfo = SqlTimeTypeInfo.TIMESTAMP; - break; - case INT_64: - case DECIMAL: // for 1 <= precision (number of digits before the decimal - // point) <= 18, the INT64 stores the unscaled value - typeInfo = BasicTypeInfo.LONG_TYPE_INFO; - break; - default: - throw new UnsupportedOperationException( - "Unsupported original type : " - + originalType.name() - + " for primitive type INT64"); - } - } else { - typeInfo = BasicTypeInfo.LONG_TYPE_INFO; - } - break; - case INT96: - // It stores a timestamp type data, we read it as millisecond - typeInfo = SqlTimeTypeInfo.TIMESTAMP; - break; - case FLOAT: - typeInfo = BasicTypeInfo.FLOAT_TYPE_INFO; - break; - case DOUBLE: - typeInfo = BasicTypeInfo.DOUBLE_TYPE_INFO; - break; - case FIXED_LEN_BYTE_ARRAY: - if (originalType != null) { - switch (originalType) { - case DECIMAL: - typeInfo = BasicTypeInfo.BIG_DEC_TYPE_INFO; - break; - default: - throw new UnsupportedOperationException( - "Unsupported original type : " - + originalType.name() - + " for primitive type FIXED_LEN_BYTE_ARRAY"); - } - } else { - typeInfo = BasicTypeInfo.BIG_DEC_TYPE_INFO; - } - break; - default: - throw new UnsupportedOperationException("Unsupported schema: " + fieldType); - } - } else { - GroupType parquetGroupType = fieldType.asGroupType(); - OriginalType originalType = parquetGroupType.getOriginalType(); - if (originalType != null) { - switch (originalType) { - case LIST: - if (parquetGroupType.getFieldCount() != 1) { - throw new UnsupportedOperationException( - "Invalid list type " + parquetGroupType); - } - Type repeatedType = parquetGroupType.getType(0); - if (!repeatedType.isRepetition(Type.Repetition.REPEATED)) { - throw new UnsupportedOperationException( - "Invalid list type " + parquetGroupType); - } - - if (repeatedType.isPrimitive()) { - typeInfo = convertParquetPrimitiveListToFlinkArray(repeatedType); - } else { - // Backward-compatibility element group name can be any string - // (element/array/other) - GroupType elementType = repeatedType.asGroupType(); - // If the repeated field is a group with multiple fields, then its type - // is the element - // type and elements are required. 
- if (elementType.getFieldCount() > 1) { - typeInfo = - convertGroupElementToArrayTypeInfo( - parquetGroupType, elementType); - } else { - Type internalType = elementType.getType(0); - if (internalType.isPrimitive()) { - typeInfo = - convertParquetPrimitiveListToFlinkArray(internalType); - } else { - // No need to do special process for group named array and tuple - GroupType tupleGroup = internalType.asGroupType(); - if (tupleGroup.getFieldCount() == 1 - && tupleGroup - .getFields() - .get(0) - .isRepetition(Type.Repetition.REQUIRED)) { - typeInfo = - ObjectArrayTypeInfo.getInfoFor( - convertParquetTypeToTypeInfo(internalType)); - } else { - typeInfo = - convertGroupElementToArrayTypeInfo( - parquetGroupType, tupleGroup); - } - } - } - } - break; - - case MAP_KEY_VALUE: - case MAP: - // The outer-most level must be a group annotated with MAP - // that contains a single field named key_value - if (parquetGroupType.getFieldCount() != 1 - || parquetGroupType.getType(0).isPrimitive()) { - throw new UnsupportedOperationException( - "Invalid map type " + parquetGroupType); - } - - // The middle level must be a repeated group with a key field for map keys - // and, optionally, a value field for map values. But we can't enforce two - // strict condition here - // the schema generated by Parquet lib doesn't contain LogicalType - // ! mapKeyValType.getOriginalType().equals(OriginalType.MAP_KEY_VALUE) - GroupType mapKeyValType = parquetGroupType.getType(0).asGroupType(); - if (!mapKeyValType.isRepetition(Type.Repetition.REPEATED) - || mapKeyValType.getFieldCount() != 2) { - throw new UnsupportedOperationException( - "The middle level of Map should be single field named key_value. Invalid map type " - + parquetGroupType); - } - - Type keyType = mapKeyValType.getType(0); - - // The key field encodes the map's key type. This field must have repetition - // required and - // must always be present. - if (!keyType.isPrimitive() - || !keyType.isRepetition(Type.Repetition.REQUIRED) - || !keyType.asPrimitiveType() - .getPrimitiveTypeName() - .equals(PrimitiveType.PrimitiveTypeName.BINARY) - || !keyType.getOriginalType().equals(OriginalType.UTF8)) { - throw new IllegalArgumentException( - "Map key type must be required binary (UTF8): " + keyType); - } - - Type valueType = mapKeyValType.getType(1); - return new MapTypeInfo<>( - BasicTypeInfo.STRING_TYPE_INFO, - convertParquetTypeToTypeInfo(valueType)); - default: - throw new UnsupportedOperationException("Unsupported schema: " + fieldType); - } - } else { - // if no original type than it is a record - return convertFields(parquetGroupType.getFields()); - } - } - - return typeInfo; - } - - private static ObjectArrayTypeInfo convertGroupElementToArrayTypeInfo( - GroupType arrayFieldType, GroupType elementType) { - for (Type type : elementType.getFields()) { - if (!type.isRepetition(Type.Repetition.REQUIRED)) { - throw new UnsupportedOperationException( - String.format( - "List field [%s] in List [%s] has to be required. 
", - type.toString(), arrayFieldType.getName())); - } - } - return ObjectArrayTypeInfo.getInfoFor(convertParquetTypeToTypeInfo(elementType)); - } - - private static TypeInformation convertParquetPrimitiveListToFlinkArray(Type type) { - // Backward-compatibility element group doesn't exist also allowed - TypeInformation flinkType = convertParquetTypeToTypeInfo(type); - if (flinkType.isBasicType()) { - return BasicArrayTypeInfo.getInfoFor( - Array.newInstance(flinkType.getTypeClass(), 0).getClass()); - } else { - // flinkType here can be either SqlTimeTypeInfo or BasicTypeInfo.BIG_DEC_TYPE_INFO, - // So it should be converted to ObjectArrayTypeInfo - return ObjectArrayTypeInfo.getInfoFor(flinkType); - } - } - - private static Type convertField( - String fieldName, - TypeInformation typeInfo, - Type.Repetition inheritRepetition, - boolean legacyMode) { - Type fieldType = null; - - Type.Repetition repetition = - inheritRepetition == null ? Type.Repetition.OPTIONAL : inheritRepetition; - if (typeInfo instanceof BasicTypeInfo) { - BasicTypeInfo basicTypeInfo = (BasicTypeInfo) typeInfo; - if (basicTypeInfo.equals(BasicTypeInfo.BIG_DEC_TYPE_INFO) - || basicTypeInfo.equals(BasicTypeInfo.BIG_INT_TYPE_INFO)) { - fieldType = - Types.primitive(PrimitiveType.PrimitiveTypeName.BINARY, repetition) - .as(OriginalType.DECIMAL) - .named(fieldName); - } else if (basicTypeInfo.equals(BasicTypeInfo.INT_TYPE_INFO)) { - fieldType = - Types.primitive(PrimitiveType.PrimitiveTypeName.INT32, repetition) - .as(OriginalType.INT_32) - .named(fieldName); - } else if (basicTypeInfo.equals(BasicTypeInfo.DOUBLE_TYPE_INFO)) { - fieldType = - Types.primitive(PrimitiveType.PrimitiveTypeName.DOUBLE, repetition) - .named(fieldName); - } else if (basicTypeInfo.equals(BasicTypeInfo.FLOAT_TYPE_INFO)) { - fieldType = - Types.primitive(PrimitiveType.PrimitiveTypeName.FLOAT, repetition) - .named(fieldName); - } else if (basicTypeInfo.equals(BasicTypeInfo.LONG_TYPE_INFO)) { - fieldType = - Types.primitive(PrimitiveType.PrimitiveTypeName.INT64, repetition) - .as(OriginalType.INT_64) - .named(fieldName); - } else if (basicTypeInfo.equals(BasicTypeInfo.SHORT_TYPE_INFO)) { - fieldType = - Types.primitive(PrimitiveType.PrimitiveTypeName.INT32, repetition) - .as(OriginalType.INT_16) - .named(fieldName); - } else if (basicTypeInfo.equals(BasicTypeInfo.BYTE_TYPE_INFO)) { - fieldType = - Types.primitive(PrimitiveType.PrimitiveTypeName.INT32, repetition) - .as(OriginalType.INT_8) - .named(fieldName); - } else if (basicTypeInfo.equals(BasicTypeInfo.CHAR_TYPE_INFO)) { - fieldType = - Types.primitive(PrimitiveType.PrimitiveTypeName.BINARY, repetition) - .as(OriginalType.UTF8) - .named(fieldName); - } else if (basicTypeInfo.equals(BasicTypeInfo.BOOLEAN_TYPE_INFO)) { - fieldType = - Types.primitive(PrimitiveType.PrimitiveTypeName.BOOLEAN, repetition) - .named(fieldName); - } else if (basicTypeInfo.equals(BasicTypeInfo.DATE_TYPE_INFO) - || basicTypeInfo.equals(BasicTypeInfo.STRING_TYPE_INFO)) { - fieldType = - Types.primitive(PrimitiveType.PrimitiveTypeName.BINARY, repetition) - .as(OriginalType.UTF8) - .named(fieldName); - } - } else if (typeInfo instanceof MapTypeInfo) { - MapTypeInfo mapTypeInfo = (MapTypeInfo) typeInfo; - - if (mapTypeInfo.getKeyTypeInfo().equals(BasicTypeInfo.STRING_TYPE_INFO)) { - fieldType = - Types.map(repetition) - .value( - convertField( - MAP_VALUE, - mapTypeInfo.getValueTypeInfo(), - Type.Repetition.OPTIONAL, - legacyMode)) - .named(fieldName); - } else { - throw new UnsupportedOperationException( - String.format( - "Can 
not convert Flink MapTypeInfo %s to Parquet" - + " Map type as key has to be String", - typeInfo.toString())); - } - } else if (typeInfo instanceof ObjectArrayTypeInfo) { - ObjectArrayTypeInfo objectArrayTypeInfo = (ObjectArrayTypeInfo) typeInfo; - - // Get all required sub fields - GroupType componentGroup = - (GroupType) - convertField( - LIST_ELEMENT, - objectArrayTypeInfo.getComponentInfo(), - Type.Repetition.REQUIRED, - legacyMode); - - if (legacyMode) { - // LegacyMode is 2 Level List schema - fieldType = - Types.buildGroup(repetition) - .addField(componentGroup) - .as(OriginalType.LIST) - .named(fieldName); - } else { - // Add extra layer of Group according to Parquet's standard - Type listGroup = - Types.repeatedGroup().addField(componentGroup).named(LIST_GROUP_NAME); - fieldType = - Types.buildGroup(repetition) - .addField(listGroup) - .as(OriginalType.LIST) - .named(fieldName); - } - } else if (typeInfo instanceof BasicArrayTypeInfo) { - BasicArrayTypeInfo basicArrayType = (BasicArrayTypeInfo) typeInfo; - - // LegacyMode is 2 Level List schema - if (legacyMode) { - PrimitiveType primitiveTyp = - convertField( - fieldName, - basicArrayType.getComponentInfo(), - Type.Repetition.REQUIRED, - legacyMode) - .asPrimitiveType(); - fieldType = - Types.buildGroup(repetition) - .addField(primitiveTyp) - .as(OriginalType.LIST) - .named(fieldName); - } else { - // Add extra layer of Group according to Parquet's standard - Type listGroup = - Types.repeatedGroup() - .addField( - convertField( - LIST_ELEMENT, - basicArrayType.getComponentInfo(), - Type.Repetition.REQUIRED, - legacyMode)) - .named(LIST_GROUP_NAME); - - fieldType = - Types.buildGroup(repetition) - .addField(listGroup) - .as(OriginalType.LIST) - .named(fieldName); - } - } else if (typeInfo instanceof SqlTimeTypeInfo) { - if (typeInfo.equals(SqlTimeTypeInfo.DATE)) { - fieldType = - Types.primitive(PrimitiveType.PrimitiveTypeName.INT32, repetition) - .as(OriginalType.DATE) - .named(fieldName); - } else if (typeInfo.equals(SqlTimeTypeInfo.TIME)) { - fieldType = - Types.primitive(PrimitiveType.PrimitiveTypeName.INT32, repetition) - .as(OriginalType.TIME_MILLIS) - .named(fieldName); - } else if (typeInfo.equals(SqlTimeTypeInfo.TIMESTAMP)) { - fieldType = - Types.primitive(PrimitiveType.PrimitiveTypeName.INT64, repetition) - .as(OriginalType.TIMESTAMP_MILLIS) - .named(fieldName); - } else { - throw new UnsupportedOperationException( - "Unsupported SqlTimeTypeInfo " + typeInfo.toString()); - } - - } else { - RowTypeInfo rowTypeInfo = (RowTypeInfo) typeInfo; - List types = new ArrayList<>(); - String[] fieldNames = rowTypeInfo.getFieldNames(); - TypeInformation[] fieldTypes = rowTypeInfo.getFieldTypes(); - for (int i = 0; i < rowTypeInfo.getArity(); i++) { - types.add(convertField(fieldNames[i], fieldTypes[i], repetition, legacyMode)); - } - - if (fieldName == null) { - fieldType = new MessageType(MESSAGE_ROOT, types); - } else { - fieldType = new GroupType(repetition, fieldName, types); - } - } - - return fieldType; - } public static MessageType convertToParquetMessageType(String name, RowType rowType) { Type[] types = new Type[rowType.getFieldCount()]; diff --git a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/utils/ParquetTimestampUtils.java b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/utils/ParquetTimestampUtils.java deleted file mode 100644 index a88cd05f0b6e3..0000000000000 --- 
a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/utils/ParquetTimestampUtils.java +++ /dev/null @@ -1,69 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.formats.parquet.utils; - -import org.apache.parquet.io.api.Binary; - -import java.nio.ByteBuffer; -import java.util.concurrent.TimeUnit; - -/** - * Utility class for decoding INT96 encoded parquet timestamp to timestamp millis in GMT. This class - * is equivalent of @see org.apache.hadoop.hive.ql.io.parquet.timestamp.NanoTime, which produces - * less intermediate objects during decoding. - */ -public final class ParquetTimestampUtils { - private static final int JULIAN_EPOCH_OFFSET_DAYS = 2_440_588; - private static final long MILLIS_IN_DAY = TimeUnit.DAYS.toMillis(1); - private static final long NANOS_PER_MILLISECOND = TimeUnit.MILLISECONDS.toNanos(1); - - private ParquetTimestampUtils() {} - - /** - * Returns GMT timestamp from binary encoded parquet timestamp (12 bytes - julian date + time of - * day nanos). - * - * @param timestampBinary INT96 parquet timestamp - * @return timestamp in millis, GMT timezone - */ - public static long getTimestampMillis(Binary timestampBinary) { - if (timestampBinary.length() != 12) { - throw new IllegalArgumentException( - "Parquet timestamp must be 12 bytes, actual " + timestampBinary.length()); - } - byte[] bytes = timestampBinary.getBytes(); - - // little endian encoding - need to invert byte order - long timeOfDayNanos = - ByteBuffer.wrap( - new byte[] { - bytes[7], bytes[6], bytes[5], bytes[4], bytes[3], bytes[2], - bytes[1], bytes[0] - }) - .getLong(); - int julianDay = - ByteBuffer.wrap(new byte[] {bytes[11], bytes[10], bytes[9], bytes[8]}).getInt(); - - return julianDayToMillis(julianDay) + (timeOfDayNanos / NANOS_PER_MILLISECOND); - } - - private static long julianDayToMillis(int julianDay) { - return (julianDay - JULIAN_EPOCH_OFFSET_DAYS) * MILLIS_IN_DAY; - } -} diff --git a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/utils/RowConverter.java b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/utils/RowConverter.java deleted file mode 100644 index 90f57bf3b35d9..0000000000000 --- a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/utils/RowConverter.java +++ /dev/null @@ -1,541 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
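The Julian-day arithmetic in the getTimestampMillis()/julianDayToMillis() methods removed above reduces to a fixed offset from the Unix epoch. A short worked example, illustrative only and reusing the constants from the removed class:

import java.util.concurrent.TimeUnit;

final class Int96TimestampExample {
    private static final int JULIAN_EPOCH_OFFSET_DAYS = 2_440_588; // Julian day of 1970-01-01
    private static final long MILLIS_IN_DAY = TimeUnit.DAYS.toMillis(1);

    static long julianDayToMillis(int julianDay) {
        return (julianDay - JULIAN_EPOCH_OFFSET_DAYS) * MILLIS_IN_DAY;
    }

    public static void main(String[] args) {
        // Julian day 2_440_588 is the Unix epoch, so it decodes to 0 ms;
        // one day later decodes to 86_400_000 ms after 1970-01-01T00:00:00Z.
        System.out.println(julianDayToMillis(2_440_588)); // 0
        System.out.println(julianDayToMillis(2_440_589)); // 86400000
    }
}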
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.formats.parquet.utils; - -import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo; -import org.apache.flink.api.common.typeinfo.BasicTypeInfo; -import org.apache.flink.api.common.typeinfo.SqlTimeTypeInfo; -import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.common.typeutils.CompositeType; -import org.apache.flink.api.java.typeutils.MapTypeInfo; -import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo; -import org.apache.flink.api.java.typeutils.RowTypeInfo; -import org.apache.flink.types.Row; - -import org.apache.parquet.io.api.Binary; -import org.apache.parquet.io.api.Converter; -import org.apache.parquet.io.api.GroupConverter; -import org.apache.parquet.io.api.PrimitiveConverter; -import org.apache.parquet.schema.GroupType; -import org.apache.parquet.schema.MessageType; -import org.apache.parquet.schema.OriginalType; -import org.apache.parquet.schema.PrimitiveType; -import org.apache.parquet.schema.Type; - -import java.lang.reflect.Array; -import java.math.BigDecimal; -import java.sql.Date; -import java.sql.Time; -import java.sql.Timestamp; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -import static org.apache.parquet.schema.Type.Repetition.REPEATED; - -/** Extends from {@link GroupConverter} to convert an nested Parquet Record into Row. 
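For context on the class body removed below: it implements Parquet's GroupConverter contract, in which the record materializer calls start(), the per-field converters, and then end() for every record. The single-field converter here is a made-up minimal illustration under that assumption, not code from this change.

import org.apache.flink.types.Row;

import org.apache.parquet.io.api.Converter;
import org.apache.parquet.io.api.GroupConverter;
import org.apache.parquet.io.api.PrimitiveConverter;

final class SingleIntRowConverter extends GroupConverter {
    private final Row row = new Row(1);

    // converter for the single INT32 field of this example schema
    private final PrimitiveConverter fieldConverter =
            new PrimitiveConverter() {
                @Override
                public void addInt(int value) {
                    row.setField(0, value);
                }
            };

    @Override
    public Converter getConverter(int fieldIndex) {
        return fieldConverter;
    }

    @Override
    public void start() {
        // reset state at the beginning of each record
        row.setField(0, null);
    }

    @Override
    public void end() {
        // record is complete; a materializer would hand `row` to the caller here
    }
}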
*/ -public class RowConverter extends GroupConverter implements ParentDataHolder { - private final Converter[] converters; - private final ParentDataHolder parentDataHolder; - private final TypeInformation typeInfo; - private Row currentRow; - private int posInParentRow; - - public RowConverter(MessageType messageType, TypeInformation typeInfo) { - this(messageType, typeInfo, null, 0); - } - - public RowConverter( - GroupType schema, TypeInformation typeInfo, ParentDataHolder parent, int pos) { - this.typeInfo = typeInfo; - this.parentDataHolder = parent; - this.posInParentRow = pos; - this.converters = new Converter[schema.getFieldCount()]; - - int i = 0; - if (typeInfo.getArity() >= 1 && (typeInfo instanceof CompositeType)) { - for (Type field : schema.getFields()) { - converters[i] = - createConverter(field, i, ((CompositeType) typeInfo).getTypeAt(i), this); - i++; - } - } - } - - private static Converter createConverter( - Type field, - int fieldPos, - TypeInformation typeInformation, - ParentDataHolder parentDataHolder) { - if (field.isPrimitive()) { - return new RowConverter.RowPrimitiveConverter(field, parentDataHolder, fieldPos); - } else if (typeInformation instanceof MapTypeInfo) { - return new RowConverter.MapConverter( - (GroupType) field, (MapTypeInfo) typeInformation, parentDataHolder, fieldPos); - } else if (typeInformation instanceof BasicArrayTypeInfo) { - Type elementType = field.asGroupType().getFields().get(0); - Class typeClass = - ((BasicArrayTypeInfo) typeInformation).getComponentInfo().getTypeClass(); - - if (typeClass.equals(Character.class)) { - return new RowConverter.ArrayConverter( - elementType, - Character.class, - BasicTypeInfo.CHAR_TYPE_INFO, - parentDataHolder, - fieldPos); - } else if (typeClass.equals(Boolean.class)) { - return new RowConverter.ArrayConverter( - elementType, - Boolean.class, - BasicTypeInfo.BOOLEAN_TYPE_INFO, - parentDataHolder, - fieldPos); - } else if (typeClass.equals(Short.class)) { - return new RowConverter.ArrayConverter( - elementType, - Short.class, - BasicTypeInfo.SHORT_TYPE_INFO, - parentDataHolder, - fieldPos); - } else if (typeClass.equals(Integer.class)) { - return new RowConverter.ArrayConverter( - elementType, - Integer.class, - BasicTypeInfo.INSTANT_TYPE_INFO, - parentDataHolder, - fieldPos); - } else if (typeClass.equals(Long.class)) { - return new RowConverter.ArrayConverter( - elementType, - Long.class, - BasicTypeInfo.LONG_TYPE_INFO, - parentDataHolder, - fieldPos); - } else if (typeClass.equals(Double.class)) { - return new RowConverter.ArrayConverter( - elementType, - Double.class, - BasicTypeInfo.DOUBLE_TYPE_INFO, - parentDataHolder, - fieldPos); - } else if (typeClass.equals(String.class)) { - return new RowConverter.ArrayConverter( - elementType, - String.class, - BasicTypeInfo.STRING_TYPE_INFO, - parentDataHolder, - fieldPos); - } else if (typeClass.equals(Date.class)) { - return new RowConverter.ArrayConverter( - elementType, Date.class, SqlTimeTypeInfo.DATE, parentDataHolder, fieldPos); - } else if (typeClass.equals(Time.class)) { - return new RowConverter.ArrayConverter