Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ v0.5.0 is a limited availability feature release. It is supported for all usage.
1. Add support for an Admin API to delete records.(#141).
2. Fixes an issue with unresolved raced Promises leaking in the consumer (#151).
3. Add support for an Admin API to describe topics.(#155).
4. Add support for dependent Admin client (#153).


# confluent-kafka-javascript v0.4.0
Expand Down
83 changes: 83 additions & 0 deletions e2e/admin-dependent.spec.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
/*
* confluent-kafka-javascript - Node.js wrapper for RdKafka C/C++ library
*
* Copyright (c) 2024 Confluent, Inc.
*
* This software may be modified and distributed under the terms
* of the MIT license. See the LICENSE.txt file for details.
*/

var Kafka = require('../');
var t = require('assert');

var kafkaBrokerList = process.env.KAFKA_HOST || 'localhost:9092';
var time = Date.now();

describe('Dependent Admin', function () {
describe('from Producer', function () {
let producer;

this.beforeEach(function (done) {
producer = new Kafka.Producer({
'metadata.broker.list': kafkaBrokerList,
});
done();
});

it('should be created and useable from connected producer', function (done) {
producer.on('ready', function () {
let admin = Kafka.AdminClient.createFrom(producer);
admin.listTopics(null, function (err, res) {
t.ifError(err);
t.ok(res);
producer.disconnect(done);
admin = null;
});
t.ok(admin);
});
producer.connect();
});

it('should fail to be created from unconnected producer', function (done) {
t.throws(function () {
Kafka.AdminClient.createFrom(producer);
}, /Existing client must be connected before creating a new client from it/);
done();
});

});

describe('from Consumer', function () {
let consumer;

this.beforeEach(function (done) {
consumer = new Kafka.KafkaConsumer({
'metadata.broker.list': kafkaBrokerList,
'group.id': 'kafka-mocha-grp-' + time,
});
done();
});

it('should be created and useable from connected consumer', function (done) {
consumer.on('ready', function () {
let admin = Kafka.AdminClient.createFrom(consumer);
admin.listTopics(null, function (err, res) {
t.ifError(err);
t.ok(res);
consumer.disconnect(done);
admin = null;
});
t.ok(admin);
});
consumer.connect();
});

it('should fail to be created from unconnected consumer', function (done) {
t.throws(function () {
Kafka.AdminClient.createFrom(consumer);
}, /Existing client must be connected before creating a new client from it/);
done();
});

});
});
94 changes: 94 additions & 0 deletions examples/kafkajs/admin/dependent-admin.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
// require('kafkajs') is replaced with require('@confluentinc/kafka-javascript').KafkaJS.
const { Kafka } = require('@confluentinc/kafka-javascript').KafkaJS;

async function adminFromConsumer() {
const kafka = new Kafka({
kafkaJS: {
brokers: ['localhost:9092'],
}
});

const consumer = kafka.consumer({
kafkaJS: {
groupId: 'test-group',
fromBeginning: true,
}
});

await consumer.connect();

// The consumer can be used as normal
await consumer.subscribe({ topic: 'test-topic' });
consumer.run({
eachMessage: async ({ topic, partition, message }) => {
console.log({
topic,
partition,
offset: message.offset,
key: message.key?.toString(),
value: message.value.toString(),
});
},
});

// And the same consumer can create an admin client - the consumer must have successfully
// been connected before the admin client can be created.
const admin = consumer.dependentAdmin();
await admin.connect();

// The admin client can be used until the consumer is connected.
const listTopicsResult = await admin.listTopics();
console.log(listTopicsResult);

await new Promise(resolve => setTimeout(resolve, 10000));

// Disconnect the consumer and admin clients in the correct order.
await admin.disconnect();
await consumer.disconnect();
}

async function adminFromProducer() {
const kafka = new Kafka({
kafkaJS: {
brokers: ['localhost:9092'],
}
});

const producer = kafka.producer({
'metadata.max.age.ms': 900000, /* This is set to the default value. */
});

await producer.connect();

// And the same producer can create an admin client - the producer must have successfully
// been connected before the admin client can be created.
const admin = producer.dependentAdmin();
await admin.connect();

// The admin client can be used until the producer is connected.
const listTopicsResult = await admin.listTopics();
console.log(listTopicsResult);

// A common use case for the dependent admin client is to make sure the topic
// is cached before producing to it. This avoids delay in sending the first
// message to any topic. Using the admin client linked to the producer allows
// us to do this, by calling `fetchTopicMetadata` before we produce.
// Here, we cache all possible topics, but it's advisable to only cache the
// topics you are going to produce to (if you know it in advance),
// and avoid calling listTopics().
// Once a topic is cached, it will stay cached for `metadata.max.age.ms`,
// which is 15 minutes by default, after which it will be removed if
// it has not been produced to.
await admin.fetchTopicMetadata({ topics: listTopicsResult }).catch(e => {
console.error('Error caching topics: ', e);
})

// The producer can be used as usual.
await producer.send({ topic: 'test-topic', messages: [{ value: 'Hello!' }] });

// Disconnect the producer and admin clients in the correct order.
await admin.disconnect();
await producer.disconnect();
}

adminFromProducer().then(() => adminFromConsumer()).catch(console.error);
120 changes: 120 additions & 0 deletions examples/node-rdkafka/dependent-admin.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
const Kafka = require('@confluentinc/kafka-javascript');
const admin = require('../../lib/admin');

const bootstrapServers = 'localhost:9092';

function adminFromProducer(callback) {
const producer = new Kafka.Producer({
'bootstrap.servers': bootstrapServers,
'dr_msg_cb': true,
});

const createAdminAndListAndDescribeTopics = (done) => {
// Create an admin client from the producer, which must be connected.
// Thus, this is called from the producer's 'ready' event.
const admin = Kafka.AdminClient.createFrom(producer);

// The admin client can be used until the producer is connected.
admin.listTopics((err, topics) => {
if (err) {
console.error(err);
return;
}
console.log("Topics: ", topics);

// A common use case for the dependent admin client is to make sure the topic
// is cached before producing to it. This avoids delay in sending the first
// message to any topic. Using the admin client linked to the producer allows
// us to do this, by calling `describeTopics` before we produce.
// Here, we cache all possible topics, but it's advisable to only cache the
// topics you are going to produce to (if you know it in advance),
// and avoid calling listTopics().
// Once a topic is cached, it will stay cached for `metadata.max.age.ms`,
// which is 15 minutes by default, after which it will be removed if
// it has not been produced to.
admin.describeTopics(topics, null, (err, topicDescriptions) => {
if (err) {
console.error(err);
return;
}
console.log("Topic descriptions fetched successfully");
admin.disconnect();
done();
});
});
};

producer.connect();

producer.on('ready', () => {
console.log("Producer is ready");
producer.setPollInterval(100);

// After the producer is ready, it can be used to create an admin client.
createAdminAndListAndDescribeTopics(() => {
// The producer can also be used normally to produce messages.
producer.produce('test-topic', null, Buffer.from('Hello World!'), null, Date.now());
});

});

producer.on('event.error', (err) => {
console.error(err);
producer.disconnect(callback);
});

producer.on('delivery-report', (err, report) => {
console.log("Delivery report received:", report);
producer.disconnect(callback);
});
}

function adminFromConsumer() {
const consumer = new Kafka.KafkaConsumer({
'bootstrap.servers': bootstrapServers,
'group.id': 'test-group',
'auto.offset.reset': 'earliest',
});

const createAdminAndListTopics = () => {
// Create an admin client from the consumer, which must be connected.
// Thus, this is called from the consumer's 'ready' event.
const admin = Kafka.AdminClient.createFrom(consumer);

// The admin client can be used until the consumer is connected.
admin.listTopics((err, topics) => {
if (err) {
console.error(err);
return;
}
console.log("Topics: ", topics);
admin.disconnect();
});
};

consumer.connect();

consumer.on('ready', () => {
console.log("Consumer is ready");

// After the consumer is ready, it can be used to create an admin client.
createAdminAndListTopics();

// It can also be used normally to consume messages.
consumer.subscribe(['test-topic']);
consumer.consume();
});

consumer.on('data', (data) => {
// Quit after receiving a message.
console.log("Consumer:data", data);
consumer.disconnect();
});

consumer.on('event.error', (err) => {
console.error("Consumer:error", err);
consumer.disconnect();
});
}

adminFromProducer(() => adminFromConsumer());
Loading