4 changes: 3 additions & 1 deletion CMakeLists.txt
@@ -113,7 +113,6 @@ set(DUCKDB_SRC_FILES
src/duckdb/ub_src_common_row_operations.cpp
src/duckdb/ub_src_common_serializer.cpp
src/duckdb/ub_src_common_sort.cpp
src/duckdb/ub_src_common_sorting.cpp
src/duckdb/ub_src_common_tree_renderer.cpp
src/duckdb/ub_src_common_types.cpp
src/duckdb/ub_src_common_types_column.cpp
@@ -377,9 +376,11 @@ set(DUCKDB_SRC_FILES
src/duckdb/extension/parquet/parquet_timestamp.cpp
src/duckdb/extension/parquet/parquet_float16.cpp
src/duckdb/extension/parquet/parquet_statistics.cpp
src/duckdb/extension/parquet/parquet_shredding.cpp
src/duckdb/extension/parquet/parquet_multi_file_info.cpp
src/duckdb/extension/parquet/column_reader.cpp
src/duckdb/extension/parquet/geo_parquet.cpp
src/duckdb/extension/parquet/parquet_field_id.cpp
src/duckdb/extension/parquet/parquet_extension.cpp
src/duckdb/extension/parquet/column_writer.cpp
src/duckdb/extension/parquet/parquet_file_metadata_cache.cpp
@@ -389,6 +390,7 @@
src/duckdb/ub_extension_parquet_reader.cpp
src/duckdb/ub_extension_parquet_reader_variant.cpp
src/duckdb/ub_extension_parquet_writer.cpp
src/duckdb/ub_extension_parquet_writer_variant.cpp
src/duckdb/third_party/parquet/parquet_types.cpp
src/duckdb/third_party/thrift/thrift/protocol/TProtocol.cpp
src/duckdb/third_party/thrift/thrift/transport/TTransportException.cpp


2 changes: 2 additions & 0 deletions src/duckdb/extension/core_functions/function_list.cpp
@@ -73,8 +73,10 @@ static const StaticFunctionDefinition core_functions[] = {
DUCKDB_AGGREGATE_FUNCTION(ApproxTopKFun),
DUCKDB_AGGREGATE_FUNCTION_SET(ArgMaxFun),
DUCKDB_AGGREGATE_FUNCTION_SET(ArgMaxNullFun),
DUCKDB_AGGREGATE_FUNCTION_SET(ArgMaxNullsLastFun),
DUCKDB_AGGREGATE_FUNCTION_SET(ArgMinFun),
DUCKDB_AGGREGATE_FUNCTION_SET(ArgMinNullFun),
DUCKDB_AGGREGATE_FUNCTION_SET(ArgMinNullsLastFun),
DUCKDB_AGGREGATE_FUNCTION_SET_ALIAS(ArgmaxFun),
DUCKDB_AGGREGATE_FUNCTION_SET_ALIAS(ArgminFun),
DUCKDB_AGGREGATE_FUNCTION_ALIAS(ArrayAggFun),
@@ -57,6 +57,16 @@ struct ArgMinNullFun {
static AggregateFunctionSet GetFunctions();
};

struct ArgMinNullsLastFun {
static constexpr const char *Name = "arg_min_nulls_last";
static constexpr const char *Parameters = "arg,val,N";
static constexpr const char *Description = "Finds the rows with N minimum vals, including nulls. Calculates the arg expression at that row.";
static constexpr const char *Example = "arg_min_null_val(A, B, N)";
static constexpr const char *Categories = "";

static AggregateFunctionSet GetFunctions();
};

struct ArgMaxFun {
static constexpr const char *Name = "arg_max";
static constexpr const char *Parameters = "arg,val";
@@ -89,6 +99,16 @@ struct ArgMaxNullFun {
static AggregateFunctionSet GetFunctions();
};

struct ArgMaxNullsLastFun {
static constexpr const char *Name = "arg_max_nulls_last";
static constexpr const char *Parameters = "arg,val,N";
static constexpr const char *Description = "Finds the rows with N maximum vals, including nulls. Calculates the arg expression at that row.";
static constexpr const char *Example = "arg_min_null_val(A, B, N)";
static constexpr const char *Categories = "";

static AggregateFunctionSet GetFunctions();
};

struct BitAndFun {
static constexpr const char *Name = "bit_and";
static constexpr const char *Parameters = "arg";
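Not part of this diff: a minimal usage sketch of the two aggregates declared above, going through DuckDB's C++ API. The table, data, and the expectation that NULL vals sort last are assumptions drawn from the Name/Parameters/Description fields, not from this patch.

#include "duckdb.hpp"

// Sketch only: exercises arg_min_nulls_last / arg_max_nulls_last via the
// DuckDB C++ API. Table name and data are invented for illustration.
int main() {
    duckdb::DuckDB db(nullptr); // in-memory database
    duckdb::Connection con(db);

    con.Query("CREATE TABLE t(a VARCHAR, b INTEGER)");
    con.Query("INSERT INTO t VALUES ('x', 3), ('y', NULL), ('z', 1)");

    // Per the descriptions above, rows whose val is NULL are included
    // (assumed to be ordered last) instead of being skipped.
    auto result = con.Query("SELECT arg_min_nulls_last(a, b, 3) FROM t");
    if (!result->HasError()) {
        result->Print(); // expected: a list of three args, NULL-val row last (assumption)
    }
    return 0;
}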
@@ -53,10 +53,7 @@ unique_ptr<FunctionData> CurrentSettingBind(ClientContext &context, ScalarFuncti
if (!context.TryGetCurrentSetting(key, val)) {
auto extension_name = Catalog::AutoloadExtensionByConfigName(context, key);
// If autoloader didn't throw, the config is now available
if (!context.TryGetCurrentSetting(key, val)) {
throw InternalException("Extension %s did not provide the '%s' config setting",
extension_name.ToStdString(), key);
}
context.TryGetCurrentSetting(key, val);
}

bound_function.return_type = val.type();
6 changes: 2 additions & 4 deletions src/duckdb/extension/json/include/json_common.hpp
@@ -13,6 +13,7 @@
#include "duckdb/common/operator/string_cast.hpp"
#include "duckdb/planner/expression/bound_function_expression.hpp"
#include "yyjson.hpp"
#include "duckdb/common/types/blob.hpp"

using namespace duckdb_yyjson; // NOLINT

@@ -228,11 +229,8 @@ struct JSONCommon {

static string FormatParseError(const char *data, idx_t length, yyjson_read_err &error, const string &extra = "") {
D_ASSERT(error.code != YYJSON_READ_SUCCESS);
// Go to blob so we can have a better error message for weird strings
auto blob = Value::BLOB(string(data, length));
// Truncate, so we don't print megabytes worth of JSON
string input = blob.ToString();
input = input.length() > 50 ? string(input.c_str(), 47) + "..." : input;
auto input = length > 50 ? string(data, 47) + "..." : string(data, length);
// Have to replace \r, otherwise output is unreadable
input = StringUtil::Replace(input, "\r", "\\r");
return StringUtil::Format("Malformed JSON at byte %lld of input: %s. %s Input: \"%s\"", error.pos, error.msg,
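The rewritten error path above truncates the raw bytes directly instead of round-tripping through a BLOB Value. A standalone restatement of that truncation rule, for illustration only (plain C++, no DuckDB types):

#include <iostream>
#include <string>

// Mirrors the truncation in FormatParseError above: inputs longer than
// 50 bytes are cut to 47 characters followed by "...".
static std::string TruncateForError(const char *data, size_t length) {
    return length > 50 ? std::string(data, 47) + "..." : std::string(data, length);
}

int main() {
    std::string long_input(120, 'x');
    std::cout << TruncateForError(long_input.c_str(), long_input.size()) << "\n"; // 47 x's + "..."
    std::cout << TruncateForError("{\"a\": 1", 7) << "\n";                         // short input, printed as-is
    return 0;
}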
6 changes: 5 additions & 1 deletion src/duckdb/extension/json/json_functions.cpp
@@ -394,7 +394,11 @@ void JSONFunctions::RegisterSimpleCastFunctions(ExtensionLoader &loader) {
loader.RegisterCastFunction(LogicalType::LIST(LogicalType::JSON()), LogicalTypeId::VARCHAR, CastJSONListToVarchar,
json_list_to_varchar_cost);

// VARCHAR to JSON[] (also needs a special case otherwise get a VARCHAR -> VARCHAR[] cast first)
// JSON[] to JSON is allowed implicitly
loader.RegisterCastFunction(LogicalType::LIST(LogicalType::JSON()), LogicalType::JSON(), CastJSONListToVarchar,
100);

// VARCHAR to JSON[] (also needs a special case otherwise we get a VARCHAR -> VARCHAR[] cast first)
const auto varchar_to_json_list_cost =
CastFunctionSet::ImplicitCastCost(db, LogicalType::VARCHAR, LogicalType::LIST(LogicalType::JSON())) - 1;
BoundCastInfo varchar_to_json_list_info(CastVarcharToJSONList, nullptr, JSONFunctionLocalState::InitCastLocalState);
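The added registration above also allows casting a JSON[] value to a single JSON document (reusing the list-to-VARCHAR cast at cost 100). A hedged sketch of what that enables, assuming the bundled JSON extension is available; the literal values are made up:

#include "duckdb.hpp"

// Sketch only: casts a list of JSON values to one JSON document, which the
// newly registered JSON[] -> JSON cast above should make possible.
int main() {
    duckdb::DuckDB db(nullptr);
    duckdb::Connection con(db);

    // Expected (assumption): the JSON array text produced by the reused
    // JSON-list-to-VARCHAR cast, e.g. [{"a":1},2].
    auto result = con.Query("SELECT ['{\"a\": 1}'::JSON, '2'::JSON]::JSON AS j");
    result->Print();
    return 0;
}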
21 changes: 11 additions & 10 deletions src/duckdb/extension/json/json_functions/json_create.cpp
@@ -111,11 +111,11 @@ static unique_ptr<FunctionData> JSONCreateBindParams(ScalarFunction &bound_funct
auto &type = arguments[i]->return_type;
if (arguments[i]->HasParameter()) {
throw ParameterNotResolvedException();
} else if (type == LogicalTypeId::SQLNULL) {
// This is needed for macros
bound_function.arguments.push_back(type);
} else if (object && i % 2 == 0) {
// Key, must be varchar
if (type != LogicalType::VARCHAR) {
throw BinderException("json_object() keys must be VARCHAR, add an explicit cast to argument \"%s\"",
arguments[i]->GetName());
}
bound_function.arguments.push_back(LogicalType::VARCHAR);
} else {
// Value, cast to types that we can put in JSON
@@ -128,7 +128,7 @@ static unique_ptr<FunctionData> JSONObjectBind(ClientContext &context, ScalarFunct
static unique_ptr<FunctionData> JSONObjectBind(ClientContext &context, ScalarFunction &bound_function,
vector<unique_ptr<Expression>> &arguments) {
if (arguments.size() % 2 != 0) {
throw InvalidInputException("json_object() requires an even number of arguments");
throw BinderException("json_object() requires an even number of arguments");
}
return JSONCreateBindParams(bound_function, arguments, true);
}
@@ -141,37 +141,37 @@ static unique_ptr<FunctionData> JSONArrayBind(ClientContext &context, ScalarFunc
static unique_ptr<FunctionData> ToJSONBind(ClientContext &context, ScalarFunction &bound_function,
vector<unique_ptr<Expression>> &arguments) {
if (arguments.size() != 1) {
throw InvalidInputException("to_json() takes exactly one argument");
throw BinderException("to_json() takes exactly one argument");
}
return JSONCreateBindParams(bound_function, arguments, false);
}

static unique_ptr<FunctionData> ArrayToJSONBind(ClientContext &context, ScalarFunction &bound_function,
vector<unique_ptr<Expression>> &arguments) {
if (arguments.size() != 1) {
throw InvalidInputException("array_to_json() takes exactly one argument");
throw BinderException("array_to_json() takes exactly one argument");
}
auto arg_id = arguments[0]->return_type.id();
if (arguments[0]->HasParameter()) {
throw ParameterNotResolvedException();
}
if (arg_id != LogicalTypeId::LIST && arg_id != LogicalTypeId::SQLNULL) {
throw InvalidInputException("array_to_json() argument type must be LIST");
throw BinderException("array_to_json() argument type must be LIST");
}
return JSONCreateBindParams(bound_function, arguments, false);
}

static unique_ptr<FunctionData> RowToJSONBind(ClientContext &context, ScalarFunction &bound_function,
vector<unique_ptr<Expression>> &arguments) {
if (arguments.size() != 1) {
throw InvalidInputException("row_to_json() takes exactly one argument");
throw BinderException("row_to_json() takes exactly one argument");
}
auto arg_id = arguments[0]->return_type.id();
if (arguments[0]->HasParameter()) {
throw ParameterNotResolvedException();
}
if (arguments[0]->return_type.id() != LogicalTypeId::STRUCT && arg_id != LogicalTypeId::SQLNULL) {
throw InvalidInputException("row_to_json() argument type must be STRUCT");
throw BinderException("row_to_json() argument type must be STRUCT");
}
return JSONCreateBindParams(bound_function, arguments, false);
}
@@ -616,6 +616,7 @@ static void CreateValues(const StructNames &names, yyjson_mut_doc *doc, yyjson_m
case LogicalTypeId::VALIDITY:
case LogicalTypeId::TABLE:
case LogicalTypeId::LAMBDA:
case LogicalTypeId::GEOMETRY: // TODO! Add support for GEOMETRY
throw InternalException("Unsupported type arrived at JSON create function");
}
}
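A practical effect of swapping InvalidInputException for BinderException in the binders above is that these argument errors are now reported as a Binder Error. A small sketch to observe this, assuming the bundled JSON extension is available; the exact message text is not asserted:

#include "duckdb.hpp"
#include <iostream>

// Sketch only: an odd number of json_object() arguments should now surface
// as a Binder Error (previously an Invalid Input Error).
int main() {
    duckdb::DuckDB db(nullptr);
    duckdb::Connection con(db);

    auto result = con.Query("SELECT json_object('key_without_value')");
    if (result->HasError()) {
        std::cout << result->GetError() << "\n"; // expected to mention the even-argument requirement
    }
    return 0;
}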
3 changes: 1 addition & 2 deletions src/duckdb/extension/json/json_reader.cpp
@@ -184,8 +184,7 @@ void JSONReader::OpenJSONFile() {
if (!IsOpen()) {
auto &fs = FileSystem::GetFileSystem(context);
auto regular_file_handle = fs.OpenFile(file, FileFlags::FILE_FLAGS_READ | options.compression);
file_handle = make_uniq<JSONFileHandle>(QueryContext(context), std::move(regular_file_handle),
BufferAllocator::Get(context));
file_handle = make_uniq<JSONFileHandle>(context, std::move(regular_file_handle), BufferAllocator::Get(context));
}
Reset();
}
100 changes: 89 additions & 11 deletions src/duckdb/extension/parquet/column_writer.cpp
@@ -13,6 +13,7 @@
#include "writer/list_column_writer.hpp"
#include "writer/primitive_column_writer.hpp"
#include "writer/struct_column_writer.hpp"
#include "writer/variant_column_writer.hpp"
#include "writer/templated_column_writer.hpp"
#include "duckdb/common/exception.hpp"
#include "duckdb/common/operator/comparison_operators.hpp"
@@ -96,7 +97,7 @@ bool ColumnWriterStatistics::HasGeoStats() {
return false;
}

optional_ptr<GeometryStats> ColumnWriterStatistics::GetGeoStats() {
optional_ptr<GeometryStatsData> ColumnWriterStatistics::GetGeoStats() {
return nullptr;
}

@@ -181,8 +182,7 @@ void ColumnWriter::CompressPage(MemoryStream &temp_writer, size_t &compressed_si
}
}

void ColumnWriter::HandleRepeatLevels(ColumnWriterState &state, ColumnWriterState *parent, idx_t count,
idx_t max_repeat) const {
void ColumnWriter::HandleRepeatLevels(ColumnWriterState &state, ColumnWriterState *parent, idx_t count) const {
if (!parent) {
// no repeat levels without a parent node
return;
@@ -245,8 +245,9 @@ void ColumnWriter::HandleDefineLevels(ColumnWriterState &state, ColumnWriterStat
//===--------------------------------------------------------------------===//

ParquetColumnSchema ColumnWriter::FillParquetSchema(vector<duckdb_parquet::SchemaElement> &schemas,
const LogicalType &type, const string &name,
optional_ptr<const ChildFieldIDs> field_ids, idx_t max_repeat,
const LogicalType &type, const string &name, bool allow_geometry,
optional_ptr<const ChildFieldIDs> field_ids,
optional_ptr<const ShreddingType> shredding_types, idx_t max_repeat,
idx_t max_define, bool can_have_nulls) {
auto null_type = can_have_nulls ? FieldRepetitionType::OPTIONAL : FieldRepetitionType::REQUIRED;
if (!can_have_nulls) {
@@ -263,6 +264,70 @@ ParquetColumnSchema ColumnWriter::FillParquetSchema(vector<duckdb_parquet::Schem
child_field_ids = &field_id->child_field_ids;
}
}
optional_ptr<const ShreddingType> shredding_type;
if (shredding_types) {
shredding_type = shredding_types->GetChild(name);
}

if (type.id() == LogicalTypeId::STRUCT && type.GetAlias() == "PARQUET_VARIANT") {
// variant type
// variants are stored as follows:
// group <name> VARIANT {
// metadata BYTE_ARRAY,
// value BYTE_ARRAY,
// [<typed_value>]
// }

const bool is_shredded = shredding_type != nullptr;

child_list_t<LogicalType> child_types;
child_types.emplace_back("metadata", LogicalType::BLOB);
child_types.emplace_back("value", LogicalType::BLOB);
if (is_shredded) {
auto &typed_value_type = shredding_type->type;
if (typed_value_type.id() != LogicalTypeId::ANY) {
child_types.emplace_back("typed_value",
VariantColumnWriter::TransformTypedValueRecursive(typed_value_type));
}
}

// variant group
duckdb_parquet::SchemaElement top_element;
top_element.repetition_type = null_type;
top_element.num_children = child_types.size();
top_element.logicalType.__isset.VARIANT = true;
top_element.logicalType.VARIANT.__isset.specification_version = true;
top_element.logicalType.VARIANT.specification_version = 1;
top_element.__isset.logicalType = true;
top_element.__isset.num_children = true;
top_element.__isset.repetition_type = true;
top_element.name = name;
schemas.push_back(std::move(top_element));

ParquetColumnSchema variant_column(name, type, max_define, max_repeat, schema_idx, 0);
variant_column.children.reserve(child_types.size());
for (auto &child_type : child_types) {
auto &child_name = child_type.first;
bool is_optional;
if (child_name == "metadata") {
is_optional = false;
} else if (child_name == "value") {
if (is_shredded) {
//! When shredding the variant, the 'value' becomes optional
is_optional = true;
} else {
is_optional = false;
}
} else {
D_ASSERT(child_name == "typed_value");
is_optional = true;
}
variant_column.children.emplace_back(FillParquetSchema(schemas, child_type.second, child_type.first,
allow_geometry, child_field_ids, shredding_type,
max_repeat, max_define + 1, is_optional));
}
return variant_column;
}

if (type.id() == LogicalTypeId::STRUCT || type.id() == LogicalTypeId::UNION) {
auto &child_types = StructType::GetChildTypes(type);
@@ -285,7 +350,8 @@ ParquetColumnSchema ColumnWriter::FillParquetSchema(vector<duckdb_parquet::Schem
struct_column.children.reserve(child_types.size());
for (auto &child_type : child_types) {
struct_column.children.emplace_back(FillParquetSchema(schemas, child_type.second, child_type.first,
child_field_ids, max_repeat, max_define + 1));
allow_geometry, child_field_ids, shredding_type,
max_repeat, max_define + 1, true));
}
return struct_column;
}
@@ -321,8 +387,9 @@ ParquetColumnSchema ColumnWriter::FillParquetSchema(vector<duckdb_parquet::Schem
schemas.push_back(std::move(repeated_element));

ParquetColumnSchema list_column(name, type, max_define, max_repeat, schema_idx, 0);
list_column.children.push_back(
FillParquetSchema(schemas, child_type, "element", child_field_ids, max_repeat + 1, max_define + 2));
list_column.children.push_back(FillParquetSchema(schemas, child_type, "element", allow_geometry,
child_field_ids, shredding_type, max_repeat + 1,
max_define + 2, true));
return list_column;
}
if (type.id() == LogicalTypeId::MAP) {
@@ -369,8 +436,8 @@ ParquetColumnSchema ColumnWriter::FillParquetSchema(vector<duckdb_parquet::Schem
for (idx_t i = 0; i < 2; i++) {
// key needs to be marked as REQUIRED
bool is_key = i == 0;
auto child_schema = FillParquetSchema(schemas, kv_types[i], kv_names[i], child_field_ids, max_repeat + 1,
max_define + 2, !is_key);
auto child_schema = FillParquetSchema(schemas, kv_types[i], kv_names[i], allow_geometry, child_field_ids,
shredding_type, max_repeat + 1, max_define + 2, !is_key);

map_column.children.push_back(std::move(child_schema));
}
@@ -388,7 +455,7 @@ ParquetColumnSchema ColumnWriter::FillParquetSchema(vector<duckdb_parquet::Schem
schema_element.__isset.field_id = true;
schema_element.field_id = field_id->field_id;
}
ParquetWriter::SetSchemaProperties(type, schema_element);
ParquetWriter::SetSchemaProperties(type, schema_element, allow_geometry);
schemas.push_back(std::move(schema_element));
return ParquetColumnSchema(name, type, max_define, max_repeat, schema_idx, 0);
}
@@ -400,6 +467,17 @@
auto &type = schema.type;
auto can_have_nulls = parquet_schemas[schema.schema_index].repetition_type == FieldRepetitionType::OPTIONAL;
path_in_schema.push_back(schema.name);

if (type.id() == LogicalTypeId::STRUCT && type.GetAlias() == "PARQUET_VARIANT") {
vector<unique_ptr<ColumnWriter>> child_writers;
child_writers.reserve(schema.children.size());
for (idx_t i = 0; i < schema.children.size(); i++) {
child_writers.push_back(
CreateWriterRecursive(context, writer, parquet_schemas, schema.children[i], path_in_schema));
}
return make_uniq<VariantColumnWriter>(writer, schema, path_in_schema, std::move(child_writers), can_have_nulls);
}

if (type.id() == LogicalTypeId::STRUCT || type.id() == LogicalTypeId::UNION) {
// construct the child writers recursively
vector<unique_ptr<ColumnWriter>> child_writers;
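To summarize the variant handling added above: the writer emits a VARIANT group whose metadata child is always REQUIRED, whose value child becomes OPTIONAL only when the variant is shredded, and whose typed_value child (present only when shredded) is always OPTIONAL. A compact restatement of that rule, separate from the real ColumnWriter code:

#include <cassert>
#include <string>

// Distilled from FillParquetSchema's variant branch above; this is a sketch,
// not the actual ColumnWriter API.
static bool VariantChildIsOptional(const std::string &child_name, bool is_shredded) {
    if (child_name == "metadata") {
        return false;                    // metadata is always REQUIRED
    }
    if (child_name == "value") {
        return is_shredded;              // value becomes OPTIONAL only when shredded
    }
    assert(child_name == "typed_value"); // typed_value exists only when shredded
    return true;                         // and is always OPTIONAL
}

int main() {
    assert(!VariantChildIsOptional("metadata", true));
    assert(!VariantChildIsOptional("value", false));
    assert(VariantChildIsOptional("value", true));
    assert(VariantChildIsOptional("typed_value", true));
    return 0;
}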