Skip to content

KafkaJS.Consumer: unpause function returned by consumer.pause() incorrectly unpauses all partitions #63

@justjake

Description

@justjake

Environment Information

  • OS [e.g. Mac, Arch, Windows 10]: macOS 14.5 (23F79)
  • Node Version [e.g. 8.2.1]: v18.17.0
  • NPM Version [e.g. 5.4.2]: 9.6.7
  • C++ Toolchain [e.g. Visual Studio, llvm, g++]: Xcode 14
  • confluent-kafka-javascript version [e.g. 2.3.3]: "@confluentinc/kafka-javascript": "0.1.16-devel"

Steps to Reproduce

There is a logic error in the Consumer.pause() method. See the WARNING and ERROR comments I left below.

  pause(topics) {
    if (this.#state !== ConsumerState.CONNECTED) {
      throw new error.KafkaJSError('Pause can only be called while connected.', { code: error.ErrorCodes.ERR__STATE });
    }

    for (let topic of topics) {
      if (typeof topic.topic !== 'string') {
        throw new error.KafkaJSError('Topic must be a string.', { code: error.ErrorCodes.ERR__INVALID_ARG });
      }

      if (!topic.partitions) {
        // WARNING: incorrect passing [{ topic: string, partition: number }] will result
        // in unpausing all assigned partitions for that topic!
        topic.partitions = this.#getAllAssignedPartition(topic.topic);
      }
    }
    
    // ERROR: topics was Array<{ topic: string, partitions: number[] }>,
    // but now is Array<{ topic: string, partition: number }> since it's been flattened
    // Recommendation: don't change the types of local variables
    topics = this.#flattenTopicPartitions(topics);
    if (topics.length === 0) {
      return;
    }
    this.#internalClient.pause(topics);

    /* Mark the messages in the cache as stale, runInternal* will deal with
     * making it unusable. */
    this.#messageCache.markStale(topics);

    /* If anyone's using eachBatch, mark the batch as stale. */
    topics.map(partitionKey)
      .filter(key => this.#topicPartitionToBatchPayload.has(key))
      .forEach(key => this.#topicPartitionToBatchPayload.get(key)._stale = true);

    topics.map(JSON.stringify).forEach(topicPartition => this.#pausedPartitions.add(topicPartition));

    // ERROR: this.resume(topics) expects Array<{ topic: string, partitions: number[] }>,
    // but is now Array<{ topic: string, partition: number }> since it's been flattened
    // Given the branch `if (!topic.partitions)` above, we expect to unpause all partitions
    return () => this.resume(topics);
  }

Metadata

Metadata

Assignees

Labels

bugSomething isn't workingfixed-present-in-next-releaseBug or improvement that's done, it is in the development branch but yet unreleasedgreat report

Type

No type

Projects

No projects

Milestone

No milestone

Relationships

None yet

Development

No branches or pull requests

Issue actions