Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 41 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,47 @@ target_link_libraries(${PROJECT_NAME} PRIVATE clickhouse-cpp-lib)
- run `rm -rf build && cmake -B build -S . && cmake --build build -j32` to remove remainders of the previous builds, run CMake and build the
application. The generated binary is located in location `build/application-example`.

## Batch Insertion

In addition to the `Insert` method, which inserts all the data in a block in a
single call, you can use the `BeginInsert` / `InsertData` / `EndInsert`
pattern to insert batches of data. This can be useful for managing larger data
sets without inflating memory with the entire set.

To use it pass `BeginInsert` an `INSERT` statement ending in `VALUES` but with
no actual values. Use the resulting `Block` to append batches of data, sending
each to the sever with `InsertData`. Finally, call `EndInsert` (or let the
client go out of scope) to signal the server that insertion is complete.
Example:

```cpp
// Start the insertion.
auto block = client->BeginInsert("INSERT INTO foo (id, name) VALUES");

// Grab the columns from the block.
auto col1 = block[0]->As<ColumnUInt64>();
auto col2 = block[1]->As<ColumnString>();

// Add a couple of records to the block.
col1.Append(1);
col1.Append(2);
col2.Append("holden");
col2.Append("naomi");

// Send those records.
block.RefreshRowCount();
client->InsertData(block);
block.Clear();

// Add another record.
col1.Append(3);
col2.Append("amos");

// Send it and finish.
block.RefreshRowCount();
client->EndInsert(block);
```

## Thread-safety
⚠ Please note that `Client` instance is NOT thread-safe. I.e. you must create a separate `Client` for each thread or utilize some synchronization techniques. ⚠

Expand Down
4 changes: 2 additions & 2 deletions clickhouse/block.h
Original file line number Diff line number Diff line change
Expand Up @@ -85,10 +85,10 @@ class Block {
return columns_.at(idx).name;
}

/// Convinience method to wipe out all rows from all columns
/// Convenience method to wipe out all rows from all columns
void Clear();

/// Convinience method to do Reserve() on all columns
/// Convenience method to do Reserve() on all columns
void Reserve(size_t new_cap);

