Skip to content

Commit dd9e69c

Browse files
committed
Continue polling even when we don't receive any message, for example if all partitions are paused
1 parent 8b07557 commit dd9e69c

File tree

2 files changed

+32
-10
lines changed

2 files changed

+32
-10
lines changed

lib/kafkajs/_consumer.js

Lines changed: 27 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1374,17 +1374,21 @@ class Consumer {
13741374
return ppc;
13751375
}
13761376

1377+
#notifyNonEmpty() {
1378+
if (this.#nonEmpty) {
1379+
this.#nonEmpty.resolve();
1380+
this.#nonEmpty = null;
1381+
}
1382+
if (this.#messageCache)
1383+
this.#messageCache.notifyAvailablePartitions();
1384+
}
1385+
13771386
#queueNonEmptyCb() {
13781387
const nonEmptyAction = async () => {
13791388
if (this.#fetchInProgress)
13801389
await this.#fetchInProgress;
13811390

1382-
if (this.#nonEmpty) {
1383-
this.#nonEmpty.resolve();
1384-
this.#nonEmpty = null;
1385-
}
1386-
if (this.#messageCache)
1387-
this.#messageCache.notifyAvailablePartitions();
1391+
this.#notifyNonEmpty();
13881392
};
13891393
nonEmptyAction().catch((e) => {
13901394
this.#logger.error(`Error in queueNonEmptyCb: ${e}`,
@@ -1457,19 +1461,32 @@ class Consumer {
14571461
* @private
14581462
*/
14591463
async #cacheExpirationLoop() {
1464+
const cacheExpirationInterval = BigInt(this.#cacheExpirationTimeoutMs * 1e6);
1465+
const maxFetchInterval = BigInt(1000 * 1e6);
14601466
while (!this.#workerTerminationScheduled.resolved) {
14611467
let now = hrtime.bigint();
1462-
const cacheExpiration = this.#lastFetchClockNs +
1463-
BigInt(this.#cacheExpirationTimeoutMs * 1e6);
1468+
const cacheExpirationTimeout = this.#lastFetchClockNs +
1469+
cacheExpirationInterval;
1470+
const maxFetchTimeout = this.#lastFetchClockNs +
1471+
maxFetchInterval;
14641472

1465-
if (now > cacheExpiration) {
1473+
if (now > cacheExpirationTimeout) {
14661474
this.#addPendingOperation(() =>
14671475
this.#clearCacheAndResetPositions());
14681476
await this.#checkMaxPollIntervalNotExceeded(now);
14691477
break;
14701478
}
1479+
if (now > maxFetchTimeout) {
1480+
/* We need to continue fetching even when we're
1481+
* not getting any messages, for example when all partitions are
1482+
* paused. */
1483+
this.#notifyNonEmpty();
1484+
}
1485+
1486+
const awakeTime = maxFetchTimeout < cacheExpirationTimeout ?
1487+
maxFetchTimeout : cacheExpirationTimeout;
14711488

1472-
let interval = Number(cacheExpiration - now) / 1e6;
1489+
let interval = Number(awakeTime - now) / 1e6;
14731490
if (interval < 100)
14741491
interval = 100;
14751492
await Timer.withTimeout(interval, this.#maxPollIntervalRestart);

test/promisified/admin/list_topics.spec.js

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ const {
44
secureRandom,
55
createTopic,
66
createAdmin,
7+
sleep,
78
} = require('../testhelpers');
89
const { ErrorCodes } = require('../../../lib').KafkaJS;
910

@@ -26,6 +27,10 @@ describe('Admin > listTopics', () => {
2627
it('should timeout', async () => {
2728
await admin.connect();
2829

30+
/* Await for the learned brokers to avoid having to do
31+
* manual retries */
32+
await sleep(1000);
33+
2934
await expect(admin.listTopics({ timeout: 1 })).rejects.toHaveProperty(
3035
'code',
3136
ErrorCodes.ERR__TIMED_OUT

0 commit comments

Comments
 (0)