Commit d3b7dcd

Add BeginInsert/InsertData/EndInsert flow
Add a new pattern for "prepared inserts". It works like this:

* Call `BeginInsert` with an `INSERT` query with optional columns and ending in `VALUES`. No values should be included in the string.
* It returns a `Block` pre-configured with columns as declared in the `INSERT` statement.
* Add data to the block and periodically call `InsertData` to insert data and clear the block.
* Call `EndInsert()` or just let the `Client` object go out of scope to signal the server that it's done inserting.

This allows one to send smaller batches of blocks, thereby using less memory, but still in a single ClickHouse `INSERT` operation. Expected to be useful in the Postgres foreign data wrapper insert API, where multiple rows can be inserted at once but its API handles one-at-a-time insertion. It will also support the FDW COPY API, which can submit huge batches of data to insert, as well.
1 parent 6919524 commit d3b7dcd
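
The flow described in the commit message can be sketched without a server. The block below is a minimal model, not the library: `FakeBlock`, `FakeClient`, and `InsertInBatches` are hypothetical names invented for illustration, and the fake client only records the size of each batch it is handed. It shows the point of the pattern: one `INSERT` is opened, rows are flushed every `batch_rows` rows, and the buffer never grows beyond that.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <stdexcept>
#include <string>
#include <vector>

// Hypothetical stand-in for clickhouse::Block: one id column, one name column.
struct FakeBlock {
    std::vector<std::uint64_t> ids;
    std::vector<std::string> names;
    std::size_t RowCount() const { return ids.size(); }
    void Clear() { ids.clear(); names.clear(); }
};

// Hypothetical stand-in for clickhouse::Client that only records traffic.
class FakeClient {
public:
    FakeBlock BeginInsert(const std::string& /*query*/) {
        if (inserting_) throw std::logic_error("insert already in progress");
        inserting_ = true;
        return FakeBlock{};
    }
    void InsertData(const FakeBlock& block) {
        if (!inserting_) throw std::logic_error("no insert in progress");
        batch_sizes_.push_back(block.RowCount());
    }
    void EndInsert() { inserting_ = false; }  // harmless when nothing is open

    const std::vector<std::size_t>& BatchSizes() const { return batch_sizes_; }
    bool Inserting() const { return inserting_; }

private:
    bool inserting_ = false;
    std::vector<std::size_t> batch_sizes_;
};

// Stream `total_rows` rows through a single INSERT, flushing whenever the
// block holds `batch_rows` rows. Returns the peak number of buffered rows.
std::size_t InsertInBatches(FakeClient& client, std::size_t total_rows, std::size_t batch_rows) {
    FakeBlock block = client.BeginInsert("INSERT INTO foo (id, name) VALUES");
    std::size_t peak = 0;
    for (std::size_t i = 0; i < total_rows; ++i) {
        block.ids.push_back(i);
        block.names.push_back("row" + std::to_string(i));
        if (block.RowCount() > peak) peak = block.RowCount();
        if (block.RowCount() == batch_rows) {
            client.InsertData(block);
            block.Clear();
        }
    }
    if (block.RowCount() > 0) client.InsertData(block);  // flush the remainder
    client.EndInsert();
    return peak;
}
```

With the real client the same loop would call `block.RefreshRowCount()` before each `InsertData`, as the README example in this commit does.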

5 files changed, +303 -9 lines changed

README.md

Lines changed: 41 additions & 0 deletions
@@ -157,6 +157,47 @@ target_link_libraries(${PROJECT_NAME} PRIVATE clickhouse-cpp-lib)
 - run `rm -rf build && cmake -B build -S . && cmake --build build -j32` to remove remainders of the previous builds, run CMake and build the
   application. The generated binary is located in location `build/application-example`.
 
+## Batch Insertion
+
+In addition to the `Insert` method, which inserts all the data in a block in a
+single call, you can use the `BeginInsert` / `InsertData` / `EndInsert`
+pattern to insert batches of data. This can be useful for managing larger data
+sets without inflating memory with the entire set.
+
+To use it, pass `BeginInsert` an `INSERT` statement ending in `VALUES` but with
+no actual values. Use the resulting `Block` to append batches of data, sending
+each to the server with `InsertData`. Finally, call `EndInsert` (or let the
+client go out of scope) to signal the server that insertion is complete.
+Example:
+
+```cpp
+// Start the insertion.
+auto block = client->BeginInsert("INSERT INTO foo (id, name) VALUES");
+
+// Grab the columns from the block (As<T>() returns a shared pointer).
+auto col1 = block[0]->As<ColumnUInt64>();
+auto col2 = block[1]->As<ColumnString>();
+
+// Add a couple of records to the block.
+col1->Append(1);
+col1->Append(2);
+col2->Append("holden");
+col2->Append("naomi");
+
+// Send those records.
+block.RefreshRowCount();
+client->InsertData(block);
+block.Clear();
+
+// Add another record.
+col1->Append(3);
+col2->Append("amos");
+
+// Send it and finish.
+block.RefreshRowCount();
+client->EndInsert(block);
+```
+
 ## Thread-safety
 ⚠ Please note that `Client` instance is NOT thread-safe. I.e. you must create a separate `Client` for each thread or utilize some synchronization techniques. ⚠


clickhouse/block.h

Lines changed: 2 additions & 2 deletions
@@ -85,10 +85,10 @@ class Block {
         return columns_.at(idx).name;
     }
 
-    /// Convinience method to wipe out all rows from all columns
+    /// Convenience method to wipe out all rows from all columns
     void Clear();
 
-    /// Convinience method to do Reserve() on all columns
+    /// Convenience method to do Reserve() on all columns
     void Reserve(size_t new_cap);
 
     /// Reference to column by index in the block.

clickhouse/client.cpp

Lines changed: 157 additions & 6 deletions
@@ -161,6 +161,12 @@ class Client::Impl {
 
     void Insert(const std::string& table_name, const std::string& query_id, const Block& block);
 
+    Block BeginInsert(Query query);
+
+    void InsertData(const Block& block);
+
+    void EndInsert();
+
     void Ping();
 
     void ResetConnection();
@@ -175,6 +181,7 @@ class Client::Impl {
     bool Handshake();
 
     bool ReceivePacket(uint64_t* server_packet = nullptr);
+    bool ReceivePreparePackets(uint64_t* server_packet = nullptr);
 
     void SendQuery(const Query& query, bool finalize = true);
     void FinalizeQuery();
@@ -208,6 +215,7 @@ class Client::Impl {
     }
 
 private:
+    bool inserting;
     /// In case of network errors tries to reconnect to server and
     /// call func several times.
     void RetryGuard(std::function<void()> func);
@@ -280,10 +288,13 @@ Client::Impl::Impl(const ClientOptions& opts,
     }
 }
 
-Client::Impl::~Impl()
-{ }
+Client::Impl::~Impl() {
+    // Wrap up an insert if one is in progress.
+    EndInsert();
+}
 
 void Client::Impl::ExecuteQuery(Query query) {
+    assert(!inserting);
     EnsureNull en(static_cast<QueryEvents*>(&query), &events_);
 
     if (options_.ping_before_query) {
@@ -299,6 +310,7 @@ void Client::Impl::ExecuteQuery(Query query) {
 
 
 void Client::Impl::SelectWithExternalData(Query query, const ExternalTables& external_tables) {
+    assert(!inserting);
     if (server_info_.revision < DBMS_MIN_REVISION_WITH_TEMPORARY_TABLES) {
         throw UnimplementedError("This version of ClickHouse server doesn't support temporary tables");
     }
@@ -362,6 +374,7 @@ std::string NameToQueryString(const std::string &input)
 }
 
 void Client::Impl::Insert(const std::string& table_name, const std::string& query_id, const Block& block) {
+    assert(!inserting);
     if (options_.ping_before_query) {
         RetryGuard([this]() { Ping(); });
     }
@@ -397,10 +410,22 @@ void Client::Impl::Insert(const std::string& table_name, const std::string& quer
     }
 
     // Send data.
+    inserting = true;
     SendData(block);
-    // Send empty block as marker of
-    // end of data.
+    EndInsert();
+}
+
+void Client::Impl::InsertData(const Block& block) {
+    assert(inserting);
+    SendData(block);
+}
+
+void Client::Impl::EndInsert() {
+    if (!inserting) return;
+
+    // Send empty block as marker of end of data.
     SendData(Block());
+    inserting = false;
 
     // Wait for EOS.
     uint64_t eos_packet{0};
@@ -416,6 +441,7 @@ void Client::Impl::Insert(const std::string& table_name, const std::string& quer
 }
 
 void Client::Impl::Ping() {
+    assert(!inserting);
     WireFormat::WriteUInt64(*output_, ClientCodes::Ping);
     output_->Flush();

@@ -429,6 +455,7 @@ void Client::Impl::Ping() {
 
 void Client::Impl::ResetConnection() {
     InitializeStreams(socket_factory_->connect(options_, current_endpoint_.value()));
+    inserting = false;
 
     if (!Handshake()) {
         throw ProtocolError("fail to connect to " + options_.host);
@@ -648,6 +675,78 @@ bool Client::Impl::ReceivePacket(uint64_t* server_packet) {
     }
 }
 
+bool Client::Impl::ReceivePreparePackets(uint64_t* server_packet) {
+    uint64_t packet_type = 0;
+
+    while (true) {
+        if (!WireFormat::ReadVarint64(*input_, &packet_type)) {
+            throw std::runtime_error("unexpected package type " +
+                                     std::to_string((int)packet_type) + " for insert query");
+        }
+        if (server_packet) {
+            *server_packet = packet_type;
+        }
+
+        switch (packet_type) {
+        case ServerCodes::Data: {
+            if (!ReceiveData()) {
+                throw ProtocolError("can't read data packet from input stream");
+            }
+            return true;
+        }
+
+        case ServerCodes::Exception: {
+            ReceiveException();
+            return false;
+        }
+
+        case ServerCodes::ProfileInfo:
+        case ServerCodes::Progress:
+        case ServerCodes::Pong:
+        case ServerCodes::Hello:
+            continue;
+
+        case ServerCodes::Log: {
+            // log tag
+            if (!WireFormat::SkipString(*input_)) {
+                return false;
+            }
+            Block block;
+
+            // Use uncompressed stream since log blocks usually contain only one row
+            if (!ReadBlock(*input_, &block)) {
+                return false;
+            }
+
+            if (events_) {
+                events_->OnServerLog(block);
+            }
+            continue;
+        }
+
+        case ServerCodes::TableColumns: {
+            // external table name
+            if (!WireFormat::SkipString(*input_)) {
+                return false;
+            }
+
+            // columns metadata
+            if (!WireFormat::SkipString(*input_)) {
+                return false;
+            }
+            continue;
+        }
+
+        // No others expected.
+        case ServerCodes::EndOfStream:
+        case ServerCodes::ProfileEvents:
+        default:
+            throw UnimplementedError("unimplemented " + std::to_string((int)packet_type));
+            break;
+        }
+    }
+}
+
 bool Client::Impl::ReadBlock(InputStream& input, Block* block) {
     // Additional information about block.
     if (server_info_.revision >= DBMS_MIN_REVISION_WITH_BLOCK_INFO) {
@@ -923,7 +1022,6 @@ void Client::Impl::FinalizeQuery() {
     output_->Flush();
 }
 
-
 void Client::Impl::WriteBlock(const Block& block, OutputStream& output) {
     // Additional information about block.
     if (server_info_.revision >= DBMS_MIN_REVISION_WITH_BLOCK_INFO) {
@@ -1063,7 +1161,7 @@ void Client::Impl::RetryGuard(std::function<void()> func) {
                 }
             }
         }
-        // Connectiong with current_endpoint_ are broken.
+        // Connections with current_endpoint_ are broken.
         // Trying to establish with the another one from the list.
         size_t connection_attempts_count = GetConnectionAttempts();
         for (size_t i = 0; i < connection_attempts_count;)
@@ -1085,6 +1183,38 @@ void Client::Impl::RetryGuard(std::function<void()> func) {
     }
 }
 
+Block Client::Impl::BeginInsert(Query query) {
+    assert(!inserting);
+    EnsureNull en(static_cast<QueryEvents*>(&query), &events_);
+
+    if (options_.ping_before_query) {
+        RetryGuard([this]() { Ping(); });
+    }
+
+    // Arrange a query callback to extract a block that corresponds to the
+    // query columns.
+    auto block = Block();
+    query.OnData([&block](const Block& b) {
+        for (Block::Iterator bi(b); bi.IsValid(); bi.Next()) {
+            // Create a copy of the column type.
+            auto chtype = bi.Column()->Type()->GetName();
+            block.AppendColumn(bi.Name(), clickhouse::CreateColumnByType(chtype));
+        }
+
+        return true;
+    });
+
+    SendQuery(query.GetText());
+
+    // Receive data packet but keep the query/connection open.
+    if (!ReceivePreparePackets()) {
+        throw std::runtime_error("fail to receive data packet");
+    }
+
+    inserting = true;
+    return block;
+}
+
 Client::Client(const ClientOptions& opts)
     : options_(opts)
     , impl_(new Impl(opts))
@@ -1149,6 +1279,27 @@ void Client::Insert(const std::string& table_name, const std::string& query_id,
     impl_->Insert(table_name, query_id, block);
 }
 
+Block Client::BeginInsert(const std::string& query) {
+    return impl_->BeginInsert(Query(query));
+}
+
+Block Client::BeginInsert(const std::string& query, const std::string& query_id) {
+    return impl_->BeginInsert(Query(query, query_id));
+}
+
+void Client::InsertData(const Block& block) {
+    impl_->InsertData(block);
+}
+
+void Client::EndInsert(const Block& block) {
+    impl_->InsertData(block);
+    impl_->EndInsert();
+}
+
+void Client::EndInsert() {
+    impl_->EndInsert();
+}
+
 void Client::Ping() {
     impl_->Ping();
}
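
The control flow of `ReceivePreparePackets` in this file can be reduced to a small model: keep reading until the header block (`Data`) or an `Exception` arrives, skip packets that are only informational, and treat anything else as an error. The sketch below is an illustration under assumptions, not the library code. The `Packet` enum, `PrepareResult`, and `WaitForHeader` are hypothetical names, and a `std::vector` stands in for the input stream.

```cpp
#include <cassert>
#include <cstddef>
#include <stdexcept>
#include <vector>

// Hypothetical packet kinds; the names echo ServerCodes, the values are made up.
enum class Packet { Hello, Data, Exception, Progress, Pong, EndOfStream, ProfileInfo, Log, TableColumns };

// Outcome of waiting for the server's reply to the INSERT query.
struct PrepareResult {
    bool ok;               // true once the header Data packet arrived
    std::size_t consumed;  // number of packets read from the stream
};

// Read packets until the header block (Data) or an Exception shows up,
// skipping informational packets along the way.
PrepareResult WaitForHeader(const std::vector<Packet>& stream) {
    std::size_t pos = 0;
    while (pos < stream.size()) {
        const Packet p = stream[pos++];
        switch (p) {
        case Packet::Data:
            return {true, pos};   // stop here: the query stays open for data
        case Packet::Exception:
            return {false, pos};  // the server rejected the query
        case Packet::ProfileInfo:
        case Packet::Progress:
        case Packet::Pong:
        case Packet::Hello:
        case Packet::Log:
        case Packet::TableColumns:
            continue;             // informational: keep reading
        case Packet::EndOfStream:
            throw std::runtime_error("unexpected packet while preparing insert");
        }
    }
    throw std::runtime_error("stream ended before the header block");
}
```

Returning as soon as `Data` is seen is what distinguishes this loop from a normal query: nothing after the header is consumed, so the connection is left ready for `InsertData`.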

clickhouse/client.h

Lines changed: 11 additions & 0 deletions
@@ -273,6 +273,17 @@ class Client {
     void Insert(const std::string& table_name, const Block& block);
     void Insert(const std::string& table_name, const std::string& query_id, const Block& block);
 
+    /// Start an \p INSERT statement, insert batches of data, then finish the insert.
+    Block BeginInsert(const std::string& query);
+    Block BeginInsert(const std::string& query, const std::string& query_id);
+
+    /// Insert data using a \p block returned by \p BeginInsert.
+    void InsertData(const Block& block);
+
+    /// End an \p INSERT session started by \p BeginInsert.
+    void EndInsert();
+    void EndInsert(const Block& block);
+
     /// Ping server for aliveness.
void Ping();
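
The calling contract implied by these declarations and the `inserting` flag in `client.cpp` can be modelled as a tiny state machine. This is a sketch under assumptions: `InsertStateModel` is a hypothetical class, and it throws `std::logic_error` where the commit uses `assert`, purely so the misuse is observable in a test.

```cpp
#include <cassert>
#include <stdexcept>
#include <string>

// Hypothetical model of the client's insert state; not the library class.
class InsertStateModel {
public:
    void BeginInsert() { RequireIdle("BeginInsert"); inserting_ = true; }

    void InsertData() {
        if (!inserting_) throw std::logic_error("InsertData called without BeginInsert");
        ++data_blocks_;
    }

    void EndInsert() {
        if (!inserting_) return;  // nothing open: safe to call again
        ++end_markers_;           // stands in for the empty end-of-data block
        inserting_ = false;
    }

    // Other operations are only legal while no insert is open.
    void Ping() { RequireIdle("Ping"); }

    bool Inserting() const { return inserting_; }
    int DataBlocks() const { return data_blocks_; }
    int EndMarkers() const { return end_markers_; }

private:
    void RequireIdle(const std::string& what) const {
        if (inserting_) throw std::logic_error(what + " called while an insert is open");
    }

    bool inserting_ = false;
    int data_blocks_ = 0;
    int end_markers_ = 0;
};
```

Making `EndInsert` a no-op when nothing is open is what lets the destructor call it unconditionally.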