/// Reference to column by index in the block.
Expand Down
163 changes: 157 additions & 6 deletions clickhouse/client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,12 @@ class Client::Impl {

void Insert(const std::string& table_name, const std::string& query_id, const Block& block);

Block BeginInsert(Query query);

void InsertData(const Block& block);

void EndInsert();

void Ping();

void ResetConnection();
Expand All @@ -175,6 +181,7 @@ class Client::Impl {
bool Handshake();

bool ReceivePacket(uint64_t* server_packet = nullptr);
bool ReceivePreparePackets(uint64_t* server_packet = nullptr);

void SendQuery(const Query& query, bool finalize = true);
void FinalizeQuery();
Expand Down Expand Up @@ -208,6 +215,7 @@ class Client::Impl {
}

private:
bool inserting;
/// In case of network errors tries to reconnect to server and
/// call fuc several times.
void RetryGuard(std::function<void()> func);
Expand Down Expand Up @@ -280,10 +288,13 @@ Client::Impl::Impl(const ClientOptions& opts,
}
}

Client::Impl::~Impl()
{ }
Client::Impl::~Impl() {
// Wrap up an insert if one is in progress.
EndInsert();
}

void Client::Impl::ExecuteQuery(Query query) {
assert(!inserting);
EnsureNull en(static_cast<QueryEvents*>(&query), &events_);

if (options_.ping_before_query) {
Expand All @@ -299,6 +310,7 @@ void Client::Impl::ExecuteQuery(Query query) {


void Client::Impl::SelectWithExternalData(Query query, const ExternalTables& external_tables) {
assert(!inserting);
if (server_info_.revision < DBMS_MIN_REVISION_WITH_TEMPORARY_TABLES) {
throw UnimplementedError("This version of ClickHouse server doesn't support temporary tables");
}
Expand Down Expand Up @@ -362,6 +374,7 @@ std::string NameToQueryString(const std::string &input)
}

void Client::Impl::Insert(const std::string& table_name, const std::string& query_id, const Block& block) {
assert(!inserting);
if (options_.ping_before_query) {
RetryGuard([this]() { Ping(); });
}
Expand Down Expand Up @@ -397,10 +410,22 @@ void Client::Impl::Insert(const std::string& table_name, const std::string& quer
}

// Send data.
inserting = true;
SendData(block);
// Send empty block as marker of
// end of data.
EndInsert();
}

void Client::Impl::InsertData(const Block& block) {
assert(inserting);
SendData(block);
}

void Client::Impl::EndInsert() {
if (!inserting) return;

// Send empty block as marker of end of data.
SendData(Block());
inserting = false;

// Wait for EOS.
uint64_t eos_packet{0};
Expand All @@ -416,6 +441,7 @@ void Client::Impl::Insert(const std::string& table_name, const std::string& quer
}

void Client::Impl::Ping() {
assert(!inserting);
WireFormat::WriteUInt64(*output_, ClientCodes::Ping);
output_->Flush();

Expand All @@ -429,6 +455,7 @@ void Client::Impl::Ping() {

void Client::Impl::ResetConnection() {
InitializeStreams(socket_factory_->connect(options_, current_endpoint_.value()));
inserting = false;

if (!Handshake()) {
throw ProtocolError("fail to connect to " + options_.host);
Expand Down Expand Up @@ -648,6 +675,78 @@ bool Client::Impl::ReceivePacket(uint64_t* server_packet) {
}
}

bool Client::Impl::ReceivePreparePackets(uint64_t* server_packet) {
uint64_t packet_type = 0;

while (true) {
if (!WireFormat::ReadVarint64(*input_, &packet_type)) {
throw std::runtime_error("unexpected package type " +
std::to_string((int)packet_type) + " for insert query");
}
if (server_packet) {
*server_packet = packet_type;
}

switch (packet_type) {
case ServerCodes::Data: {
if (!ReceiveData()) {
throw ProtocolError("can't read data packet from input stream");
}
return true;
}

case ServerCodes::Exception: {
ReceiveException();
return false;
}

case ServerCodes::ProfileInfo:
case ServerCodes::Progress:
case ServerCodes::Pong:
case ServerCodes::Hello:
continue;

case ServerCodes::Log: {
// log tag
if (!WireFormat::SkipString(*input_)) {
return false;
}
Block block;

// Use uncompressed stream since log blocks usually contain only one row
if (!ReadBlock(*input_, &block)) {
return false;
}

if (events_) {
events_->OnServerLog(block);
}
continue;
}

case ServerCodes::TableColumns: {
// external table name
if (!WireFormat::SkipString(*input_)) {
return false;
}

// columns metadata
if (!WireFormat::SkipString(*input_)) {
return false;
}
continue;
}

// No others expected.
case ServerCodes::EndOfStream:
case ServerCodes::ProfileEvents:
default:
throw UnimplementedError("unimplemented " + std::to_string((int)packet_type));
break;
}
}
}

bool Client::Impl::ReadBlock(InputStream& input, Block* block) {
// Additional information about block.
if (server_info_.revision >= DBMS_MIN_REVISION_WITH_BLOCK_INFO) {
Expand Down Expand Up @@ -923,7 +1022,6 @@ void Client::Impl::FinalizeQuery() {
output_->Flush();
}


void Client::Impl::WriteBlock(const Block& block, OutputStream& output) {
// Additional information about block.
if (server_info_.revision >= DBMS_MIN_REVISION_WITH_BLOCK_INFO) {
Expand Down Expand Up @@ -1063,7 +1161,7 @@ void Client::Impl::RetryGuard(std::function<void()> func) {
}
}
}
// Connectiong with current_endpoint_ are broken.
// Connections with current_endpoint_ are broken.
// Trying to establish with the another one from the list.
size_t connection_attempts_count = GetConnectionAttempts();
for (size_t i = 0; i < connection_attempts_count;)
Expand All @@ -1085,6 +1183,38 @@ void Client::Impl::RetryGuard(std::function<void()> func) {
}
}

Block Client::Impl::BeginInsert(Query query) {
assert(!inserting);
EnsureNull en(static_cast<QueryEvents*>(&query), &events_);

if (options_.ping_before_query) {
RetryGuard([this]() { Ping(); });
}

// Arrange a query callback to extract a block that corresponds to the
// query columns.
auto block = Block();
query.OnData([&block](const Block& b) {
for (Block::Iterator bi(b); bi.IsValid(); bi.Next()) {
// Create a copy of the column type.
auto chtype = bi.Column()->Type()->GetName();
block.AppendColumn(bi.Name(), clickhouse::CreateColumnByType(chtype));
}

return true;
});

SendQuery(query.GetText());

// Receive data packet but keep the query/connection open.
if (!ReceivePreparePackets()) {
throw std::runtime_error("fail to receive data packet");
}

inserting = true;
return block;
}

Client::Client(const ClientOptions& opts)
: options_(opts)
, impl_(new Impl(opts))
Expand Down Expand Up @@ -1149,6 +1279,27 @@ void Client::Insert(const std::string& table_name, const std::string& query_id,
impl_->Insert(table_name, query_id, block);
}

Block Client::BeginInsert(const std::string& query) {
return impl_->BeginInsert(Query(query));
}

Block Client::BeginInsert(const std::string& query, const std::string& query_id) {
return impl_->BeginInsert(Query(query, query_id));
}

void Client::InsertData(const Block& block) {
impl_->InsertData(block);
}

void Client::EndInsert(const Block& block) {
impl_->InsertData(block);
impl_->EndInsert();
}

void Client::EndInsert() {
impl_->EndInsert();
}

void Client::Ping() {
impl_->Ping();
}
Expand Down
11 changes: 11 additions & 0 deletions clickhouse/client.h
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,17 @@ class Client {
void Insert(const std::string& table_name, const Block& block);
void Insert(const std::string& table_name, const std::string& query_id, const Block& block);

/// Start an \p INSERT statement, insert batches of data, then finish the insert.
Block BeginInsert(const std::string& query);
Block BeginInsert(const std::string& query, const std::string& query_id);

/// Insert data using a \p block returned by \p BeginInsert.
void InsertData(const Block& block);

/// End an \p INSERT session started by \p BeginInsert.
void EndInsert();
void EndInsert(const Block& block);

/// Ping server for aliveness.
void Ping();

Expand Down
Loading
Loading