@@ -112,16 +112,27 @@ struct conn_info_head {
112112 ipc::shm::handle acc_h_;
113113
114114 conn_info_head (char const * prefix, char const * name)
115- : prefix_ {ipc::make_string (prefix)}
116- , name_ {ipc::make_string (name)}
117- , cc_id_ {}
118- , cc_waiter_{ipc::make_prefix (prefix_, {" CC_CONN__" , name_}).c_str ()}
119- , wt_waiter_{ipc::make_prefix (prefix_, {" WT_CONN__" , name_}).c_str ()}
120- , rd_waiter_{ipc::make_prefix (prefix_, {" RD_CONN__" , name_}).c_str ()}
121- , acc_h_ {ipc::make_prefix (prefix_, {" AC_CONN__" , name_}).c_str (), sizeof (acc_t )} {
115+ : prefix_{ipc::make_string (prefix)}
116+ , name_ {ipc::make_string (name)}
117+ , cc_id_ {} {}
118+
119+ void init () {
120+ if (!cc_waiter_.valid ()) cc_waiter_.open (ipc::make_prefix (prefix_, {" CC_CONN__" , name_}).c_str ());
121+ if (!wt_waiter_.valid ()) wt_waiter_.open (ipc::make_prefix (prefix_, {" WT_CONN__" , name_}).c_str ());
122+ if (!rd_waiter_.valid ()) rd_waiter_.open (ipc::make_prefix (prefix_, {" RD_CONN__" , name_}).c_str ());
123+ if (!acc_h_.valid ()) acc_h_.acquire (ipc::make_prefix (prefix_, {" AC_CONN__" , name_}).c_str (), sizeof (acc_t ));
124+ if (cc_id_ != 0 ) {
125+ return ;
126+ }
122127 acc_t *pacc = cc_acc (prefix_);
123- if (pacc != nullptr ) {
124- cc_id_ = pacc->fetch_add (1 , std::memory_order_relaxed);
128+ if (pacc == nullptr ) {
129+ // Failed to obtain the global accumulator.
130+ return ;
131+ }
132+ cc_id_ = pacc->fetch_add (1 , std::memory_order_relaxed) + 1 ;
133+ if (cc_id_ == 0 ) {
134+ // The identity cannot be 0.
135+ cc_id_ = pacc->fetch_add (1 , std::memory_order_relaxed) + 1 ;
125136 }
126137 }
127138
@@ -362,12 +373,18 @@ struct queue_generator {
362373 queue_t que_;
363374
364375 conn_info_t (char const * pref, char const * name)
365- : conn_info_head{pref, name}
366- , que_{ipc::make_prefix (prefix_, {
367- " QU_CONN__" ,
368- ipc::to_string (DataSize), " __" ,
369- ipc::to_string (AlignSize), " __" ,
370- name}).c_str ()} {}
376+ : conn_info_head{pref, name} { init (); }
377+
378+ void init () {
379+ conn_info_head::init ();
380+ if (!que_.valid ()) {
381+ que_.open (ipc::make_prefix (prefix_, {
382+ " QU_CONN__" ,
383+ ipc::to_string (DataSize), " __" ,
384+ ipc::to_string (AlignSize), " __" ,
385+ this ->name_ }).c_str ());
386+ }
387+ }
371388
372389 void disconnect_receiver () {
373390 bool dis = que_.disconnect ();
@@ -397,6 +414,18 @@ constexpr static queue_t* queue_of(ipc::handle_t h) noexcept {
397414
398415/* API implementations */
399416
417+ static bool connect (ipc::handle_t * ph, ipc::prefix pref, char const * name, bool start_to_recv) {
418+ assert (ph != nullptr );
419+ if (*ph == nullptr ) {
420+ *ph = ipc::mem::alloc<conn_info_t >(pref.str , name);
421+ }
422+ return reconnect (ph, start_to_recv);
423+ }
424+
425+ static bool connect (ipc::handle_t * ph, char const * name, bool start_to_recv) {
426+ return connect (ph, {nullptr }, name, start_to_recv);
427+ }
428+
400429static void disconnect (ipc::handle_t h) {
401430 auto que = queue_of (h);
402431 if (que == nullptr ) {
@@ -414,6 +443,7 @@ static bool reconnect(ipc::handle_t * ph, bool start_to_recv) {
414443 if (que == nullptr ) {
415444 return false ;
416445 }
446+ info_of (*ph)->init ();
417447 if (start_to_recv) {
418448 que->shut_sending ();
419449 if (que->connect ()) { // wouldn't connect twice
@@ -429,18 +459,6 @@ static bool reconnect(ipc::handle_t * ph, bool start_to_recv) {
429459 return que->ready_sending ();
430460}
431461
432- static bool connect (ipc::handle_t * ph, ipc::prefix pref, char const * name, bool start_to_recv) {
433- assert (ph != nullptr );
434- if (*ph == nullptr ) {
435- *ph = ipc::mem::alloc<conn_info_t >(pref.str , name);
436- }
437- return reconnect (ph, start_to_recv);
438- }
439-
440- static bool connect (ipc::handle_t * ph, char const * name, bool start_to_recv) {
441- return connect (ph, {nullptr }, name, start_to_recv);
442- }
443-
444462static void destroy (ipc::handle_t h) {
445463 disconnect (h);
446464 ipc::mem::free (info_of (h));
0 commit comments