diff --git a/CHANGELOG.md b/CHANGELOG.md
index dfbf4087..52d2ce76 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -8,6 +8,7 @@ v0.5.0 is a limited availability feature release. It is supported for all usage.
1. Add support for an Admin API to delete records.(#141).
2. Fixes an issue with unresolved raced Promises leaking in the consumer (#151).
3. Add support for an Admin API to describe topics.(#155).
+4. Add support for dependent Admin client (#153).
# confluent-kafka-javascript v0.4.0
diff --git a/e2e/admin-dependent.spec.js b/e2e/admin-dependent.spec.js
new file mode 100644
index 00000000..f3cd5c6c
--- /dev/null
+++ b/e2e/admin-dependent.spec.js
@@ -0,0 +1,83 @@
+/*
+ * confluent-kafka-javascript - Node.js wrapper for RdKafka C/C++ library
+ *
+ * Copyright (c) 2024 Confluent, Inc.
+ *
+ * This software may be modified and distributed under the terms
+ * of the MIT license. See the LICENSE.txt file for details.
+ */
+
+var Kafka = require('../');
+var t = require('assert');
+
+var kafkaBrokerList = process.env.KAFKA_HOST || 'localhost:9092';
+var time = Date.now();
+
+describe('Dependent Admin', function () {
+ describe('from Producer', function () {
+ let producer;
+
+ this.beforeEach(function (done) {
+ producer = new Kafka.Producer({
+ 'metadata.broker.list': kafkaBrokerList,
+ });
+ done();
+ });
+
+ it('should be created and useable from connected producer', function (done) {
+ producer.on('ready', function () {
+ let admin = Kafka.AdminClient.createFrom(producer);
+ admin.listTopics(null, function (err, res) {
+ t.ifError(err);
+ t.ok(res);
+ producer.disconnect(done);
+ admin = null;
+ });
+ t.ok(admin);
+ });
+ producer.connect();
+ });
+
+ it('should fail to be created from unconnected producer', function (done) {
+ t.throws(function () {
+ Kafka.AdminClient.createFrom(producer);
+ }, /Existing client must be connected before creating a new client from it/);
+ done();
+ });
+
+ });
+
+ describe('from Consumer', function () {
+ let consumer;
+
+ this.beforeEach(function (done) {
+ consumer = new Kafka.KafkaConsumer({
+ 'metadata.broker.list': kafkaBrokerList,
+ 'group.id': 'kafka-mocha-grp-' + time,
+ });
+ done();
+ });
+
+ it('should be created and useable from connected consumer', function (done) {
+ consumer.on('ready', function () {
+ let admin = Kafka.AdminClient.createFrom(consumer);
+ admin.listTopics(null, function (err, res) {
+ t.ifError(err);
+ t.ok(res);
+ consumer.disconnect(done);
+ admin = null;
+ });
+ t.ok(admin);
+ });
+ consumer.connect();
+ });
+
+ it('should fail to be created from unconnected consumer', function (done) {
+ t.throws(function () {
+ Kafka.AdminClient.createFrom(consumer);
+ }, /Existing client must be connected before creating a new client from it/);
+ done();
+ });
+
+ });
+});
diff --git a/examples/kafkajs/admin/dependent-admin.js b/examples/kafkajs/admin/dependent-admin.js
new file mode 100644
index 00000000..91875821
--- /dev/null
+++ b/examples/kafkajs/admin/dependent-admin.js
@@ -0,0 +1,94 @@
+// require('kafkajs') is replaced with require('@confluentinc/kafka-javascript').KafkaJS.
+const { Kafka } = require('@confluentinc/kafka-javascript').KafkaJS;
+
+async function adminFromConsumer() {
+ const kafka = new Kafka({
+ kafkaJS: {
+ brokers: ['localhost:9092'],
+ }
+ });
+
+ const consumer = kafka.consumer({
+ kafkaJS: {
+ groupId: 'test-group',
+ fromBeginning: true,
+ }
+ });
+
+ await consumer.connect();
+
+ // The consumer can be used as normal
+ await consumer.subscribe({ topic: 'test-topic' });
+ consumer.run({
+ eachMessage: async ({ topic, partition, message }) => {
+ console.log({
+ topic,
+ partition,
+ offset: message.offset,
+ key: message.key?.toString(),
+ value: message.value.toString(),
+ });
+ },
+ });
+
+ // And the same consumer can create an admin client - the consumer must have successfully
+ // been connected before the admin client can be created.
+ const admin = consumer.dependentAdmin();
+ await admin.connect();
+
+ // The admin client can be used until the consumer is connected.
+ const listTopicsResult = await admin.listTopics();
+ console.log(listTopicsResult);
+
+ await new Promise(resolve => setTimeout(resolve, 10000));
+
+ // Disconnect the consumer and admin clients in the correct order.
+ await admin.disconnect();
+ await consumer.disconnect();
+}
+
+async function adminFromProducer() {
+ const kafka = new Kafka({
+ kafkaJS: {
+ brokers: ['localhost:9092'],
+ }
+ });
+
+ const producer = kafka.producer({
+ 'metadata.max.age.ms': 900000, /* This is set to the default value. */
+ });
+
+ await producer.connect();
+
+ // And the same producer can create an admin client - the producer must have successfully
+ // been connected before the admin client can be created.
+ const admin = producer.dependentAdmin();
+ await admin.connect();
+
+ // The admin client can be used until the producer is connected.
+ const listTopicsResult = await admin.listTopics();
+ console.log(listTopicsResult);
+
+ // A common use case for the dependent admin client is to make sure the topic
+ // is cached before producing to it. This avoids delay in sending the first
+ // message to any topic. Using the admin client linked to the producer allows
+ // us to do this, by calling `fetchTopicMetadata` before we produce.
+ // Here, we cache all possible topics, but it's advisable to only cache the
+ // topics you are going to produce to (if you know it in advance),
+ // and avoid calling listTopics().
+ // Once a topic is cached, it will stay cached for `metadata.max.age.ms`,
+ // which is 15 minutes by default, after which it will be removed if
+ // it has not been produced to.
+ await admin.fetchTopicMetadata({ topics: listTopicsResult }).catch(e => {
+ console.error('Error caching topics: ', e);
+ })
+
+ // The producer can be used as usual.
+ await producer.send({ topic: 'test-topic', messages: [{ value: 'Hello!' }] });
+
+ // Disconnect the producer and admin clients in the correct order.
+ await admin.disconnect();
+ await producer.disconnect();
+}
+
+adminFromProducer().then(() => adminFromConsumer()).catch(console.error);
diff --git a/examples/node-rdkafka/dependent-admin.js b/examples/node-rdkafka/dependent-admin.js
new file mode 100644
index 00000000..4aee994a
--- /dev/null
+++ b/examples/node-rdkafka/dependent-admin.js
@@ -0,0 +1,120 @@
+const Kafka = require('@confluentinc/kafka-javascript');
+const admin = require('../../lib/admin');
+
+const bootstrapServers = 'localhost:9092';
+
+function adminFromProducer(callback) {
+ const producer = new Kafka.Producer({
+ 'bootstrap.servers': bootstrapServers,
+ 'dr_msg_cb': true,
+ });
+
+ const createAdminAndListAndDescribeTopics = (done) => {
+ // Create an admin client from the producer, which must be connected.
+ // Thus, this is called from the producer's 'ready' event.
+ const admin = Kafka.AdminClient.createFrom(producer);
+
+ // The admin client can be used until the producer is connected.
+ admin.listTopics((err, topics) => {
+ if (err) {
+ console.error(err);
+ return;
+ }
+ console.log("Topics: ", topics);
+
+ // A common use case for the dependent admin client is to make sure the topic
+ // is cached before producing to it. This avoids delay in sending the first
+ // message to any topic. Using the admin client linked to the producer allows
+ // us to do this, by calling `describeTopics` before we produce.
+ // Here, we cache all possible topics, but it's advisable to only cache the
+ // topics you are going to produce to (if you know it in advance),
+ // and avoid calling listTopics().
+ // Once a topic is cached, it will stay cached for `metadata.max.age.ms`,
+ // which is 15 minutes by default, after which it will be removed if
+ // it has not been produced to.
+ admin.describeTopics(topics, null, (err, topicDescriptions) => {
+ if (err) {
+ console.error(err);
+ return;
+ }
+ console.log("Topic descriptions fetched successfully");
+ admin.disconnect();
+ done();
+ });
+ });
+ };
+
+ producer.connect();
+
+ producer.on('ready', () => {
+ console.log("Producer is ready");
+ producer.setPollInterval(100);
+
+ // After the producer is ready, it can be used to create an admin client.
+ createAdminAndListAndDescribeTopics(() => {
+ // The producer can also be used normally to produce messages.
+ producer.produce('test-topic', null, Buffer.from('Hello World!'), null, Date.now());
+ });
+
+ });
+
+ producer.on('event.error', (err) => {
+ console.error(err);
+ producer.disconnect(callback);
+ });
+
+ producer.on('delivery-report', (err, report) => {
+ console.log("Delivery report received:", report);
+ producer.disconnect(callback);
+ });
+}
+
+function adminFromConsumer() {
+ const consumer = new Kafka.KafkaConsumer({
+ 'bootstrap.servers': bootstrapServers,
+ 'group.id': 'test-group',
+ 'auto.offset.reset': 'earliest',
+ });
+
+ const createAdminAndListTopics = () => {
+ // Create an admin client from the consumer, which must be connected.
+ // Thus, this is called from the consumer's 'ready' event.
+ const admin = Kafka.AdminClient.createFrom(consumer);
+
+ // The admin client can be used until the consumer is connected.
+ admin.listTopics((err, topics) => {
+ if (err) {
+ console.error(err);
+ return;
+ }
+ console.log("Topics: ", topics);
+ admin.disconnect();
+ });
+ };
+
+ consumer.connect();
+
+ consumer.on('ready', () => {
+ console.log("Consumer is ready");
+
+ // After the consumer is ready, it can be used to create an admin client.
+ createAdminAndListTopics();
+
+ // It can also be used normally to consume messages.
+ consumer.subscribe(['test-topic']);
+ consumer.consume();
+ });
+
+ consumer.on('data', (data) => {
+ // Quit after receiving a message.
+ console.log("Consumer:data", data);
+ consumer.disconnect();
+ });
+
+ consumer.on('event.error', (err) => {
+ console.error("Consumer:error", err);
+ consumer.disconnect();
+ });
+}
+
+adminFromProducer(() => adminFromConsumer());
diff --git a/lib/admin.js b/lib/admin.js
index a180add7..24e38f21 100644
--- a/lib/admin.js
+++ b/lib/admin.js
@@ -40,6 +40,7 @@ const AclOperationTypes = Object.seal({
module.exports = {
create: createAdminClient,
+ createFrom: createAdminClientFrom,
ConsumerGroupStates,
AclOperationTypes,
};
@@ -78,6 +79,33 @@ function createAdminClient(conf, eventHandlers) {
return client;
}
+/**
+ * Create a new AdminClient from an existing producer or consumer.
+ *
+ * This is a factory method because it immediately starts an
+ * active handle with the brokers.
+ *
+ * The producer or consumer being used must be connected.
+ * The client can only be used while the producer or consumer is connected.
+ * Logging and other events from this client will be emitted on the producer or consumer.
+ * @param {import('../types/rdkafka').Producer | import('../types/rdkafka').KafkaConsumer} existingClient a producer or consumer to create the admin client from
+ * @param {object} eventHandlers optional key value pairs of event handlers to attach to the client
+ */
+function createAdminClientFrom(existingClient, eventHandlers) {
+ var client = new AdminClient(null, existingClient);
+
+ if (eventHandlers && typeof eventHandlers === 'object') {
+ for (const key in eventHandlers) {
+ client.on(key, eventHandlers[key]);
+ }
+ }
+
+ LibrdKafkaError.wrap(client.connect(), true);
+
+ // Return the client if we succeeded
+ return client;
+}
+
/**
* AdminClient class for administering Kafka
*
@@ -86,7 +114,8 @@ function createAdminClient(conf, eventHandlers) {
* should be made using the factory method.
*
*
- * var client = AdminClient.create({ ... });
+ * var client = AdminClient.create({ ... }); // From configuration
+ * var client = AdminClient.createFrom(existingClient); // From existing producer or consumer
*
*
* Once you instantiate this object, it will have a handle to the kafka broker.
@@ -96,31 +125,27 @@ function createAdminClient(conf, eventHandlers) {
*
* @param {object} conf - Key value pairs to configure the admin client
* topic configuration
+ * @param {import('../types/rdkafka').Producer | import('../types/rdkafka').KafkaConsumer | null} existingClient
* @constructor
*/
-function AdminClient(conf) {
+function AdminClient(conf, existingClient) {
if (!(this instanceof AdminClient)) {
return new AdminClient(conf);
}
- conf = shallowCopy(conf);
+ if (conf) {
+ conf = shallowCopy(conf);
+ }
- /**
- * NewTopic model.
- *
- * This is the representation of a new message that is requested to be made
- * using the Admin client.
- *
- * @typedef {object} AdminClient~NewTopic
- * @property {string} topic - the topic name to create
- * @property {number} num_partitions - the number of partitions to give the topic
- * @property {number} replication_factor - the replication factor of the topic
- * @property {object} config - a list of key values to be passed as configuration
- * for the topic.
- */
+ Client.call(this, conf, Kafka.AdminClient, null, existingClient);
- Client.call(this, conf, Kafka.AdminClient);
- this._isConnected = false;
+ if (existingClient) {
+ this._isConnected = true;
+ this._hasUnderlyingClient = true;
+ } else {
+ this._isConnected = false;
+ this._hasUnderlyingClient = false;
+ }
this.globalConfig = conf;
}
@@ -133,8 +158,12 @@ function AdminClient(conf) {
* Unlike the other connect methods, this one is synchronous.
*/
AdminClient.prototype.connect = function () {
- this._client.configureCallbacks(true, this._cb_configs);
- LibrdKafkaError.wrap(this._client.connect(), true);
+ if (!this._hasUnderlyingClient) {
+ this._client.configureCallbacks(true, this._cb_configs);
+ LibrdKafkaError.wrap(this._client.connect(), true);
+ }
+ // While this could be a no-op for an existing client, we still emit the event
+ // to have a consistent API.
this._isConnected = true;
this.emit('ready', { name: this._client.name() });
};
@@ -146,11 +175,16 @@ AdminClient.prototype.connect = function () {
* some memory and shut some threads down
*/
AdminClient.prototype.disconnect = function () {
+ if (this._hasUnderlyingClient) {
+ // no-op if we're from an existing client, we're just reusing the handle.
+ return;
+ }
+
LibrdKafkaError.wrap(this._client.disconnect(), true);
- this._isConnected = false;
// The AdminClient doesn't provide a callback. So we can't
// wait for completion.
this._client.configureCallbacks(false, this._cb_configs);
+ this._isConnected = false;
};
/**
diff --git a/lib/client.js b/lib/client.js
index 84f976cf..2ce92c07 100644
--- a/lib/client.js
+++ b/lib/client.js
@@ -31,12 +31,43 @@ util.inherits(Client, Emitter);
* @param {function} SubClientType - The function representing the subclient
* type. In C++ land this needs to be a class that inherits from Connection.
* @param {object} topicConf - Topic configuration in key value pairs
+ * @param {object} existingClient - a producer or a consumer to derive this client from.
+ * only used by the AdminClient. Must be connected.
* @constructor
* @extends Emitter
*/
-function Client(globalConf, SubClientType, topicConf) {
+function Client(globalConf, SubClientType, topicConf, existingClient) {
if (!(this instanceof Client)) {
- return new Client(globalConf, SubClientType, topicConf);
+ return new Client(globalConf, SubClientType, topicConf, existingClient);
+ }
+
+ // Throw an error early - this allows us to set confs to {} and avoid all
+ // the null-checking in case of existingClient being set.
+ if (!existingClient && !globalConf) {
+ throw new Error('Global configuration data must be specified');
+ }
+
+ if (existingClient && globalConf) {
+ throw new Error('Global configuration data must not be specified when creating a client from an existing client');
+ }
+
+ if (existingClient && topicConf) {
+ throw new Error('Topic configuration data must not be specified when creating a client from an existing client');
+ }
+
+ if (existingClient && !(existingClient._client instanceof Kafka.Producer) && !(existingClient._client instanceof Kafka.KafkaConsumer)) {
+ throw new Error('Existing client must be a Producer or Consumer instance');
+ }
+
+ if (existingClient && !existingClient._isConnected) {
+ throw new Error('Existing client must be connected before creating a new client from it');
+ }
+
+ let existingInternalClient;
+ if (existingClient) {
+ globalConf = {};
+ topicConf = {};
+ existingInternalClient = existingClient.getClient();
}
Emitter.call(this);
@@ -58,7 +89,7 @@ function Client(globalConf, SubClientType, topicConf) {
globalConf['client.software.name'] = 'confluent-kafka-javascript';
globalConf['client.software.version'] = `${bindingVersion}-librdkafka-${Kafka.librdkafkaVersion}`;
- this._client = new SubClientType(globalConf, topicConf);
+ this._client = new SubClientType(globalConf, topicConf, existingInternalClient);
// We should not modify the globalConf object. We have cloned it already.
delete globalConf['client.software.name'];
@@ -80,7 +111,7 @@ function Client(globalConf, SubClientType, topicConf) {
event: {},
};
- if (!no_event_cb) {
+ if (!existingClient && !no_event_cb) {
this._cb_configs.event.event_cb = function(eventType, eventData) {
switch (eventType) {
case 'error':
@@ -193,6 +224,13 @@ function Client(globalConf, SubClientType, topicConf) {
* @return {Client} - Returns itself.
*/
Client.prototype.connect = function(metadataOptions, cb) {
+ if (this._hasUnderlyingClient) {
+ // This is a derived client. We don't want to connect it, it's already connected.
+ // No one should be reaching this method in the first place if they use the
+ // API correctly, but it is possible to do so accidentally.
+ throw new Error('Cannot connect an existing client');
+ }
+
var self = this;
var next = function(err, data) {
@@ -346,6 +384,14 @@ Client.prototype.getLastError = function() {
* @return {function} - Callback to call when disconnection is complete.
*/
Client.prototype.disconnect = function(cb) {
+ if (this._hasUnderlyingClient) {
+ // This is a derived client.
+ // We don't want to disconnect it as it's controlled by the underlying client.
+ // No one should be reaching this method in the first place if they use the
+ // API correctly, but it is possible to do so accidentally.
+ throw new Error('Cannot disconnect an existing client');
+ }
+
var self = this;
if (!this._isDisconnecting && this._client) {
diff --git a/lib/kafkajs/_admin.js b/lib/kafkajs/_admin.js
index 78c8b64f..99527c53 100644
--- a/lib/kafkajs/_admin.js
+++ b/lib/kafkajs/_admin.js
@@ -67,6 +67,11 @@ class Admin {
*/
#clientName = undefined;
+ /**
+ * The existing client to use as basis for this admin client, if any is provided.
+ */
+ #existingClient = null;
+
/**
* Convenience function to create the metadata object needed for logging.
*/
@@ -76,10 +81,12 @@ class Admin {
/**
* @constructor
- * @param {import("../../types/kafkajs").AdminConstructorConfig} config
+ * @param {import("../../types/kafkajs").AdminConstructorConfig?} config
+ * @param {import("../../types/kafkajs").Client?} existingClient
*/
- constructor(config) {
+ constructor(config, existingClient) {
this.#userConfig = config;
+ this.#existingClient = existingClient;
}
#config() {
@@ -178,17 +185,27 @@ class Admin {
this.#state = AdminState.CONNECTING;
- const config = this.#config();
-
return new Promise((resolve, reject) => {
try {
/* AdminClient creation is a synchronous operation for node-rdkafka */
this.#connectPromiseFunc = { resolve, reject };
- this.#internalClient = RdKafka.AdminClient.create(config, {
- 'error': this.#errorCb.bind(this),
- 'ready': this.#readyCb.bind(this),
- 'event.log': (msg) => loggerTrampoline(msg, this.#logger),
- });
+ if (!this.#existingClient) {
+ const config = this.#config();
+ this.#internalClient = RdKafka.AdminClient.create(config, {
+ 'error': this.#errorCb.bind(this),
+ 'ready': this.#readyCb.bind(this),
+ 'event.log': (msg) => loggerTrampoline(msg, this.#logger),
+ });
+ } else {
+ const underlyingClient = this.#existingClient._getInternalClient();
+ if (!underlyingClient) {
+ throw new error.KafkaJSError("Underlying client is not connected.", { code: error.ErrorCodes.ERR__STATE });
+ }
+ this.#logger = this.#existingClient.logger();
+ this.#internalClient = RdKafka.AdminClient.createFrom(underlyingClient, {
+ 'ready': this.#readyCb.bind(this),
+ });
+ }
this.#clientName = this.#internalClient.name;
this.#logger.info("Admin client connected", this.#createAdminBindingMessageMetadata());
diff --git a/lib/kafkajs/_consumer.js b/lib/kafkajs/_consumer.js
index b772a104..eb7c678c 100644
--- a/lib/kafkajs/_consumer.js
+++ b/lib/kafkajs/_consumer.js
@@ -1,4 +1,5 @@
const LibrdKafkaError = require('../error');
+const { Admin } = require('./_admin');
const error = require('./_error');
const RdKafka = require('../rdkafka');
const {
@@ -242,6 +243,26 @@ class Consumer {
this.#userConfig = kJSConfig;
}
+ /**
+ * @returns {import("../rdkafka").Consumer | null} the internal node-rdkafka client.
+ * @note only for internal use and subject to API changes.
+ */
+ _getInternalClient() {
+ return this.#internalClient;
+ }
+
+ /**
+ * Create a new admin client using the underlying connections of the consumer.
+ *
+ * The consumer must be connected before connecting the resulting admin client.
+ * The usage of the admin client is limited to the lifetime of the consumer.
+ * The consumer's logger is shared with the admin client.
+ * @returns {import("../../types/kafkajs").Admin}
+ */
+ dependentAdmin() {
+ return new Admin(null, this);
+ }
+
#config() {
if (!this.#internalConfig)
this.#internalConfig = this.#finalizedConfig();
@@ -985,13 +1006,6 @@ class Consumer {
return ret;
}
- /**
- * @returns {import("../rdkafka").Consumer} the internal node-rdkafka client.
- */
- _getInternalConsumer() {
- return this.#internalClient;
- }
-
/**
* Set up the client and connect to the bootstrap brokers.
* @returns {Promise} a promise that resolves when the consumer is connected.
diff --git a/lib/kafkajs/_producer.js b/lib/kafkajs/_producer.js
index f2542f39..89d7eeed 100644
--- a/lib/kafkajs/_producer.js
+++ b/lib/kafkajs/_producer.js
@@ -1,4 +1,5 @@
const RdKafka = require('../rdkafka');
+const { Admin } = require('./_admin');
const { kafkaJSToRdKafkaConfig,
topicPartitionOffsetToRdKafka,
createKafkaJsErrorFromLibRdKafkaError,
@@ -83,6 +84,26 @@ class Producer {
this.#userConfig = kJSConfig;
}
+ /**
+ * @returns {import("../rdkafka").Producer | null} the internal node-rdkafka client.
+ * @note only for internal use and subject to API changes.
+ */
+ _getInternalClient() {
+ return this.#internalClient;
+ }
+
+ /**
+ * Create a new admin client using the underlying connections of the producer.
+ *
+ * The producer must be connected before connecting the resulting admin client.
+ * The usage of the admin client is limited to the lifetime of the producer.
+ * The producer's logger is shared with the admin client.
+ * @returns {import("../../types/kafkajs").Admin}
+ */
+ dependentAdmin() {
+ return new Admin(null, this);
+ }
+
/**
* The client name used by the producer for logging - determined by librdkafka
* using a combination of clientId and an integer.
diff --git a/src/admin.cc b/src/admin.cc
index 082d8b1a..51eadbb2 100644
--- a/src/admin.cc
+++ b/src/admin.cc
@@ -23,17 +23,16 @@ namespace NodeKafka {
/**
* @brief AdminClient v8 wrapped object.
*
- * Specializes the connection to wrap a consumer object through compositional
+ * Specializes the connection to wrap a producer object through compositional
* inheritence. Establishes its prototype in node through `Init`
*
* @sa RdKafka::Handle
* @sa NodeKafka::Client
*/
-AdminClient::AdminClient(Conf* gconfig):
- Connection(gconfig, NULL) {
- rkqu = NULL;
-}
+AdminClient::AdminClient(Conf *gconfig) : Connection(gconfig, NULL) {}
+
+AdminClient::AdminClient(Connection *connection) : Connection(connection) {}
AdminClient::~AdminClient() {
Disconnect();
@@ -44,6 +43,14 @@ Baton AdminClient::Connect() {
return Baton(RdKafka::ERR_NO_ERROR);
}
+ /* We should never fail the IsConnected check when we have an underlying
+ * client, as it should always be connected. */
+ if (m_has_underlying) {
+ return Baton(RdKafka::ERR__STATE,
+ "Existing client is not connected, and dependent client "
+ "cannot initiate connection.");
+ }
+
Baton baton = setupSaslOAuthBearerConfig();
if (baton.err() != RdKafka::ERR_NO_ERROR) {
return baton;
@@ -68,10 +75,6 @@ Baton AdminClient::Connect() {
/* Set the client name at the first possible opportunity for logging. */
m_event_cb.dispatcher.SetClientName(m_client->name());
- if (rkqu == NULL) {
- rkqu = rd_kafka_queue_new(m_client->c_ptr());
- }
-
baton = setupSaslOAuthBearerBackgroundQueue();
if (baton.err() != RdKafka::ERR_NO_ERROR) {
DeactivateDispatchers();
@@ -81,14 +84,16 @@ Baton AdminClient::Connect() {
}
Baton AdminClient::Disconnect() {
+ /* Dependent AdminClients don't need to do anything. We block the call to
+ * disconnect in JavaScript, but the destructor of AdminClient might trigger
+ * this call. */
+ if (m_has_underlying) {
+ return Baton(RdKafka::ERR_NO_ERROR);
+ }
+
if (IsConnected()) {
scoped_shared_write_lock lock(m_connection_lock);
- if (rkqu != NULL) {
- rd_kafka_queue_destroy(rkqu);
- rkqu = NULL;
- }
-
DeactivateDispatchers();
delete m_client;
@@ -145,25 +150,39 @@ void AdminClient::New(const Nan::FunctionCallbackInfo& info) {
}
if (info.Length() < 1) {
- return Nan::ThrowError("You must supply a global configuration");
+ return Nan::ThrowError("You must supply a global configuration or a preexisting client"); // NOLINT
}
- if (!info[0]->IsObject()) {
- return Nan::ThrowError("Global configuration data must be specified");
- }
+ Connection *connection = NULL;
+ Conf *gconfig = NULL;
+ AdminClient *client = NULL;
- std::string errstr;
+ if (info.Length() >= 3 && !info[2]->IsNull() && !info[2]->IsUndefined()) {
+ if (!info[2]->IsObject()) {
+ return Nan::ThrowError("Third argument, if provided, must be a client object"); // NOLINT
+ }
+ // We check whether this is a wrapped object within the calling JavaScript
+ // code, so it's safe to unwrap it here. We Unwrap it directly into a
+ // Connection object, since it's OK to unwrap into the parent class.
+ connection = ObjectWrap::Unwrap(
+ info[2]->ToObject(Nan::GetCurrentContext()).ToLocalChecked());
+ client = new AdminClient(connection);
+ } else {
+ if (!info[0]->IsObject()) {
+ return Nan::ThrowError("Global configuration data must be specified");
+ }
- Conf* gconfig =
- Conf::create(RdKafka::Conf::CONF_GLOBAL,
- (info[0]->ToObject(Nan::GetCurrentContext())).ToLocalChecked(), errstr);
+ std::string errstr;
+ gconfig = Conf::create(
+ RdKafka::Conf::CONF_GLOBAL,
+ (info[0]->ToObject(Nan::GetCurrentContext())).ToLocalChecked(), errstr);
- if (!gconfig) {
- return Nan::ThrowError(errstr.c_str());
+ if (!gconfig) {
+ return Nan::ThrowError(errstr.c_str());
+ }
+ client = new AdminClient(gconfig);
}
- AdminClient* client = new AdminClient(gconfig);
-
// Wrap it
client->Wrap(info.This());
diff --git a/src/admin.h b/src/admin.h
index 4a498067..12103cdc 100644
--- a/src/admin.h
+++ b/src/admin.h
@@ -77,9 +77,10 @@ class AdminClient : public Connection {
static void New(const Nan::FunctionCallbackInfo& info);
explicit AdminClient(Conf* globalConfig);
+ explicit AdminClient(Connection* existingConnection);
~AdminClient();
- rd_kafka_queue_t* rkqu;
+ bool is_derived = false;
private:
// Node methods
diff --git a/src/connection.cc b/src/connection.cc
index 833c34f2..189c10f1 100644
--- a/src/connection.cc
+++ b/src/connection.cc
@@ -57,9 +57,32 @@ Connection::Connection(Conf* gconfig, Conf* tconfig):
m_gconfig->set("event_cb", &m_event_cb, errstr);
}
+/* Use an existing Connection object as the underlying for this object.
+ * At this point, the underlying connection is assumed to be connected with
+ * the m_client set. */
+Connection::Connection(Connection *existing):
+ m_event_cb() {
+ m_client = existing->m_client;
+
+ m_gconfig = existing->m_gconfig;
+ m_tconfig = existing->m_tconfig;
+
+ m_is_closing = false;
+ m_has_underlying = true;
+
+ // We must share the same connection lock as the existing connection to
+ // avoid getting disconnected while the existing connection is still in use.
+ m_connection_lock = existing->m_connection_lock;
+ }
+
+
Connection::~Connection() {
- uv_rwlock_destroy(&m_connection_lock);
+ // The underlying connection will take care of cleanup.
+ if (m_has_underlying) {
+ return;
+ }
+ uv_rwlock_destroy(&m_connection_lock);
if (m_tconfig) {
delete m_tconfig;
}
@@ -254,6 +277,9 @@ Baton Connection::GetMetadata(
err = RdKafka::ERR__STATE;
}
+ if (topic != NULL)
+ delete topic;
+
if (err == RdKafka::ERR_NO_ERROR) {
return Baton(metadata);
} else {
diff --git a/src/connection.h b/src/connection.h
index c798814b..532468fe 100644
--- a/src/connection.h
+++ b/src/connection.h
@@ -79,6 +79,7 @@ class Connection : public Nan::ObjectWrap {
protected:
Connection(Conf*, Conf*);
+ explicit Connection(Connection *);
~Connection();
static Nan::Persistent constructor;
@@ -88,7 +89,6 @@ class Connection : public Nan::ObjectWrap {
Baton setupSaslOAuthBearerConfig();
Baton setupSaslOAuthBearerBackgroundQueue();
- bool m_has_been_disconnected;
bool m_is_closing;
Conf* m_gconfig;
@@ -96,6 +96,7 @@ class Connection : public Nan::ObjectWrap {
std::string m_errstr;
uv_rwlock_t m_connection_lock;
+ bool m_has_underlying = false;
RdKafka::Handle* m_client;
diff --git a/test/promisified/admin/dependent_client.spec.js b/test/promisified/admin/dependent_client.spec.js
new file mode 100644
index 00000000..85039eeb
--- /dev/null
+++ b/test/promisified/admin/dependent_client.spec.js
@@ -0,0 +1,51 @@
+jest.setTimeout(30000);
+
+const {
+ secureRandom,
+ createProducer,
+ createConsumer,
+} = require('../testhelpers');
+
+describe.each(["producer", "consumer"])('Dependent admin client (%s)', (dependentOn) => {
+ let admin, underlyingClient;
+
+ beforeEach(async () => {
+ if (dependentOn === "producer") {
+ underlyingClient = createProducer({});
+ } else {
+ underlyingClient = createConsumer({ groupId: `test-group-${secureRandom()}` });
+ }
+ admin = underlyingClient.dependentAdmin();
+ });
+
+ afterEach(async () => {
+ admin && (await admin.disconnect());
+ underlyingClient && (await underlyingClient.disconnect());
+ });
+
+ it('should connect and work for connected underlying client', async () => {
+ await underlyingClient.connect();
+ await admin.connect();
+
+ const listTopicsResult = await admin.listTopics();
+ expect(listTopicsResult).toBeInstanceOf(Array);
+ });
+
+ it('should not connect for unconnected underlying client', async () => {
+ await expect(admin.connect()).rejects.toHaveProperty('message', 'Underlying client is not connected.');
+
+ underlyingClient = null; // prevents disconnect call
+ admin = null; // prevents disconnect call
+ });
+
+ it('should not connect for disconnected underlying client', async () => {
+ await underlyingClient.connect();
+ await underlyingClient.disconnect();
+
+ await expect(admin.connect()).rejects.toHaveProperty('message', 'Existing client must be connected before creating a new client from it');
+
+ underlyingClient = null; // prevents disconnect call
+ admin = null; // prevents disconnect call
+ });
+});
+
diff --git a/types/kafkajs.d.ts b/types/kafkajs.d.ts
index 0e7fb44c..4a129926 100644
--- a/types/kafkajs.d.ts
+++ b/types/kafkajs.d.ts
@@ -99,6 +99,7 @@ type Client = {
disconnect(): Promise
logger(): Logger
setSaslCredentialProvider(authInfo: { username: string, password: string }): void
+ dependentAdmin(): Admin
}
export enum CompressionTypes {
@@ -331,7 +332,7 @@ export interface OffsetsByTopicPartition {
topics: TopicOffsets[]
}
-export type FetchOffsetsPartition = PartitionOffset & { metadata: string | null, leaderEpoch: number | null, error?: LibrdKafkaError };
+export type FetchOffsetsPartition = PartitionOffset & { metadata: string | null, leaderEpoch: number | null, error?: LibrdKafkaError };
export type TopicInput = string[] | { topic: string; partitions: number[] }[]
@@ -403,11 +404,11 @@ export type Admin = {
groups: string[],
options?: { timeout?: number, includeAuthorizedOperations?: boolean }): Promise
deleteGroups(groupIds: string[], options?: { timeout?: number }): Promise
- fetchOffsets(options: {
+ fetchOffsets(options: {
groupId: string,
topics?: TopicInput,
timeout?: number,
- requireStableOffsets?: boolean }):
+ requireStableOffsets?: boolean }):
Promise>
deleteTopicRecords(options: {
topic: string; partitions: SeekEntry[];
diff --git a/types/rdkafka.d.ts b/types/rdkafka.d.ts
index 88d5f0eb..165786ea 100644
--- a/types/rdkafka.d.ts
+++ b/types/rdkafka.d.ts
@@ -503,6 +503,7 @@ export type EventHandlers = {
export abstract class AdminClient {
static create(conf: GlobalConfig, eventHandlers?: EventHandlers): IAdminClient;
+ static createFrom(existingClient: Producer | KafkaConsumer, eventHandlers?: EventHandlers): IAdminClient;
}
export type RdKafka = {