Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
38 commits
Select commit Hold shift + click to select a range
8ecc215
support qualified columns in queries
houqp Apr 19, 2021
696f8e0
handle coalesced hash join partition in HashJoinStream
houqp Apr 25, 2021
cdc5fb7
implement Into<Column> for &str
houqp Apr 25, 2021
723ee5d
add todo for ARROW-10971
houqp Apr 25, 2021
2ae97c4
Merge remote-tracking branch 'upstream/master' into qp_qualified
houqp Apr 26, 2021
9cf494f
fix cross join handling in production push down optimizer
houqp Apr 27, 2021
fff2e1d
maintain field order during plan optimization using projections
houqp May 10, 2021
202b87e
Merge remote-tracking branch 'upstream/master' into qp_qualified
houqp May 10, 2021
eaf1edc
change TableScane name from Option<String> to String
houqp May 15, 2021
6c674cb
Merge remote-tracking branch 'upstream/master' into qp_qualified
houqp May 15, 2021
7f253c7
WIP: fix ballista
houqp May 16, 2021
5c413dd
separate logical and physical expressions in proto, fix ballista build
houqp May 24, 2021
841159f
fix join schema handling in production push down optimizer
houqp May 24, 2021
9ab4711
tpch 7 & 8 are now passing!
houqp May 24, 2021
babb252
fix roundtrip_join test
houqp May 24, 2021
d18065b
Merge remote-tracking branch 'upstream/master' into qp_qualified
houqp May 24, 2021
bbde69a
Merge remote-tracking branch 'upstream/master' into qp_qualified
houqp Jun 1, 2021
6aaa148
fix clippy warnings
houqp Jun 2, 2021
040d28c
Merge remote-tracking branch 'upstream/master' into qp_qualified
houqp Jun 2, 2021
2ef668d
fix sql planner test error checking with matches
houqp Jun 5, 2021
9513c19
Merge remote-tracking branch 'upstream/master' into qp_qualified
houqp Jun 5, 2021
7b70f04
address FIXMEs
houqp Jun 6, 2021
fd3005f
honor datafusion field name semantic
houqp Jun 12, 2021
0782ee2
Merge remote-tracking branch 'upstream/master' into qp_qualified
houqp Jun 12, 2021
a9ba4c6
Merge remote-tracking branch 'upstream/master' into qp_qualified
houqp Jun 12, 2021
071f86b
add more comment
houqp Jun 12, 2021
80a5168
enable more queries in benchmark/run.sh
houqp Jun 12, 2021
713fbe1
use unzip to avoid unnecessary iterators
houqp Jun 12, 2021
e4677b9
reduce diff by discarding style related changes
houqp Jun 13, 2021
6f6ecdf
simplify hash_join tests
houqp Jun 13, 2021
1643617
reduce diff for easier revuew
houqp Jun 13, 2021
660af1e
Merge remote-tracking branch 'upstream/master' into qp_qualified
houqp Jun 14, 2021
cad0d5e
fix unnecessary reference clippy error
houqp Jun 14, 2021
7b72c9c
Merge remote-tracking branch 'upstream/master' into qp_qualified
houqp Jun 15, 2021
593785a
incorporate code review feedback
houqp Jun 15, 2021
071d8ac
Merge remote-tracking branch 'upstream/master' into HEAD
houqp Jun 20, 2021
d26b54c
fix window schema handling in projection pushdown optimizer
houqp Jun 20, 2021
9c1e94d
Merge remote-tracking branch 'upstream/master' into qp_qualified
houqp Jun 21, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
175 changes: 155 additions & 20 deletions ballista/rust/core/proto/ballista.proto
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,29 @@ option java_outer_classname = "BallistaProto";
// Ballista Logical Plan
///////////////////////////////////////////////////////////////////////////////////////////////////

// Relation (table) qualifier attached to a logical column or schema field.
// NOTE(review): presumably wrapped in its own message so that an absent
// qualifier can be told apart from an empty string -- confirm.
message ColumnRelation {
  // Name of the relation.
  string relation = 1;
}

// Logical column reference: a column name together with the relation that
// qualifies it.
message Column {
  // Column name.
  string name = 1;

  // Relation qualifier for the column; left unset when there is none.
  ColumnRelation relation = 2;
}

// A schema field paired with the relation that qualifies it.
message DfField {
  // The underlying field definition.
  Field field = 1;

  // Relation qualifier for the field; left unset when there is none.
  ColumnRelation qualifier = 2;
}

// Schema expressed as a list of (optionally qualified) fields.
message DfSchema {
  // Fields of the schema, in the order they were serialized.
  repeated DfField columns = 1;
}

