Skip to content

[MultiNodePipelineBase] perf cluster pipeline if all the keys on same node #4148

@xrayw

Description

@xrayw

I think there are some issues at here.

ExecutorService executorService = Executors.newFixedThreadPool(MULTI_NODE_PIPELINE_SYNC_WORKERS);

CountDownLatch countDownLatch = new CountDownLatch(pipelinedResponses.size());
Iterator<Map.Entry<HostAndPort, Queue<Response<?>>>> pipelinedResponsesIterator
    = pipelinedResponses.entrySet().iterator();
while (pipelinedResponsesIterator.hasNext()) {
  Map.Entry<HostAndPort, Queue<Response<?>>> entry = pipelinedResponsesIterator.next();
  HostAndPort nodeKey = entry.getKey();
  Queue<Response<?>> queue = entry.getValue();
  Connection connection = connections.get(nodeKey);
  executorService.submit(() -> {                        // we can run the last entry on current thread to save resources
    try {
      List<Object> unformatted = connection.getMany(queue.size());
      for (Object o : unformatted) {
        queue.poll().set(o);
      }
    } catch (JedisConnectionException jce) {
      log.error("Error with connection to " + nodeKey, jce);
      // cleanup the connection
      pipelinedResponsesIterator.remove();            // should not be used cross thread, it's not thread-safe, and when run to here, the iter may already reached to the end(hasNext=false)
      connections.remove(nodeKey);
      IOUtils.closeQuietly(connection);               // the connection should be return to pool in finally block
    } finally {
      countDownLatch.countDown();
    }
  });
}

and do we need to clear the pipelinedResponses and connections at the end? instead of remove the exceptioned one at catch block

Metadata

Metadata

Assignees

No one assigned

    Type

    No type

    Projects

    No projects

    Milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions