@@ -161,6 +161,12 @@ class Client::Impl {
161161
162162 void Insert (const std::string& table_name, const std::string& query_id, const Block& block);
163163
164+ std::unique_ptr<Block> BeginInsert (Query query);
165+
166+ void InsertData (Block& block);
167+
168+ void EndInsert ();
169+
164170 void Ping ();
165171
166172 void ResetConnection ();
@@ -175,6 +181,7 @@ class Client::Impl {
175181 bool Handshake ();
176182
177183 bool ReceivePacket (uint64_t * server_packet = nullptr );
184+ bool ReceivePreparePackets (uint64_t * server_packet = nullptr );
178185
179186 void SendQuery (const Query& query, bool finalize = true );
180187 void FinalizeQuery ();
@@ -208,6 +215,7 @@ class Client::Impl {
208215 }
209216
210217private:
218+ bool inserting;
211219 // / In case of network errors tries to reconnect to server and
212220 // / call fuc several times.
213221 void RetryGuard (std::function<void ()> func);
@@ -280,10 +288,13 @@ Client::Impl::Impl(const ClientOptions& opts,
280288 }
281289}
282290
283- Client::Impl::~Impl ()
284- { }
291+ Client::Impl::~Impl () {
292+ // Wrap up an insert if one is in progress.
293+ EndInsert ();
294+ }
285295
286296void Client::Impl::ExecuteQuery (Query query) {
297+ assert (!inserting);
287298 EnsureNull en (static_cast <QueryEvents*>(&query), &events_);
288299
289300 if (options_.ping_before_query ) {
@@ -299,6 +310,7 @@ void Client::Impl::ExecuteQuery(Query query) {
299310
300311
301312void Client::Impl::SelectWithExternalData (Query query, const ExternalTables& external_tables) {
313+ assert (!inserting);
302314 if (server_info_.revision < DBMS_MIN_REVISION_WITH_TEMPORARY_TABLES) {
303315 throw UnimplementedError (" This version of ClickHouse server doesn't support temporary tables" );
304316 }
@@ -362,6 +374,7 @@ std::string NameToQueryString(const std::string &input)
362374}
363375
364376void Client::Impl::Insert (const std::string& table_name, const std::string& query_id, const Block& block) {
377+ assert (!inserting);
365378 if (options_.ping_before_query ) {
366379 RetryGuard ([this ]() { Ping (); });
367380 }
@@ -397,10 +410,24 @@ void Client::Impl::Insert(const std::string& table_name, const std::string& quer
397410 }
398411
399412 // Send data.
413+ inserting = true ;
400414 SendData (block);
401- // Send empty block as marker of
402- // end of data.
415+ EndInsert ();
416+ }
417+
418+ void Client::Impl::InsertData (Block& block) {
419+ assert (inserting);
420+ block.RefreshRowCount ();
421+ SendData (block);
422+ block.Clear ();
423+ }
424+
425+ void Client::Impl::EndInsert () {
426+ if (!inserting) return ;
427+
428+ // Send empty block as marker of end of data.
403429 SendData (Block ());
430+ inserting = false ;
404431
405432 // Wait for EOS.
406433 uint64_t eos_packet{0 };
@@ -416,6 +443,7 @@ void Client::Impl::Insert(const std::string& table_name, const std::string& quer
416443}
417444
418445void Client::Impl::Ping () {
446+ assert (!inserting);
419447 WireFormat::WriteUInt64 (*output_, ClientCodes::Ping);
420448 output_->Flush ();
421449
@@ -429,6 +457,7 @@ void Client::Impl::Ping() {
429457
430458void Client::Impl::ResetConnection () {
431459 InitializeStreams (socket_factory_->connect (options_, current_endpoint_.value ()));
460+ inserting = false ;
432461
433462 if (!Handshake ()) {
434463 throw ProtocolError (" fail to connect to " + options_.host );
@@ -648,6 +677,78 @@ bool Client::Impl::ReceivePacket(uint64_t* server_packet) {
648677 }
649678}
650679
680+ bool Client::Impl::ReceivePreparePackets (uint64_t * server_packet) {
681+ uint64_t packet_type = 0 ;
682+
683+ while (true ) {
684+ if (!WireFormat::ReadVarint64 (*input_, &packet_type)) {
685+ throw std::runtime_error (" unexpected package type " +
686+ std::to_string ((int )packet_type) + " for insert query" );
687+ }
688+ if (server_packet) {
689+ *server_packet = packet_type;
690+ }
691+
692+ switch (packet_type) {
693+ case ServerCodes::Data: {
694+ if (!ReceiveData ()) {
695+ throw ProtocolError (" can't read data packet from input stream" );
696+ }
697+ return true ;
698+ }
699+
700+ case ServerCodes::Exception: {
701+ ReceiveException ();
702+ return false ;
703+ }
704+
705+ case ServerCodes::ProfileInfo:
706+ case ServerCodes::Progress:
707+ case ServerCodes::Pong:
708+ case ServerCodes::Hello:
709+ continue ;
710+
711+ case ServerCodes::Log: {
712+ // log tag
713+ if (!WireFormat::SkipString (*input_)) {
714+ return false ;
715+ }
716+ Block block;
717+
718+ // Use uncompressed stream since log blocks usually contain only one row
719+ if (!ReadBlock (*input_, &block)) {
720+ return false ;
721+ }
722+
723+ if (events_) {
724+ events_->OnServerLog (block);
725+ }
726+ continue ;
727+ }
728+
729+ case ServerCodes::TableColumns: {
730+ // external table name
731+ if (!WireFormat::SkipString (*input_)) {
732+ return false ;
733+ }
734+
735+ // columns metadata
736+ if (!WireFormat::SkipString (*input_)) {
737+ return false ;
738+ }
739+ continue ;
740+ }
741+
742+ // No others expected.
743+ case ServerCodes::EndOfStream:
744+ case ServerCodes::ProfileEvents:
745+ default :
746+ throw UnimplementedError (" unimplemented " + std::to_string ((int )packet_type));
747+ break ;
748+ }
749+ }
750+ }
751+
651752bool Client::Impl::ReadBlock (InputStream& input, Block* block) {
652753 // Additional information about block.
653754 if (server_info_.revision >= DBMS_MIN_REVISION_WITH_BLOCK_INFO) {
@@ -923,7 +1024,6 @@ void Client::Impl::FinalizeQuery() {
9231024 output_->Flush ();
9241025}
9251026
926-
9271027void Client::Impl::WriteBlock (const Block& block, OutputStream& output) {
9281028 // Additional information about block.
9291029 if (server_info_.revision >= DBMS_MIN_REVISION_WITH_BLOCK_INFO) {
@@ -1063,7 +1163,7 @@ void Client::Impl::RetryGuard(std::function<void()> func) {
10631163 }
10641164 }
10651165 }
1066- // Connectiong with current_endpoint_ are broken.
1166+ // Connections with current_endpoint_ are broken.
10671167 // Trying to establish with the another one from the list.
10681168 size_t connection_attempts_count = GetConnectionAttempts ();
10691169 for (size_t i = 0 ; i < connection_attempts_count;)
@@ -1085,6 +1185,43 @@ void Client::Impl::RetryGuard(std::function<void()> func) {
10851185 }
10861186}
10871187
1188+ std::unique_ptr<Block> Client::Impl::BeginInsert (Query query) {
1189+ assert (!inserting);
1190+ // Arrange a query callback to extract a block that corresponds to the
1191+ // query columns.
1192+ auto block = std::make_unique<Block>();
1193+ query.OnData ([&block](const Block& b) {
1194+ for (Block::Iterator bi (b); bi.IsValid (); bi.Next ()) {
1195+ // Create the ClickHouse column type.
1196+ clickhouse::ColumnRef col = bi.Column ();
1197+ auto chtype = col->Type ();
1198+ if (chtype->GetCode () == Type::LowCardinality) {
1199+ chtype = col->As <ColumnLowCardinality>()->GetNestedType ();
1200+ }
1201+ block->AppendColumn (bi.Name (), clickhouse::CreateColumnByType (chtype->GetName ()));
1202+ }
1203+
1204+ return true ;
1205+ });
1206+
1207+
1208+ EnsureNull en (static_cast <QueryEvents*>(&query), &events_);
1209+
1210+ if (options_.ping_before_query ) {
1211+ RetryGuard ([this ]() { Ping (); });
1212+ }
1213+
1214+ SendQuery (query.GetText ());
1215+
1216+ // Receive data packet but keep the query/connection open.
1217+ if (!ReceivePreparePackets ()) {
1218+ throw std::runtime_error (" fail to receive data packet" );
1219+ }
1220+
1221+ inserting = true ;
1222+ return block;
1223+ }
1224+
10881225Client::Client (const ClientOptions& opts)
10891226 : options_(opts)
10901227 , impl_(new Impl(opts))
@@ -1149,6 +1286,27 @@ void Client::Insert(const std::string& table_name, const std::string& query_id,
11491286 impl_->Insert (table_name, query_id, block);
11501287}
11511288
1289+ std::unique_ptr<Block> Client::BeginInsert (const std::string& query) {
1290+ return impl_->BeginInsert (Query (query));
1291+ }
1292+
1293+ std::unique_ptr<Block> Client::BeginInsert (const std::string& query, const std::string& query_id) {
1294+ return impl_->BeginInsert (Query (query, query_id));
1295+ }
1296+
1297+ void Client::InsertData (Block& block) {
1298+ impl_->InsertData (block);
1299+ }
1300+
1301+ void Client::EndInsert (Block& block) {
1302+ impl_->InsertData (block);
1303+ impl_->EndInsert ();
1304+ }
1305+
1306+ void Client::EndInsert () {
1307+ impl_->EndInsert ();
1308+ }
1309+
11521310void Client::Ping () {
11531311 impl_->Ping ();
11541312}
0 commit comments