Commit 3f1190f

Add commitCb method (#15)
This avoids blocking while committing and allows returning a Promise without having to call consume().
1 parent 9f5c734 commit 3f1190f
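
As a rough sketch of the promisified flow this enables in the kafkajs-style wrapper (the consumer setup, topic name, and offset values below are illustrative assumptions, not part of the commit):

    // Hypothetical caller code, inside an async function: commitOffsets()
    // now resolves or rejects a Promise instead of committing synchronously.
    await consumer.commitOffsets([
      { topic: 'test-topic', partition: 0, offset: '43' }  // made-up values
    ]);

    // With no arguments it commits offsets for the current assignment.
    await consumer.commitOffsets();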

File tree

6 files changed, +149 -13 lines changed

  lib/kafka-consumer.js
  lib/kafkajs/_consumer.js
  src/kafka-consumer.cc
  src/kafka-consumer.h
  src/workers.cc
  src/workers.h

lib/kafka-consumer.js

Lines changed: 20 additions & 0 deletions
@@ -582,6 +582,26 @@ KafkaConsumer.prototype.commitMessageSync = function(msg) {
   return this;
 };
 
+/**
+ * Commits a list of offsets per topic partition, using provided callback.
+ *
+ * @param {TopicPartition[]} toppars - Topic partition list to commit
+ *        offsets for. Defaults to the current assignment
+ * @param {Function} cb - Callback method to execute when finished
+ * @return {Client} - Returns itself
+ */
+KafkaConsumer.prototype.commitCb = function(toppars, cb) {
+  this._client.commitCb(toppars, function(err) {
+    if (err) {
+      cb(LibrdKafkaError.create(err));
+      return;
+    }
+
+    cb(null);
+  });
+  return this;
+};
+
 /**
  * Get last known offsets from the client.
  *

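A minimal usage sketch of the commitCb method added above; the connected consumer instance and the topic, partition, and offset values are assumptions for illustration:

    // Commit explicit offsets without blocking; the callback receives a
    // LibrdKafkaError-wrapped error on failure, or null on success.
    consumer.commitCb([{ topic: 'test-topic', partition: 0, offset: 42 }], function(err) {
      if (err) {
        console.error('commit failed:', err);
        return;
      }
      console.log('offsets committed');
    });

    // Passing null instead of a list commits offsets for the current assignment.
    consumer.commitCb(null, function(err) { /* handle err as above */ });
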
lib/kafkajs/_consumer.js

Lines changed: 21 additions & 13 deletions
@@ -328,22 +328,30 @@ class Consumer {
    */
   async commitOffsets(topicPartitions = null) {
     if (this.#state !== ConsumerState.CONNECTED) {
-      throw new error.KafkaJSError('Commit can only be called while connected.', { code: error.ErrorCodes.ERR__STATE });
+      return Promise.reject(new error.KafkaJSError('Commit can only be called while connected.', { code: error.ErrorCodes.ERR__STATE }));
     }
 
-    try {
-      if (topicPartitions === null) {
-        this.#internalClient.commitSync();
-      } else {
-        const topicPartitions = topicPartitions.map(
-          topicPartitionOffsetToRdKafka);
-        this.#internalClient.commitSync(topicPartitions);
-      }
-    } catch (e) {
-      if (!e.code || e.code !== error.ErrorCodes.ERR__NO_OFFSET) {
-        throw createKafkaJsErrorFromLibRdKafkaError(e);
+    return new Promise((resolve, reject) => {
+      try {
+        let cb = (e) => {
+          if (e)
+            reject(createKafkaJsErrorFromLibRdKafkaError(e));
+          else
+            resolve();
+        };
+
+        if (topicPartitions)
+          topicPartitions = topicPartitions.map(topicPartitionOffsetToRdKafka);
+        else
+          topicPartitions = null;
+        this.#internalClient.commitCb(topicPartitions, cb);
+      } catch (e) {
+        if (!e.code || e.code !== error.ErrorCodes.ERR__NO_OFFSET)
+          reject(createKafkaJsErrorFromLibRdKafkaError(e));
+        else
+          resolve();
       }
-    }
+    });
   }
 
   /**

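Roughly how a caller might consume the Promise now returned by commitOffsets; the consumer and the committed values are assumed, and failures arrive as rejections wrapped by createKafkaJsErrorFromLibRdKafkaError rather than as synchronous throws:

    // Hypothetical caller code, inside an async function.
    try {
      await consumer.commitOffsets([
        { topic: 'test-topic', partition: 0, offset: '43' }  // made-up values
      ]);
    } catch (e) {
      // e is a KafkaJSError; e.code carries the librdkafka error code.
      console.error('commit rejected:', e.code, e.message);
    }
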
src/kafka-consumer.cc

Lines changed: 40 additions & 0 deletions
@@ -532,6 +532,7 @@ void KafkaConsumer::Init(v8::Local<v8::Object> exports) {
 
   Nan::SetPrototypeMethod(tpl, "commit", NodeCommit);
   Nan::SetPrototypeMethod(tpl, "commitSync", NodeCommitSync);
+  Nan::SetPrototypeMethod(tpl, "commitCb", NodeCommitCb);
   Nan::SetPrototypeMethod(tpl, "offsetsStore", NodeOffsetsStore);
 
   constructor.Reset((tpl->GetFunction(Nan::GetCurrentContext()))

@@ -875,6 +876,45 @@ NAN_METHOD(KafkaConsumer::NodeCommitSync) {
   info.GetReturnValue().Set(Nan::New<v8::Number>(error_code));
 }
 
+NAN_METHOD(KafkaConsumer::NodeCommitCb) {
+  Nan::HandleScope scope;
+  int error_code;
+  std::optional<std::vector<RdKafka::TopicPartition *>> toppars = std::nullopt;
+  Nan::Callback *callback;
+
+  KafkaConsumer* consumer = ObjectWrap::Unwrap<KafkaConsumer>(info.This());
+
+  if (!consumer->IsConnected()) {
+    Nan::ThrowError("KafkaConsumer is disconnected");
+    return;
+  }
+
+  if (info.Length() != 2) {
+    Nan::ThrowError("Two arguments are required");
+    return;
+  }
+
+  if (!(
+      (info[0]->IsArray() || info[0]->IsNull()) &&
+      info[1]->IsFunction()
+     )) {
+    Nan::ThrowError("First argument should be an array or null and second one a callback");
+    return;
+  }
+
+  if (info[0]->IsArray()) {
+    toppars =
+      Conversion::TopicPartition::FromV8Array(info[0].As<v8::Array>());
+  }
+  callback = new Nan::Callback(info[1].As<v8::Function>());
+
+  Nan::AsyncQueueWorker(
+    new Workers::KafkaConsumerCommitCb(callback, consumer,
+      toppars));
+
+  info.GetReturnValue().Set(Nan::Null());
+}
+
 NAN_METHOD(KafkaConsumer::NodeSubscribe) {
   Nan::HandleScope scope;
 
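Because the binding above reports bad arguments or a disconnected consumer with a synchronous Nan::ThrowError, while the commit result itself arrives later through the callback, the callback-based method can also be promisified from JavaScript. A sketch only: util.promisify is standard Node.js, everything else is assumed:

    const { promisify } = require('util');

    // commitCb(toppars, cb) uses an error-first callback, so promisify applies.
    const commitAsync = promisify(consumer.commitCb.bind(consumer));

    // Resolves once the commit completes; commit failures, and the binding's
    // synchronous argument/state errors, both surface as rejections here.
    commitAsync(null)
      .then(() => console.log('offsets committed'))
      .catch((err) => console.error('commit failed:', err));
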
src/kafka-consumer.h

Lines changed: 1 addition & 0 deletions
@@ -110,6 +110,7 @@ class KafkaConsumer : public Connection {
   static NAN_METHOD(NodeUnsubscribe);
   static NAN_METHOD(NodeCommit);
   static NAN_METHOD(NodeCommitSync);
+  static NAN_METHOD(NodeCommitCb);
   static NAN_METHOD(NodeOffsetsStore);
   static NAN_METHOD(NodeCommitted);
   static NAN_METHOD(NodePosition);
src/workers.cc

Lines changed: 52 additions & 0 deletions
@@ -1036,6 +1036,58 @@ void KafkaConsumerCommitted::HandleErrorCallback() {
   callback->Call(argc, argv);
 }
 
+/**
+ * @brief KafkaConsumer commit offsets with a callback function.
+ *
+ * The first callback argument is the commit error, or null on success.
+ *
+ * @see RdKafka::KafkaConsumer::commitSync
+ */
+KafkaConsumerCommitCb::KafkaConsumerCommitCb(Nan::Callback *callback,
+                     KafkaConsumer* consumer,
+                     std::optional<std::vector<RdKafka::TopicPartition*>> & t) :
+  ErrorAwareWorker(callback),
+  m_consumer(consumer),
+  m_topic_partitions(t) {}
+
+KafkaConsumerCommitCb::~KafkaConsumerCommitCb() {
+  // Delete the underlying topic partitions as they are ephemeral or cloned
+  if (m_topic_partitions.has_value())
+    RdKafka::TopicPartition::destroy(m_topic_partitions.value());
+}
+
+void KafkaConsumerCommitCb::Execute() {
+  Baton b = Baton(NULL);
+  if (m_topic_partitions.has_value()) {
+    b = m_consumer->Commit(m_topic_partitions.value());
+  } else {
+    b = m_consumer->Commit();
+  }
+  if (b.err() != RdKafka::ERR_NO_ERROR) {
+    SetErrorBaton(b);
+  }
+}
+
+void KafkaConsumerCommitCb::HandleOKCallback() {
+  Nan::HandleScope scope;
+
+  const unsigned int argc = 1;
+  v8::Local<v8::Value> argv[argc];
+
+  argv[0] = Nan::Null();
+
+  callback->Call(argc, argv);
+}
+
+void KafkaConsumerCommitCb::HandleErrorCallback() {
+  Nan::HandleScope scope;
+
+  const unsigned int argc = 1;
+  v8::Local<v8::Value> argv[argc] = { GetErrorObject() };
+
+  callback->Call(argc, argv);
+}
+
 /**
  * @brief KafkaConsumer seek
  *

src/workers.h

Lines changed: 15 additions & 0 deletions
@@ -13,6 +13,7 @@
 #include <uv.h>
 #include <nan.h>
 #include <string>
+#include <optional>
 #include <vector>
 
 #include "src/common.h"

@@ -417,6 +418,20 @@ class KafkaConsumerCommitted : public ErrorAwareWorker {
   const int m_timeout_ms;
 };
 
+class KafkaConsumerCommitCb : public ErrorAwareWorker {
+ public:
+  KafkaConsumerCommitCb(Nan::Callback*,
+    NodeKafka::KafkaConsumer*, std::optional<std::vector<RdKafka::TopicPartition*>> &);
+  ~KafkaConsumerCommitCb();
+
+  void Execute();
+  void HandleOKCallback();
+  void HandleErrorCallback();
+ private:
+  NodeKafka::KafkaConsumer * m_consumer;
+  std::optional<std::vector<RdKafka::TopicPartition*>> m_topic_partitions;
+};
+
 class KafkaConsumerSeek : public ErrorAwareWorker {
  public:
   KafkaConsumerSeek(Nan::Callback*, NodeKafka::KafkaConsumer*,
