@@ -43,6 +43,7 @@ KafkaSource::KafkaSource(
43
43
, max_block_size(max_block_size_)
44
44
, log(log_)
45
45
, header(header_)
46
+ , non_virtual_header(storage_snapshot->metadata->getSampleBlockNonMaterialized ())
46
47
, consume_ctx(kafka->topic (), shard, offset)
47
48
, read_buffer(" " , 0 )
48
49
, virtual_time_columns_calc(header.columns(), nullptr)
@@ -178,24 +179,29 @@ void KafkaSource::parseRaw(const rd_kafka_message_t * kmessage)
178
179
void KafkaSource::parseFormat (const rd_kafka_message_t * kmessage)
179
180
{
180
181
assert (format_executor);
182
+ assert (convert_non_virtual_to_physical_action);
181
183
182
184
ReadBufferFromMemory buffer (static_cast <const char *>(kmessage->payload ), kmessage->len );
183
185
auto new_rows = format_executor->execute (buffer);
184
186
if (!new_rows)
185
187
return ;
186
188
189
+ auto result_block = non_virtual_header.cloneWithColumns (format_executor->getResultColumns ());
190
+ convert_non_virtual_to_physical_action->execute (result_block);
191
+
192
+ MutableColumns new_data (result_block.mutateColumns ());
193
+
187
194
if (!request_virtual_columns)
188
195
{
189
196
if (!current_batch.empty ())
190
197
{
191
198
/// Merge all data in the current batch into the same chunk to avoid too many small chunks
192
- auto new_data (format_executor->getResultColumns ());
193
199
for (size_t pos = 0 ; pos < current_batch.size (); ++pos)
194
200
current_batch[pos]->insertRangeFrom (*new_data[pos], 0 , new_rows);
195
201
}
196
202
else
197
203
{
198
- current_batch = format_executor-> getResultColumns ( );
204
+ current_batch = std::move (new_data );
199
205
}
200
206
}
201
207
else
@@ -206,7 +212,6 @@ void KafkaSource::parseFormat(const rd_kafka_message_t * kmessage)
206
212
assert (current_batch.size () == virtual_time_columns_calc.size ());
207
213
208
214
/// slower path
209
- auto new_data (format_executor->getResultColumns ());
210
215
for (size_t i = 0 , j = 0 , n = virtual_time_columns_calc.size (); i < n; ++i)
211
216
{
212
217
if (!virtual_time_columns_calc[i])
@@ -224,7 +229,6 @@ void KafkaSource::parseFormat(const rd_kafka_message_t * kmessage)
224
229
else
225
230
{
226
231
/// slower path
227
- auto new_data (format_executor->getResultColumns ());
228
232
for (size_t i = 0 , j = 0 , n = virtual_time_columns_calc.size (); i < n; ++i)
229
233
{
230
234
if (!virtual_time_columns_calc[i])
@@ -272,10 +276,17 @@ void KafkaSource::initFormatExecutor(const Kafka * kafka)
272
276
if (!data_format.empty ())
273
277
{
274
278
auto input_format
275
- = FormatFactory::instance ().getInputFormat (data_format, read_buffer, physical_header , query_context, max_block_size);
279
+ = FormatFactory::instance ().getInputFormat (data_format, read_buffer, non_virtual_header , query_context, max_block_size);
276
280
277
281
format_executor = std::make_unique<StreamingFormatExecutor>(
278
- physical_header, std::move (input_format), [](const MutableColumns &, Exception &) -> size_t { return 0 ; });
282
+ non_virtual_header, std::move (input_format), [](const MutableColumns &, Exception &) -> size_t { return 0 ; });
283
+
284
+ auto converting_dag = ActionsDAG::makeConvertingActions (
285
+ non_virtual_header.cloneEmpty ().getColumnsWithTypeAndName (),
286
+ physical_header.cloneEmpty ().getColumnsWithTypeAndName (),
287
+ ActionsDAG::MatchColumnsMode::Name);
288
+
289
+ convert_non_virtual_to_physical_action = std::make_shared<ExpressionActions>(std::move (converting_dag));
279
290
}
280
291
}
281
292
0 commit comments