7 changes: 6 additions & 1 deletion examples/kafkajs/consumer.js
@@ -1,4 +1,4 @@
-const { Kafka } = require('../..').KafkaJS
+const { Kafka } = require('../..').KafkaJS;
 //const { Kafka } = require('kafkajs')
 
 async function consumerStart() {
@@ -32,7 +32,12 @@ async function consumerStart() {
       }
     },
     rdKafka: {
+      globalConfig: {
+        'enable.auto.commit': false
+      },
+      topicConfig: {
         'auto.offset.reset': 'earliest'
+      },
     }
   });
 
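Taken together, this change moves the consumer example from a flat rdKafka override object to the split globalConfig/topicConfig shape. A minimal sketch of the new shape in context, assuming a local broker at localhost:9092, a topic named 'test-topic', and a KafkaJS-style subscribe call (all placeholders, not part of the diff):

const { Kafka } = require('../..').KafkaJS;

async function main() {
  const kafka = new Kafka({ brokers: ['localhost:9092'] });
  const consumer = kafka.consumer({
    groupId: 'test-group',
    rdKafka: {
      // Client-level librdkafka properties.
      globalConfig: {
        'enable.auto.commit': false,
      },
      // Per-topic librdkafka properties.
      topicConfig: {
        'auto.offset.reset': 'earliest',
      },
    },
  });

  await consumer.connect();
  await consumer.subscribe({ topic: 'test-topic' });
  // run() drives the consume loop; not awaited here since it only
  // returns once the consumer leaves the CONNECTED state.
  consumer.run({
    eachMessage: async ({ topic, partition, message }) => {
      console.log(topic, partition, message.value.toString());
    },
  });
}

main().catch(console.error);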
7 changes: 5 additions & 2 deletions examples/kafkajs/eos.js
@@ -15,7 +15,9 @@ async function eosStart() {
   const consumer = kafka.consumer({
     groupId: 'groupId',
     rdKafka: {
-      "enable.auto.commit": false,
+      globalConfig: {
+        "enable.auto.commit": false,
+      }
     },
   });
 
@@ -34,7 +36,8 @@ async function eosStart() {
   // The run method acts like a consume-transform-produce loop.
   consumer.run({
     eachMessage: async ({ topic, partition, message }) => {
-      const msgAckString = JSON.stringify({topic,
+      const msgAckString = JSON.stringify({
+        topic,
         partition,
         offset: message.offset,
         key: message.key?.toString(),
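The second hunk only reflows the acknowledgement payload across lines; the resulting JSON is unchanged. Standalone, with made-up stand-ins for the values destructured in eachMessage:

// Hypothetical stand-ins for the eachMessage arguments.
const topic = 'test-topic';
const partition = 0;
const message = { offset: '42', key: Buffer.from('key1') };

const msgAckString = JSON.stringify({
  topic,
  partition,
  offset: message.offset,
  key: message.key?.toString(),
});

console.log(msgAckString);
// => {"topic":"test-topic","partition":0,"offset":"42","key":"key1"}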
47 changes: 28 additions & 19 deletions lib/kafkajs/_common.js
@@ -1,46 +1,55 @@
+/**
+ * @function kafkaJSToRdKafkaConfig()
+ * @param {object} config
+ * @returns {{globalConfig: import("../../types/config").ConsumerGlobalConfig|import("../../types/config").ProducerGlobalConfig, topicConfig: import("../../types/config").ConsumerTopicConfig|import("../../types/config").ProducerTopicConfig}}
+ */
 async function kafkaJSToRdKafkaConfig(config) {
-  const ret = {
-    'allow.auto.create.topics': 'false'
-  }
-  ret['bootstrap.servers'] = config['brokers'].join(',');
+  const globalConfig = {
+    "allow.auto.create.topics": "false",
+  };
+  const topicConfig = {};
+  globalConfig["bootstrap.servers"] = config["brokers"].join(",");
 
   let withSASL = false;
 
   if (config.sasl) {
-    const sasl = config.sasl;
-    if (sasl.mechanism === 'plain' &&
-      typeof sasl.username === 'string' &&
-      typeof sasl.password === 'string') {
-      ret['sasl.mechanism'] = 'PLAIN';
-      ret['sasl.username'] = sasl.username;
-      ret['sasl.password'] = sasl.password;
-      withSASL = true;
+    const sasl = config.sasl;
+    if (
+      sasl.mechanism === "plain" &&
+      typeof sasl.username === "string" &&
+      typeof sasl.password === "string"
+    ) {
+      globalConfig["sasl.mechanism"] = "PLAIN";
+      globalConfig["sasl.username"] = sasl.username;
+      globalConfig["sasl.password"] = sasl.password;
+      withSASL = true;
     }
   }
 
   if (config.ssl === true && withSASL) {
-    ret['security.protocol'] = 'sasl_ssl';
+    globalConfig["security.protocol"] = "sasl_ssl";
   } else if (withSASL) {
-    ret['security.protocol'] = 'sasl_plaintext';
+    globalConfig["security.protocol"] = "sasl_plaintext";
   }
 
   if (config.rdKafka) {
     if (config.rdKafka.constructor === Function) {
-      await config.rdKafka(ret);
+      await config.rdKafka(globalConfig, topicConfig);
     } else {
-      Object.assign(ret, config.rdKafka);
+      Object.assign(globalConfig, config.rdKafka.globalConfig);
+      Object.assign(topicConfig, config.rdKafka.topicConfig);
     }
   }
 
-  return ret;
+  return { globalConfig, topicConfig };
 }
 
 function topicPartitionOffsetToRdKafka(tpo) {
   return {
     topic: tpo.topic,
     partition: tpo.partition,
     offset: Number(tpo.offset),
   }
-};
+}
 
-module.exports = { kafkaJSToRdKafkaConfig, topicPartitionOffsetToRdKafka }
+module.exports = { kafkaJSToRdKafkaConfig, topicPartitionOffsetToRdKafka };
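A sketch exercising both rdKafka override styles kafkaJSToRdKafkaConfig accepts after this change: the object form, whose globalConfig/topicConfig keys are merged onto the mapped configs, and the function form, which now receives both config objects. The broker address and the 'client.id' override are illustrative assumptions:

// Run from lib/kafkajs/; the broker address is a placeholder.
const { kafkaJSToRdKafkaConfig } = require('./_common');

async function demo() {
  // Object form: the two sub-objects are merged onto the mapped configs.
  const fromObject = await kafkaJSToRdKafkaConfig({
    brokers: ['localhost:9092'],
    rdKafka: {
      globalConfig: { 'enable.auto.commit': false },
      topicConfig: { 'auto.offset.reset': 'earliest' },
    },
  });
  console.log(fromObject.globalConfig['enable.auto.commit']); // false
  console.log(fromObject.topicConfig['auto.offset.reset']);   // 'earliest'

  // Function form: the callback can mutate either config directly.
  const fromFunction = await kafkaJSToRdKafkaConfig({
    brokers: ['localhost:9092'],
    rdKafka: (globalConfig, topicConfig) => {
      globalConfig['client.id'] = 'my-client';
      topicConfig['auto.offset.reset'] = 'latest';
    },
  });
  console.log(fromFunction.globalConfig['client.id']);          // 'my-client'
  console.log(fromFunction.topicConfig['auto.offset.reset']);   // 'latest'
}

demo().catch(console.error);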
135 changes: 70 additions & 65 deletions lib/kafkajs/_consumer.js
@@ -3,8 +3,8 @@ const RdKafka = require('../rdkafka');
 const { kafkaJSToRdKafkaConfig, topicPartitionOffsetToRdKafka } = require('./_common');
 
 const ConsumerState = Object.freeze({
-    INIT: 0,
-    CONNECTING: 1,
+  INIT: 0,
+  CONNECTING: 1,
   CONNECTED: 2,
   DISCONNECTING: 3,
   DISCONNECTED: 4,
@@ -17,38 +17,42 @@ class Consumer {
   #connectPromiseFunc = {};
   #state = ConsumerState.INIT;
 
+  /**
+   * @constructor
+   * @param {import("../../types/kafkajs").ConsumerConfig} kJSConfig
+   */
   constructor(kJSConfig) {
     this.#kJSConfig = kJSConfig;
   }
 
-  #config() {
+  async #config() {
     if (!this.#rdKafkaConfig)
-      this.#rdKafkaConfig = this.#finalizedConfig();
+      this.#rdKafkaConfig = await this.#finalizedConfig();
     return this.#rdKafkaConfig;
   }
 
   async #finalizedConfig() {
-    const config = await kafkaJSToRdKafkaConfig(this.#kJSConfig);
+    const { globalConfig, topicConfig } = await kafkaJSToRdKafkaConfig(this.#kJSConfig);
     if (this.#kJSConfig.groupId) {
-      config['group.id'] = this.#kJSConfig.groupId;
+      globalConfig['group.id'] = this.#kJSConfig.groupId;
     }
-    config['offset_commit_cb'] = true;
+    globalConfig['offset_commit_cb'] = true;
     if (this.#kJSConfig.rebalanceListener) {
-      config['rebalance_cb'] = (err, assignment) => {
+      globalConfig['rebalance_cb'] = (err, assignment) => {
         // Create the librdkafka error
         err = LibrdKafkaError.create(err);
 
         let call;
-        switch(err.code) {
+        switch (err.code) {
           case LibrdKafkaError.codes.ERR__ASSIGN_PARTITIONS:
             call = (this.#kJSConfig.rebalanceListener.onPartitionsAssigned ?
-                this.#kJSConfig.rebalanceListener.onPartitionsAssigned(assignment) :
-                Promise.resolve()).catch(console.error);
+              this.#kJSConfig.rebalanceListener.onPartitionsAssigned(assignment) :
+              Promise.resolve()).catch(console.error);
             break;
           case LibrdKafkaError.codes.ERR__REVOKE_PARTITIONS:
             call = (this.#kJSConfig.rebalanceListener.onPartitionsRevoked ?
-                this.#kJSConfig.rebalanceListener.onPartitionsRevoked(assignment) :
-                Promise.resolve()).catch(console.error);
+              this.#kJSConfig.rebalanceListener.onPartitionsRevoked(assignment) :
+              Promise.resolve()).catch(console.error);
             break;
           default:
             call = Promise.reject().catch(() => {
@@ -58,46 +62,46 @@ class Consumer {
         }
 
         call
-        .finally(() => {
-          // Emit the event
-          this.#internalClient.emit('rebalance', err, assignment);
-
-          try {
-            if (err.code === LibrdKafkaError.codes.ERR__ASSIGN_PARTITIONS) {
-              this.#internalClient.assign(assignment);
-            } else {
-              this.#internalClient.unassign();
-            }
-          } catch (e) {
-            // Ignore exceptions if we are not connected
-            if (this.#internalClient.isConnected()) {
-              this.#internalClient.emit('rebalance.error', e);
-            }
-          }
-        });
+          .finally(() => {
+            // Emit the event
+            this.#internalClient.emit('rebalance', err, assignment);
+
+            try {
+              if (err.code === LibrdKafkaError.codes.ERR__ASSIGN_PARTITIONS) {
+                this.#internalClient.assign(assignment);
+              } else {
+                this.#internalClient.unassign();
+              }
+            } catch (e) {
+              // Ignore exceptions if we are not connected
+              if (this.#internalClient.isConnected()) {
+                this.#internalClient.emit('rebalance.error', e);
+              }
+            }
+          });
       };
     }
-    return config;
+    return { globalConfig, topicConfig };
   }
 
   #readyCb(arg) {
-      if (this.#state !== ConsumerState.CONNECTING) {
-        // I really don't know how to handle this now.
-        return;
-      }
-      this.#state = ConsumerState.CONNECTED;
+    if (this.#state !== ConsumerState.CONNECTING) {
+      // I really don't know how to handle this now.
+      return;
+    }
+    this.#state = ConsumerState.CONNECTED;
 
-      // Resolve the promise.
-      this.#connectPromiseFunc['resolve']();
+    // Resolve the promise.
+    this.#connectPromiseFunc['resolve']();
   }
 
   #errorCb(args) {
-      console.log('error', args);
-      if (this.#state === ConsumerState.CONNECTING) {
-        this.#connectPromiseFunc['reject'](args);
-      } else {
-        // do nothing for now.
-      }
+    console.log('error', args);
+    if (this.#state === ConsumerState.CONNECTING) {
+      this.#connectPromiseFunc['reject'](args);
+    } else {
+      // do nothing for now.
+    }
   }
 
   #notImplemented() {
@@ -111,7 +115,7 @@ class Consumer {
     }
 
     let timestamp = message.timestamp ? new Date(message.timestamp).toISOString()
-    : '';
+      : '';
 
     var headers = undefined;
     if (message.headers) {
@@ -139,14 +143,14 @@
         size: message.size,
         headers
       },
-      heartbeat: async () => {},
-      pause: () => {}
+      heartbeat: async () => { },
+      pause: () => { }
     }
   }
 
   async #consumeSingle() {
     return new Promise((resolve, reject) => {
-      this.#internalClient.consume(1, function(err, messages) {
+      this.#internalClient.consume(1, function (err, messages) {
         if (err) {
           reject(`Consume error code ${err.code}`);
           return;
@@ -168,7 +172,7 @@
       });
       else {
         for (let partition of topic.partitions) {
-          ret.push({topic: topic.topic, partition});
+          ret.push({ topic: topic.topic, partition });
         }
       }
     }
@@ -180,22 +184,23 @@
   }
 
   async connect() {
-      if (this.#state !== ConsumerState.INIT) {
-        return Promise.reject('Connect has already been called elsewhere.');
-      }
+    if (this.#state !== ConsumerState.INIT) {
+      return Promise.reject('Connect has already been called elsewhere.');
+    }
 
-      this.#state = ConsumerState.CONNECTING;
-      this.#internalClient = new RdKafka.KafkaConsumer(await this.#config());
-      this.#internalClient.on('ready', this.#readyCb.bind(this));
-      this.#internalClient.on('event.error', this.#errorCb.bind(this));
-      this.#internalClient.on('event.log', console.log);
-
-      return new Promise((resolve, reject) => {
-        this.#connectPromiseFunc = {resolve, reject};
-        console.log('Connecting....');
-        this.#internalClient.connect();
-        console.log('connect() called');
-      });
+    this.#state = ConsumerState.CONNECTING;
+    const { globalConfig, topicConfig } = await this.#config();
+    this.#internalClient = new RdKafka.KafkaConsumer(globalConfig, topicConfig);
+    this.#internalClient.on('ready', this.#readyCb.bind(this));
+    this.#internalClient.on('event.error', this.#errorCb.bind(this));
+    this.#internalClient.on('event.log', console.log);
+
+    return new Promise((resolve, reject) => {
+      this.#connectPromiseFunc = { resolve, reject };
+      console.log('Connecting....');
+      this.#internalClient.connect();
+      console.log('connect() called');
+    });
   }
 
   async subscribe(subscription) {
@@ -208,7 +213,7 @@ class Consumer {
 
   async run(config) {
     if (this.#state !== ConsumerState.CONNECTED) {
-        throw new Error('Run must be called in state CONNECTED.');
+      throw new Error('Run must be called in state CONNECTED.');
     }
 
     while (this.#state === ConsumerState.CONNECTED) {
@@ -240,7 +245,7 @@ class Consumer {
   seek(topicPartitionOffset) {
     return new Promise((resolve, reject) => {
       const rdKafkaTopicPartitionOffset =
-      topicPartitionOffsetToRdKafka(topicPartitionOffset);
+        topicPartitionOffsetToRdKafka(topicPartitionOffset);
       this.#internalClient.seek(rdKafkaTopicPartitionOffset, 0, (err) => {
         if (err) {
           reject(new Error(`Seek error code ${err.code}`));
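The rebalance_cb wiring in #finalizedConfig maps librdkafka's ERR__ASSIGN_PARTITIONS and ERR__REVOKE_PARTITIONS callbacks onto an optional rebalanceListener; errors thrown by either hook are swallowed via .catch(console.error). A minimal sketch of a caller hooking into it, assuming the same wrapper API as the examples above (broker and group id are placeholders):

const { Kafka } = require('../..').KafkaJS;

const kafka = new Kafka({ brokers: ['localhost:9092'] });

const consumer = kafka.consumer({
  groupId: 'rebalance-demo',
  rebalanceListener: {
    // Invoked by the rebalance_cb on ERR__ASSIGN_PARTITIONS.
    onPartitionsAssigned: async (assignment) => {
      console.log('assigned:', assignment);
    },
    // Invoked by the rebalance_cb on ERR__REVOKE_PARTITIONS.
    onPartitionsRevoked: async (assignment) => {
      console.log('revoked:', assignment);
    },
  },
});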