Description
Consumers, even idle ones, leak memory, primarily in the form of unfulfilled promises.
Repro steps:
Started with a test app that creates 16 consumers, each subscribed to a different topic, and does nothing else.
src/memory-leak.ts
import {KafkaJS as Confluent, RdKafka} from "@confluentinc/kafka-javascript";

const groupId = (t: number) => `test-confluent-group-${t}`;
const topic = (t: number) => `test-confluent-topic-${t}`;

const kafka: Confluent.Kafka = new Confluent.Kafka({kafkaJS: {brokers: ["localhost:9092"]}});
const admin: Confluent.Admin = kafka.admin();
await admin.connect();

let consumers: Confluent.Consumer[] = [];
// Indexes of consumers that have received their first partition assignment.
let consumersReady = new Set<number>();

runTest(16).catch(console.error);

// Creates one topic and one consumer per index, then waits until every consumer is assigned.
async function runTest(topicCount: number) {
  const topics: Confluent.ITopicConfig[] = [];
  for (let t = 0; t < topicCount; t++) {
    topics.push({topic: topic(t)});
  }
  await admin.createTopics({topics});

  let promises: Promise<any>[] = [];
  for (let t = 0; t < topicCount; t++) {
    promises.push(setupConsumer(t));
  }
  await Promise.all(promises);
  await until(() => consumersReady.size === topicCount);
  console.log("All consumers ready");
}

// Connects a consumer in its own group, subscribes it to its topic, and runs a no-op handler.
async function setupConsumer(t: number) {
  let ready = false;
  const consumer = kafka.consumer({
    kafkaJS: {groupId: groupId(t), maxWaitTimeInMs: 100},
    rebalance_cb: (err: any, assignment: any, consumer: any) => {
      // Only partition assignments mark the consumer as ready.
      if (err.code !== RdKafka.CODES.ERRORS.ERR__ASSIGN_PARTITIONS) return;
      if (!ready) {
        ready = true;
        consumersReady.add(t);
      }
    }
  });
  await consumer.connect();
  await consumer.subscribe({topic: topic(t)});
  await consumer.run({eachMessage: async () => {}});
  consumers.push(consumer);
}

// Polls the condition every 50 ms and throws if it is not met within 30 seconds.
async function until(condition: () => boolean) {
  const timeout = 30000;
  const finish = Date.now() + timeout;
  while (Date.now() <= finish) {
    const result = condition();
    if (result) return;
    await new Promise(resolve => setTimeout(resolve, 50));
  }
  throw new Error(`Condition not met within ${timeout}ms`);
}
Start the app:
node --loader=ts-node/esm --heapsnapshot-signal=SIGUSR2 src/memory-leak.ts
After the "All consumers ready" confirmation, triggered a heap snapshot using the PID of the running process.
kill -SIGUSR2 21898
This created the file Heap.20241107.120052.21898.0.001.heapsnapshot.
Waited approximately 10 minutes, and triggered another heap dump snapshot.
This created the file Heap.20241107.121022.21898.0.003.heapsnapshot.
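For anyone reproducing this without sending signals (for example on a platform where SIGUSR2 is unavailable), the two snapshots above could also be written from inside the process. The following is only a sketch, assuming Node's built-in `v8.writeHeapSnapshot()`; the helper name `takeSnapshots` and its `write` parameter are hypothetical and not part of the original test app.

```typescript
import {writeHeapSnapshot} from "node:v8";

const sleep = (ms: number) => new Promise<void>(resolve => setTimeout(resolve, ms));

// Hypothetical helper: writes `count` heap snapshots, `intervalMs` apart,
// and returns the file names reported by the writer.
// `write` defaults to v8.writeHeapSnapshot(), which is synchronous and
// blocks the event loop while the snapshot is being generated.
async function takeSnapshots(
  count: number,
  intervalMs: number,
  write: () => string = () => writeHeapSnapshot(),
): Promise<string[]> {
  const files: string[] = [];
  for (let i = 0; i < count; i++) {
    if (i > 0) await sleep(intervalMs);
    files.push(write());
  }
  return files;
}
```

Calling `takeSnapshots(2, 10 * 60 * 1000)` after the "All consumers ready" message would approximate the manual procedure described here: one snapshot immediately and a second about 10 minutes later.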
Opened Chrome, entered the URL chrome://inspect, selected the "Open dedicated DevTools for Node" link, then opened the Memory tab.
Clicked the "Load profile" button to load each file mentioned above.
Selected the 003 file and then the Comparison option against the 001 file.
This shows that in the ~10 minutes between snapshots, overall memory usage increased by 78 MB (Size Delta column), the bulk of which came from 53K more promises being created than resolved (Delta column).
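The growth seen in the snapshot comparison can also be watched live, without loading heap dumps. Below is a minimal sketch that samples `process.memoryUsage().heapUsed` and reports the growth rate against a baseline; the helper names (`sampleHeap`, `growthMBPerMinute`, `watchHeap`) are hypothetical and were not part of the original repro.

```typescript
// Hypothetical helpers for watching heap growth; not part of the original test app.
interface HeapSample {
  atMs: number;          // wall-clock time of the sample, in milliseconds
  heapUsedBytes: number; // V8 heap in use, as reported by process.memoryUsage()
}

function sampleHeap(): HeapSample {
  return {atMs: Date.now(), heapUsedBytes: process.memoryUsage().heapUsed};
}

// Heap growth between two samples, in MB per minute.
// The two samples must be taken at different times.
function growthMBPerMinute(first: HeapSample, last: HeapSample): number {
  const deltaMB = (last.heapUsedBytes - first.heapUsedBytes) / (1024 * 1024);
  const minutes = (last.atMs - first.atMs) / 60_000;
  return deltaMB / minutes;
}

// Logs the growth rate relative to a baseline taken when this function is called.
function watchHeap(intervalMs: number): ReturnType<typeof setInterval> {
  const baseline = sampleHeap();
  return setInterval(() => {
    const rate = growthMBPerMinute(baseline, sampleHeap());
    console.log(`heap growth: ${rate.toFixed(2)} MB/min`);
  }, intervalMs);
}
```

Calling `watchHeap(60_000)` right after the "All consumers ready" message would print one line per minute. Note that `heapUsed` fluctuates with garbage collection, so only a sustained upward trend over several minutes is meaningful.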
We've seen in our application that the memory increase is inexorable until the app eventually crashes.
<--- JS stacktrace --->
FATAL ERROR: Ineffective mark-compacts near heap limit Allocation failed - JavaScript heap out of memory
----- Native stack trace -----