Commit d054c36

Add BeginInsert/InsertData/EndInsert flow
Add a new pattern for "prepared inserts". It works like this:

* Call `BeginInsert` with an `INSERT` query with optional columns and ending in `VALUES`. No values should be included in the string.
* It returns a `Block` pre-configured with columns as declared in the `INSERT` statement.
* Add data to the block and periodically call `InsertData` to insert data and clear the block.
* Call `EndInsert()` or just let the `Client` object go out of scope to signal the server that it's done inserting.

This allows one to send smaller batches of blocks, thereby using less memory, but still in a single ClickHouse `INSERT` operation.

Expected to be useful in the Postgres foreign data wrapper insert API, where multiple rows can be inserted at once but its API handles one-at-a-time insertion. It will also support the FDW COPY API, which can submit huge batches of data to insert, as well.
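The flow described in this commit message can be modelled without a server. The sketch below is an assumption-laden illustration, not the library's implementation: `ModelClient`, `ModelBlock`, and `StreamIds` are hypothetical stand-ins that only count what would be sent, and they report misuse with exceptions where the real code uses `assert`.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <stdexcept>
#include <string>
#include <vector>

// Hypothetical stand-in for a block: a single column of ids.
struct ModelBlock {
    std::vector<std::uint64_t> ids;
};

// Hypothetical model of the client-side insert state; performs no network I/O.
class ModelClient {
public:
    // Opens the insert. The real client would send the query here and build
    // the block from the column description returned by the server.
    ModelBlock BeginInsert(const std::string& query) {
        if (inserting_) throw std::logic_error("insert already in progress");
        query_ = query;
        inserting_ = true;
        return ModelBlock{};
    }

    // "Sends" one batch, then clears the block so it can be refilled.
    void InsertData(ModelBlock& block) {
        if (!inserting_) throw std::logic_error("no insert in progress");
        rows_sent_ += block.ids.size();
        ++batches_sent_;
        block.ids.clear();
    }

    // Closes the insert. Calling it with no insert in progress is a no-op.
    void EndInsert() {
        if (!inserting_) return;
        inserting_ = false;
        ++inserts_completed_;
    }

    // Going out of scope also wraps up an insert that is still open.
    ~ModelClient() { EndInsert(); }

    bool inserting() const { return inserting_; }
    std::size_t rows_sent() const { return rows_sent_; }
    std::size_t batches_sent() const { return batches_sent_; }
    std::size_t inserts_completed() const { return inserts_completed_; }

private:
    bool inserting_ = false;
    std::string query_;
    std::size_t rows_sent_ = 0;
    std::size_t batches_sent_ = 0;
    std::size_t inserts_completed_ = 0;
};

// Streams `total` ids in batches of at most `batch_size` rows, all within a
// single logical insert.
inline void StreamIds(ModelClient& client, std::size_t total, std::size_t batch_size) {
    ModelBlock block = client.BeginInsert("INSERT INTO foo (id) VALUES");
    for (std::size_t i = 0; i < total; ++i) {
        block.ids.push_back(i);
        if (block.ids.size() == batch_size) {
            client.InsertData(block);  // flush a full batch
        }
    }
    if (!block.ids.empty()) {
        client.InsertData(block);  // flush the final partial batch
    }
    client.EndInsert();
}
```

Because each batch is cleared after it is sent, peak memory in this model is bounded by `batch_size` rows rather than by the total row count, which is the property the commit message is after.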
1 parent 6919524 commit d054c36

5 files changed

+297
-9
lines changed

README.md

Lines changed: 41 additions & 0 deletions
@@ -157,6 +157,47 @@ target_link_libraries(${PROJECT_NAME} PRIVATE clickhouse-cpp-lib)
157157
- run `rm -rf build && cmake -B build -S . && cmake --build build -j32` to remove remainders of the previous builds, run CMake and build the
158158
application. The generated binary is located in location `build/application-example`.
159159

160+
## Batch Insertion
161+
162+
In addition to the `Insert` method, which inserts all the data in a block in a
163+
single call, you can use the `BeginInsert` / `InsertData` / `EndInsert`
164+
pattern to insert batches of data. This can be useful for managing larger data
165+
sets without inflating memory with the entire set.
166+
167+
To use it, pass `BeginInsert` an `INSERT` statement ending in `VALUES` but with
168+
no actual values. Use the resulting `Block` to append batches of data, sending
169+
each to the server with `InsertData`. Finally, call `EndInsert` (or let the
170+
client go out of scope) to signal the server that insertion is complete.
171+
Example:
172+
173+
```cpp
// Start the insertion.
auto block = client->BeginInsert("INSERT INTO foo (id, name) VALUES");

// Grab the columns from the block. As<T>() returns a std::shared_ptr<T>.
auto col1 = block[0]->As<ColumnUInt64>();
auto col2 = block[1]->As<ColumnString>();

// Add a couple of records to the block.
col1->Append(1);
col1->Append(2);
col2->Append("holden");
col2->Append("naomi");

// Send those records. InsertData also clears the block.
client->InsertData(block);

// Add another record.
col1->Append(3);
col2->Append("amos");

// Send it and finish.
client->EndInsert(block);

// `block` is a value, not a pointer, so it is destroyed automatically.
```
200+
160201
## Thread-safety
161202
⚠ Please note that `Client` instance is NOT thread-safe. I.e. you must create a separate `Client` for each thread or utilize some synchronization techniques. ⚠
162203

clickhouse/block.h

Lines changed: 2 additions & 2 deletions
@@ -85,10 +85,10 @@ class Block {
8585
return columns_.at(idx).name;
8686
}
8787

88-
/// Convinience method to wipe out all rows from all columns
88+
/// Convenience method to wipe out all rows from all columns
8989
void Clear();
9090

91-
/// Convinience method to do Reserve() on all columns
91+
/// Convenience method to do Reserve() on all columns
9292
void Reserve(size_t new_cap);
9393

9494
/// Reference to column by index in the block.

clickhouse/client.cpp

Lines changed: 159 additions & 6 deletions
@@ -161,6 +161,12 @@ class Client::Impl {
161161

162162
void Insert(const std::string& table_name, const std::string& query_id, const Block& block);
163163

164+
Block BeginInsert(Query query);
165+
166+
void InsertData(Block& block);
167+
168+
void EndInsert();
169+
164170
void Ping();
165171

166172
void ResetConnection();
@@ -175,6 +181,7 @@ class Client::Impl {
175181
bool Handshake();
176182

177183
bool ReceivePacket(uint64_t* server_packet = nullptr);
184+
bool ReceivePreparePackets(uint64_t* server_packet = nullptr);
178185

179186
void SendQuery(const Query& query, bool finalize = true);
180187
void FinalizeQuery();
@@ -208,6 +215,7 @@ class Client::Impl {
208215
}
209216

210217
private:
218+
bool inserting = false;
211219
/// In case of network errors tries to reconnect to server and
212220
/// call func several times.
213221
void RetryGuard(std::function<void()> func);
@@ -280,10 +288,13 @@ Client::Impl::Impl(const ClientOptions& opts,
280288
}
281289
}
282290

283-
Client::Impl::~Impl()
284-
{ }
291+
Client::Impl::~Impl() {
292+
// Wrap up an insert if one is in progress.
293+
EndInsert();
294+
}
285295

286296
void Client::Impl::ExecuteQuery(Query query) {
297+
assert(!inserting);
287298
EnsureNull en(static_cast<QueryEvents*>(&query), &events_);
288299

289300
if (options_.ping_before_query) {
@@ -299,6 +310,7 @@ void Client::Impl::ExecuteQuery(Query query) {
299310

300311

301312
void Client::Impl::SelectWithExternalData(Query query, const ExternalTables& external_tables) {
313+
assert(!inserting);
302314
if (server_info_.revision < DBMS_MIN_REVISION_WITH_TEMPORARY_TABLES) {
303315
throw UnimplementedError("This version of ClickHouse server doesn't support temporary tables");
304316
}
@@ -362,6 +374,7 @@ std::string NameToQueryString(const std::string &input)
362374
}
363375

364376
void Client::Impl::Insert(const std::string& table_name, const std::string& query_id, const Block& block) {
377+
assert(!inserting);
365378
if (options_.ping_before_query) {
366379
RetryGuard([this]() { Ping(); });
367380
}
@@ -397,10 +410,24 @@ void Client::Impl::Insert(const std::string& table_name, const std::string& quer
397410
}
398411

399412
// Send data.
413+
inserting = true;
400414
SendData(block);
401-
// Send empty block as marker of
402-
// end of data.
415+
EndInsert();
416+
}
417+
418+
void Client::Impl::InsertData(Block& block) {
419+
assert(inserting);
420+
block.RefreshRowCount();
421+
SendData(block);
422+
block.Clear();
423+
}
424+
425+
void Client::Impl::EndInsert() {
426+
if (!inserting) return;
427+
428+
// Send empty block as marker of end of data.
403429
SendData(Block());
430+
inserting = false;
404431

405432
// Wait for EOS.
406433
uint64_t eos_packet{0};
@@ -416,6 +443,7 @@ void Client::Impl::Insert(const std::string& table_name, const std::string& quer
416443
}
417444

418445
void Client::Impl::Ping() {
446+
assert(!inserting);
419447
WireFormat::WriteUInt64(*output_, ClientCodes::Ping);
420448
output_->Flush();
421449

@@ -429,6 +457,7 @@ void Client::Impl::Ping() {
429457

430458
void Client::Impl::ResetConnection() {
431459
InitializeStreams(socket_factory_->connect(options_, current_endpoint_.value()));
460+
inserting = false;
432461

433462
if (!Handshake()) {
434463
throw ProtocolError("fail to connect to " + options_.host);
@@ -648,6 +677,78 @@ bool Client::Impl::ReceivePacket(uint64_t* server_packet) {
648677
}
649678
}
650679

680+
bool Client::Impl::ReceivePreparePackets(uint64_t* server_packet) {
681+
uint64_t packet_type = 0;
682+
683+
while (true) {
684+
if (!WireFormat::ReadVarint64(*input_, &packet_type)) {
685+
throw std::runtime_error("unexpected packet type " +
686+
std::to_string((int)packet_type) + " for insert query");
687+
}
688+
if (server_packet) {
689+
*server_packet = packet_type;
690+
}
691+
692+
switch (packet_type) {
693+
case ServerCodes::Data: {
694+
if (!ReceiveData()) {
695+
throw ProtocolError("can't read data packet from input stream");
696+
}
697+
return true;
698+
}
699+
700+
case ServerCodes::Exception: {
701+
ReceiveException();
702+
return false;
703+
}
704+
705+
case ServerCodes::ProfileInfo:
706+
case ServerCodes::Progress:
707+
case ServerCodes::Pong:
708+
case ServerCodes::Hello:
709+
continue;
710+
711+
case ServerCodes::Log: {
712+
// log tag
713+
if (!WireFormat::SkipString(*input_)) {
714+
return false;
715+
}
716+
Block block;
717+
718+
// Use uncompressed stream since log blocks usually contain only one row
719+
if (!ReadBlock(*input_, &block)) {
720+
return false;
721+
}
722+
723+
if (events_) {
724+
events_->OnServerLog(block);
725+
}
726+
continue;
727+
}
728+
729+
case ServerCodes::TableColumns: {
730+
// external table name
731+
if (!WireFormat::SkipString(*input_)) {
732+
return false;
733+
}
734+
735+
// columns metadata
736+
if (!WireFormat::SkipString(*input_)) {
737+
return false;
738+
}
739+
continue;
740+
}
741+
742+
// No others expected.
743+
case ServerCodes::EndOfStream:
744+
case ServerCodes::ProfileEvents:
745+
default:
746+
throw UnimplementedError("unimplemented " + std::to_string((int)packet_type));
747+
break;
748+
}
749+
}
750+
}
751+
651752
bool Client::Impl::ReadBlock(InputStream& input, Block* block) {
652753
// Additional information about block.
653754
if (server_info_.revision >= DBMS_MIN_REVISION_WITH_BLOCK_INFO) {
@@ -923,7 +1024,6 @@ void Client::Impl::FinalizeQuery() {
9231024
output_->Flush();
9241025
}
9251026

926-
9271027
void Client::Impl::WriteBlock(const Block& block, OutputStream& output) {
9281028
// Additional information about block.
9291029
if (server_info_.revision >= DBMS_MIN_REVISION_WITH_BLOCK_INFO) {
@@ -1063,7 +1163,7 @@ void Client::Impl::RetryGuard(std::function<void()> func) {
10631163
}
10641164
}
10651165
}
1066-
// Connectiong with current_endpoint_ are broken.
1166+
// Connections with current_endpoint_ are broken.
10671167
// Trying to establish a connection with another one from the list.
10681168
size_t connection_attempts_count = GetConnectionAttempts();
10691169
for (size_t i = 0; i < connection_attempts_count;)
@@ -1085,6 +1185,38 @@ void Client::Impl::RetryGuard(std::function<void()> func) {
10851185
}
10861186
}
10871187

1188+
Block Client::Impl::BeginInsert(Query query) {
1189+
assert(!inserting);
1190+
EnsureNull en(static_cast<QueryEvents*>(&query), &events_);
1191+
1192+
if (options_.ping_before_query) {
1193+
RetryGuard([this]() { Ping(); });
1194+
}
1195+
1196+
// Arrange a query callback to extract a block that corresponds to the
1197+
// query columns.
1198+
auto block = Block();
1199+
query.OnData([&block](const Block& b) {
1200+
for (Block::Iterator bi(b); bi.IsValid(); bi.Next()) {
1201+
// Create a copy of the column type.
1202+
auto chtype = bi.Column()->Type()->GetName();
1203+
block.AppendColumn(bi.Name(), clickhouse::CreateColumnByType(chtype));
1204+
}
1205+
1206+
return true;
1207+
});
1208+
1209+
SendQuery(query.GetText());
1210+
1211+
// Receive data packet but keep the query/connection open.
1212+
if (!ReceivePreparePackets()) {
1213+
throw std::runtime_error("fail to receive data packet");
1214+
}
1215+
1216+
inserting = true;
1217+
return block;
1218+
}
1219+
10881220
Client::Client(const ClientOptions& opts)
10891221
: options_(opts)
10901222
, impl_(new Impl(opts))
@@ -1149,6 +1281,27 @@ void Client::Insert(const std::string& table_name, const std::string& query_id,
11491281
impl_->Insert(table_name, query_id, block);
11501282
}
11511283

1284+
Block Client::BeginInsert(const std::string& query) {
1285+
return impl_->BeginInsert(Query(query));
1286+
}
1287+
1288+
Block Client::BeginInsert(const std::string& query, const std::string& query_id) {
1289+
return impl_->BeginInsert(Query(query, query_id));
1290+
}
1291+
1292+
void Client::InsertData(Block& block) {
1293+
impl_->InsertData(block);
1294+
}
1295+
1296+
void Client::EndInsert(Block& block) {
1297+
impl_->InsertData(block);
1298+
impl_->EndInsert();
1299+
}
1300+
1301+
void Client::EndInsert() {
1302+
impl_->EndInsert();
1303+
}
1304+
11521305
void Client::Ping() {
11531306
impl_->Ping();
11541307
}
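
The packet loop that `ReceivePreparePackets` adds in this file can be illustrated in isolation. The following is a simplified model under stated assumptions, not the client's code: `Packet` and `WaitForPrepareData` are hypothetical names, the enumerator values are arbitrary rather than the wire-protocol codes, and packets come from a vector instead of a socket. It reproduces only the control flow: skip informational packets, stop at the first data packet or exception, and reject anything else.

```cpp
#include <cassert>
#include <cstddef>
#include <stdexcept>
#include <vector>

// Hypothetical packet kinds. The names mirror the ServerCodes used in the
// diff; the values are arbitrary and are NOT the wire-protocol codes.
enum class Packet {
    Hello, Data, Exception, Progress, Pong,
    EndOfStream, ProfileInfo, Log, TableColumns, ProfileEvents
};

// Consumes packets starting at `pos` and advances `pos` past each one read.
// Returns true once a Data packet is reached (the block header for the
// insert) and false on an Exception packet. Throws if an unexpected packet
// arrives or the input ends before either of those is seen.
inline bool WaitForPrepareData(const std::vector<Packet>& packets, std::size_t& pos) {
    while (pos < packets.size()) {
        switch (packets[pos++]) {
            case Packet::Data:
                return true;

            case Packet::Exception:
                return false;

            // Informational packets: consume them and keep reading.
            case Packet::ProfileInfo:
            case Packet::Progress:
            case Packet::Pong:
            case Packet::Hello:
            case Packet::Log:
            case Packet::TableColumns:
                continue;

            // Nothing else is expected before the insert's data packet.
            case Packet::EndOfStream:
            case Packet::ProfileEvents:
            default:
                throw std::runtime_error("unexpected packet while preparing insert");
        }
    }
    throw std::runtime_error("input ended before a data packet arrived");
}
```

Returning at the first data packet, rather than reading on to end-of-stream, is what leaves the query open so that later batches can be sent on the same connection.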

clickhouse/client.h

Lines changed: 7 additions & 0 deletions
@@ -273,6 +273,13 @@ class Client {
273273
void Insert(const std::string& table_name, const Block& block);
274274
void Insert(const std::string& table_name, const std::string& query_id, const Block& block);
275275

276+
/// Start an \p INSERT statement, insert batches of data, then finish the insert.
277+
Block BeginInsert(const std::string& query);
278+
Block BeginInsert(const std::string& query, const std::string& query_id);
279+
void InsertData(Block& block);
280+
void EndInsert();
281+
void EndInsert(Block& block);
282+
276283
/// Ping server for aliveness.
277284
void Ping();
278285
