Skip to content

Commit ab4c8af

Browse files
authored
fix: time-based seek_to does not work with Kafka external streams (#309)
* fix: time-based seek_to does not work with Kafka external streams * renamed IStorage.nativeSeekToSupport to IStorage.supportsNativeSeekTo * renamed IStorage.supportsNativeSeekTo to IStorage.supportsAccurateSeekTo
1 parent 74088ac commit ab4c8af

File tree

3 files changed

+13
-5
lines changed

3 files changed

+13
-5
lines changed

src/Interpreters/InterpreterSelectQuery.cpp

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3414,7 +3414,7 @@ void InterpreterSelectQuery::finalCheckAndOptimizeForStreamingQuery()
34143414
/// For now, for the following scenarios, we disable backfill from historic data store
34153415
/// 1) User selects some virtual columns which are only available in the streaming store, like `_tp_sn`, `_tp_index_time`
34163416
/// 2) Seek by streaming store sequence number
3417-
/// 3) Replaying a stream.
3417+
/// 3) Replaying a stream.
34183418
/// TODO: ideally we should check whether the historical data store has `_tp_sn` etc. columns; if it does, we can backfill from
34193419
/// the historical data store as well technically. This will be a future enhancement.
34203420
const auto & settings = context->getSettingsRef();
@@ -3604,10 +3604,12 @@ void InterpreterSelectQuery::handleSeekToSetting()
36043604
if (seek_points.size() != 1)
36053605
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "It doesn't support time based `seek_to` settings for multiple shards");
36063606

3607-
/// Here we rewrite WHERE predicates of SELECT query.
3608-
/// Example : SELECT * FROM stream SETTINGS seek_to='2022-01-01 00:01:01' =>
3609-
/// SELECT * FROM stream WHERE _tp_time >= '2022-01-01 00:01:01'
3610-
addEventTimePredicate(getSelectQuery(), seek_points[0]);
3607+
/// If the storage can do accurate seek_to (for example, Kafka external streams) no extra work is needed.
3608+
/// Otherwise, the WHERE predicates of SELECT query will be rewritten by adding a filter for filtering
3609+
/// records by `_tp_time`. For example: `SELECT * FROM stream SETTINGS seek_to='2022-01-01 00:01:01'` will
3610+
/// be rewritten to `SELECT * FROM stream WHERE _tp_time >= '2022-01-01 00:01:01'`.
3611+
if (storage && !storage->supportsAccurateSeekTo())
3612+
addEventTimePredicate(getSelectQuery(), seek_points[0]);
36113613
}
36123614
else
36133615
{

src/Storages/ExternalStream/StorageExternalStream.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ class StorageExternalStream final : public shared_ptr_helper<StorageExternalStre
2222
void shutdown() override;
2323
bool supportsSubcolumns() const override;
2424
bool squashInsert() const noexcept override { return false; }
25+
/// External streams report accurate seek_to support, so for a time-based `seek_to`
/// InterpreterSelectQuery::handleSeekToSetting does not add the extra `_tp_time` predicate to the query.
bool supportsAccurateSeekTo() const noexcept override { return true; }
2526
NamesAndTypesList getVirtuals() const override;
2627

2728
Pipe read(

src/Storages/IStorage.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -228,6 +228,11 @@ class IStorage : public std::enable_shared_from_this<IStorage>, public TypePromo
228228
/// to skip using squashing.
229229
virtual bool squashInsert() const noexcept { return true; }
230230

231+
/// If a query uses time-based seek_to and the storage does not support accurate seek_to,
232+
/// the query will be rewritten by adding a predicate for filtering records by time
233+
/// (see InterpreterSelectQuery.cpp).
234+
virtual bool supportsAccurateSeekTo() const noexcept { return false; }
235+
231236
virtual bool supportsStreamingQuery() const { return false; }
232237
/// proton: ends.
233238

0 commit comments

Comments
 (0)