
Producer/consumer connect corrupts libuv async queue #161

@trevorr

Description

We recently switched from KafkaJS to this library, which has mostly been great, but we've been hitting an intermittent hang. I don't have a shareable reproduction yet, but I can explain where and why the issue arises. It affects every OS and Node version.

The regression was introduced in 34e4bd3, which moved the ActivateDispatchers calls from HandleOKCallback to Execute in ProducerConnect (here) and KafkaConsumerConnect (here). ActivateDispatchers ultimately calls uv_async_init, which is not thread-safe and must be called from the event loop thread. HandleOKCallback and HandleErrorCallback run on that thread, but Execute runs on a worker thread.

To demonstrate the issue, I added an assertion that dumps a stack trace whenever uv_async_init is called off the event loop thread. It ends up failing here:

0   confluent-kafka-javascript.node     0x000000011a0186f8 _ZN9NodeKafka18assert_main_threadEv + 228
1   confluent-kafka-javascript.node     0x000000011a0047a4 _ZN9NodeKafka9Callbacks10Dispatcher8ActivateEv + 64
2   confluent-kafka-javascript.node     0x000000011a029180 _ZN9NodeKafka4Conf6listenEv + 64
3   confluent-kafka-javascript.node     0x000000011a02e228 _ZN9NodeKafka13KafkaConsumer19ActivateDispatchersEv + 32
4   confluent-kafka-javascript.node     0x000000011a040010 _ZN9NodeKafka7Workers20KafkaConsumerConnect7ExecuteEv + 40
5   confluent-kafka-javascript.node     0x000000011a02c8f8 _ZN3Nan12AsyncExecuteEP9uv_work_s + 44
6   node                                0x00000001033616d4 worker + 248
7   libsystem_pthread.dylib             0x00000001867372e4 _pthread_start + 136
8   libsystem_pthread.dylib             0x00000001867320fc thread_start + 8
Assertion failed: (false), function assert_main_thread, file common.cc, line 46.

I believe those ActivateDispatchers calls should be moved back to HandleOKCallback. If that runs too late for the scenario mentioned in the comment, they could instead be moved to the worker constructor, which also runs on the event loop thread. I'm happy to submit a PR for either approach, but wanted to start the discussion first.

Background

When I attached a debugger to our hung Node process, it was spinning in an infinite loop here in libuv (uv__async_io in async.c):

  while (!uv__queue_empty(&queue)) {
    q = uv__queue_head(&queue);
    h = uv__queue_data(q, uv_async_t, queue);

    uv__queue_remove(q);
    uv__queue_insert_tail(&loop->async_handles, q);

    /* Atomically fetch and clear pending flag */
    pending = (_Atomic int*) &h->pending;
    if (atomic_exchange(pending, 0) == 0)
      continue;

That loop is supposed to move all current async handles to a temporary queue, then put them back on the async handles queue one at a time, invoking each handle's callback if one is defined and its pending flag was set. Instead, it ends up with the same handle every time:

* thread #1, queue = 'com.apple.main-thread', stop reason = step over
    frame #0: 0x0000000105154cc8 node`uv__async_io(loop=0x00000001087a0c90, w=<unavailable>, events=<unavailable>) at async.c:165:5 [opt]
   162 	    q = uv__queue_head(&queue);
   163 	    h = uv__queue_data(q, uv_async_t, queue);
   164 	
-> 165 	    uv__queue_remove(q);
   166 	    uv__queue_insert_tail(&loop->async_handles, q);
   167 	
   168 	    /* Atomically fetch and clear pending flag */
(llnode) p queue
(uv__queue) {
  next = 0x0000000134807748
  prev = 0x000000012381f790
}
(llnode) n
Process 22379 stopped
* thread #1, queue = 'com.apple.main-thread', stop reason = step over
    frame #0: 0x0000000105154cd8 node`uv__async_io(loop=0x00000001087a0c90, w=<unavailable>, events=<unavailable>) at async.c:166:5 [opt]
   163 	    h = uv__queue_data(q, uv_async_t, queue);
   164 	
   165 	    uv__queue_remove(q);
-> 166 	    uv__queue_insert_tail(&loop->async_handles, q);
   167 	
   168 	    /* Atomically fetch and clear pending flag */
   169 	    pending = (_Atomic int*) &h->pending;
(llnode) p queue
(uv__queue) {
  next = 0x00000001087a0ee8
  prev = 0x000000012381f790
}
(llnode) n
Process 22379 stopped
* thread #1, queue = 'com.apple.main-thread', stop reason = step over
    frame #0: 0x0000000105154cec node`uv__async_io(loop=0x00000001087a0c90, w=<unavailable>, events=<unavailable>) at async.c:169:34 [opt]
   166 	    uv__queue_insert_tail(&loop->async_handles, q);
   167 	
   168 	    /* Atomically fetch and clear pending flag */
-> 169 	    pending = (_Atomic int*) &h->pending;
   170 	    if (atomic_exchange(pending, 0) == 0)
   171 	      continue;
   172 	
(llnode) p queue
(uv__queue) {
  next = 0x0000000134807748 // it's back in queue?????????????????
  prev = 0x000000012381f790
}

This should only be possible if the two queues share nodes, which they never should. Sure enough, queue->next == loop->async_handles->prev:

(llnode) p queue // temp queue
(uv__queue) {
  next = 0x0000000134807748 // !!!
  prev = 0x000000012381f790
}
(llnode) p loop->async_handles // event loop async queue
(uv__queue) {
  next = 0x00000001087a0dc0
  prev = 0x0000000134807748 // !!!
}

I believe this is caused by calling uv_async_init from the wrong thread. Our app creates several Kafka producers and consumers at startup, which makes the race more likely to hit. A standalone reproduction would likely involve connecting and disconnecting producers and/or consumers in a loop until the process hangs.

Metadata

Labels

bug (Something isn't working) · fixed-present-in-next-release (the fix is in the development branch but not yet released) · great report