// logical expressions
message LogicalExprNode {
oneof ExprType {
// column references
string column_name = 1;
Column column = 1;

// alias
AliasNode alias = 2;
Expand Down Expand Up @@ -295,7 +313,7 @@ message CreateExternalTableNode{
string location = 2;
FileType file_type = 3;
bool has_header = 4;
Schema schema = 5;
DfSchema schema = 5;
}

enum FileType{
Expand All @@ -309,11 +327,6 @@ message ExplainNode{
bool verbose = 2;
}

message DfField{
string qualifier = 2;
Field field = 1;
}

message AggregateNode {
LogicalPlanNode input = 1;
repeated LogicalExprNode group_expr = 2;
Expand Down Expand Up @@ -369,8 +382,8 @@ message JoinNode {
LogicalPlanNode left = 1;
LogicalPlanNode right = 2;
JoinType join_type = 3;
repeated string left_join_column = 4;
repeated string right_join_column = 5;
repeated Column left_join_column = 4;
repeated Column right_join_column = 5;
}

message LimitNode {
Expand Down Expand Up @@ -408,6 +421,119 @@ message PhysicalPlanNode {
}
}

// physical expressions
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Physical expressions are now serialized and deserialized without involving logical expressions. This is needed because logical columns and physical columns are represented differently: one with a relation qualifier and the other with a column index. This should also speed up deserialization of physical plans, because we no longer need to run physical expressions through the physical planner.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is a very common tactic in other systems and so 👍

My only regret (?) is that I have spent non trivial amounts of time tracking down bugs when the offsets get messed up -- lol! But I don't have any better suggestions

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I have the same feeling when it comes to debugging index offset issues :(

// Serialized physical expression. Physical expressions have their own message
// tree (rather than reusing LogicalExprNode) because physical columns are
// referenced by index instead of by relation qualifier.
// At most one member of ExprType is populated.
message PhysicalExprNode {
  oneof ExprType {
    // column references
    PhysicalColumn column = 1;

    // literal values
    ScalarValue literal = 2;

    // binary expressions
    PhysicalBinaryExprNode binary_expr = 3;

    // aggregate expressions
    PhysicalAggregateExprNode aggregate_expr = 4;

    // null checks
    PhysicalIsNull is_null_expr = 5;
    PhysicalIsNotNull is_not_null_expr = 6;
    PhysicalNot not_expr = 7;

    // remaining expression kinds
    PhysicalCaseNode case_ = 8;
    PhysicalCastNode cast = 9;
    PhysicalSortExprNode sort = 10;
    PhysicalNegativeNode negative = 11;
    PhysicalInListNode in_list = 12;
    PhysicalScalarFunctionNode scalar_function = 13;
    PhysicalTryCastNode try_cast = 14;

    // window expressions
    PhysicalWindowExprNode window_expr = 15;
  }
}

// Aggregate function paired with a single physical expression.
message PhysicalAggregateExprNode {
  // Which aggregate function this node represents.
  AggregateFunction aggr_function = 1;

  // Expression associated with the aggregate -- presumably its argument;
  // TODO confirm.
  PhysicalExprNode expr = 2;
}

// Window expression: one window function (aggregate or built-in) paired with
// a single physical expression.
message PhysicalWindowExprNode {
  // The function evaluated over the window; at most one member is set.
  oneof window_function {
    AggregateFunction aggr_function = 1;
    BuiltInWindowFunction built_in_function = 2;
    // udaf = 3
  }

  // Expression associated with the window function. It uses number 4 because
  // number 3 is earmarked for udaf in the oneof above.
  PhysicalExprNode expr = 4;
}

// Null check on a physical expression (the is_null_expr variant of
// PhysicalExprNode).
message PhysicalIsNull {
  // Expression being checked.
  PhysicalExprNode expr = 1;
}

// Not-null check on a physical expression (the is_not_null_expr variant of
// PhysicalExprNode).
message PhysicalIsNotNull {
  // Expression being checked.
  PhysicalExprNode expr = 1;
}

// NOT applied to a physical expression (the not_expr variant of
// PhysicalExprNode).
message PhysicalNot {
  // Operand of the NOT.
  PhysicalExprNode expr = 1;
}

// A physical expression together with an alias name.
// NOTE(review): no member of PhysicalExprNode.ExprType references this
// message -- confirm whether it is used anywhere.
message PhysicalAliasNode {
  // Expression being aliased.
  PhysicalExprNode expr = 1;

  // Alias assigned to the expression.
  string alias = 2;
}

// Binary expression over two physical operands.
message PhysicalBinaryExprNode {
  // Left operand.
  PhysicalExprNode l = 1;

  // Right operand.
  PhysicalExprNode r = 2;

  // Operator, carried as a string. The accepted spellings are defined by the
  // (de)serialization code, not by this schema -- TODO confirm.
  string op = 3;
}

// Sort key: an expression plus its ordering options.
message PhysicalSortExprNode {
  // Expression to sort by.
  PhysicalExprNode expr = 1;

  // Ascending flag. NOTE(review): assuming proto3, an unset value reads back
  // as false -- confirm readers expect that.
  bool asc = 2;

  // Nulls-first flag; the same unset-reads-as-false caveat applies.
  bool nulls_first = 3;
}

// One WHEN/THEN pair, used as an element of PhysicalCaseNode.when_then_expr.
message PhysicalWhenThen {
  // The WHEN side of the pair.
  PhysicalExprNode when_expr = 1;

  // The THEN side of the pair.
  PhysicalExprNode then_expr = 2;
}

// In-list expression: a probe expression, a list of candidate expressions,
// and a negation flag.
message PhysicalInListNode {
  // Expression tested against the list.
  PhysicalExprNode expr = 1;

  // Candidate expressions.
  repeated PhysicalExprNode list = 2;

  // Negation flag -- presumably true means NOT IN; TODO confirm.
  bool negated = 3;
}

// CASE expression built from WHEN/THEN pairs and an ELSE branch.
message PhysicalCaseNode {
  // Base expression -- presumably the operand of `CASE <expr> WHEN ...` and
  // unset for the searched form; TODO confirm.
  PhysicalExprNode expr = 1;

  // WHEN/THEN pairs, in serialized order.
  repeated PhysicalWhenThen when_then_expr = 2;

  // ELSE branch expression.
  PhysicalExprNode else_expr = 3;
}

// Scalar function call over physical expressions.
message PhysicalScalarFunctionNode {
  // Function name as a string.
  string name = 1;

  // Function identifier from the ScalarFunction enum.
  ScalarFunction fun = 2;

  // Argument expressions, in call order.
  repeated PhysicalExprNode args = 3;

  // Arrow type the call returns.
  ArrowType return_type = 4;
}

// Try-cast of a physical expression to an Arrow type. It has the same shape
// as PhysicalCastNode but is a distinct PhysicalExprNode variant.
message PhysicalTryCastNode {
  // Expression to convert.
  PhysicalExprNode expr = 1;

  // Target Arrow type.
  ArrowType arrow_type = 2;
}

// Cast of a physical expression to an Arrow type.
message PhysicalCastNode {
  // Expression to convert.
  PhysicalExprNode expr = 1;

  // Target Arrow type.
  ArrowType arrow_type = 2;
}

// Negation of a physical expression (the negative variant of
// PhysicalExprNode).
message PhysicalNegativeNode {
  // Operand being negated.
  PhysicalExprNode expr = 1;
}

message UnresolvedShuffleExecNode {
repeated uint32 query_stage_ids = 1;
Schema schema = 2;
Expand All @@ -416,7 +542,7 @@ message UnresolvedShuffleExecNode {

message FilterExecNode {
PhysicalPlanNode input = 1;
LogicalExprNode expr = 2;
PhysicalExprNode expr = 2;
}

message ParquetScanExecNode {
Expand Down Expand Up @@ -447,11 +573,15 @@ message HashJoinExecNode {

}

message JoinOn {
string left = 1;
string right = 2;
// Physical column reference. Unlike the logical Column, it carries a column
// index instead of a relation qualifier.
message PhysicalColumn {
  // Column name.
  string name = 1;

  // Column index -- presumably the position within the input schema;
  // TODO confirm.
  uint32 index = 2;
}

// One join key pair: a column from the left input and a column from the
// right input.
message JoinOn {
  // Join column on the left side.
  PhysicalColumn left = 1;

  // Join column on the right side.
  PhysicalColumn right = 2;
}

message EmptyExecNode {
bool produce_one_row = 1;
Expand All @@ -460,7 +590,7 @@ message EmptyExecNode {

message ProjectionExecNode {
PhysicalPlanNode input = 1;
repeated LogicalExprNode expr = 2;
repeated PhysicalExprNode expr = 2;
repeated string expr_name = 3;
}

Expand All @@ -472,14 +602,14 @@ enum AggregateMode {

message WindowAggExecNode {
PhysicalPlanNode input = 1;
repeated LogicalExprNode window_expr = 2;
repeated PhysicalExprNode window_expr = 2;
repeated string window_expr_name = 3;
Schema input_schema = 4;
}

message HashAggregateExecNode {
repeated LogicalExprNode group_expr = 1;
repeated LogicalExprNode aggr_expr = 2;
repeated PhysicalExprNode group_expr = 1;
repeated PhysicalExprNode aggr_expr = 2;
AggregateMode mode = 3;
PhysicalPlanNode input = 4;
repeated string group_expr_name = 5;
Expand Down Expand Up @@ -510,7 +640,7 @@ message LocalLimitExecNode {

message SortExecNode {
PhysicalPlanNode input = 1;
repeated LogicalExprNode expr = 2;
repeated PhysicalExprNode expr = 2;
}

message CoalesceBatchesExecNode {
Expand All @@ -522,11 +652,16 @@ message MergeExecNode {
PhysicalPlanNode input = 1;
}

// Hash partitioning description built from physical expressions; used as the
// `hash` member of RepartitionExecNode.partition_method.
message PhysicalHashRepartition {
  // Expressions whose values are hashed.
  repeated PhysicalExprNode hash_expr = 1;

  // Number of partitions.
  uint64 partition_count = 2;
}

message RepartitionExecNode{
PhysicalPlanNode input = 1;
oneof partition_method {
uint64 round_robin = 2;
HashRepartition hash = 3;
PhysicalHashRepartition hash = 3;
uint64 unknown = 4;
}
}
Expand Down Expand Up @@ -803,7 +938,7 @@ message ScalarListValue{


message ScalarValue{
oneof value{
oneof value {
bool bool_value = 1;
string utf8_value = 2;
string large_utf8_value = 3;
Expand Down
Loading