Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .semaphore/semaphore.yml
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ blocks:
- export BUILD_LIBRDKAFKA=0
- npm run install-from-source
jobs:
- name: "Performance Test (Classic Protocol)"
- name: "Performance Test"
commands:
- '[[ -z $DOCKERHUB_APIKEY ]] || docker login --username $DOCKERHUB_USER --password $DOCKERHUB_APIKEY'
- docker compose -f test/docker/docker-compose.yml up -d && sleep 30
Expand Down
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@ v1.4.0 is a feature release. It is supported for all usage.
## Enhancements

1. References librdkafka v2.11.0. Refer to the [librdkafka v2.11.0 release notes](https://github.com/confluentinc/librdkafka/releases/tag/v2.11.0) for more information.
2. [KIP-848] `describeGroups()` now supports KIP-848 introduced `consumer` groups. Two new fields for consumer group type and target assignment have also been added. Type defines whether this group is a `classic` or `consumer` group. Target assignment is only valid for the `consumer` protocol and its defaults to being undefined.
2. [KIP-848] `describeGroups()` now supports KIP-848 introduced `consumer` groups. Two new fields for consumer group type and target assignment have also been added. Type defines whether this group is a `classic` or `consumer` group. Target assignment is only valid for the `consumer` protocol and its defaults to being undefined (#329)
3. [KIP-848] Admin API for listing consumer groups now has an optional filter to return only groups of given types (#328)

# confluent-kafka-javascript v1.3.2

Expand Down
18 changes: 15 additions & 3 deletions examples/kafkajs/admin/list-groups.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
// require('kafkajs') is replaced with require('@confluentinc/kafka-javascript').KafkaJS.
const { Kafka, ConsumerGroupStates } = require('@confluentinc/kafka-javascript').KafkaJS;
const { Kafka, ConsumerGroupStates, ConsumerGroupTypes } = require('@confluentinc/kafka-javascript').KafkaJS;
const { parseArgs } = require('node:util');

async function adminStart() {
Expand All @@ -20,13 +20,20 @@ async function adminStart() {
short: 's',
multiple: true,
default: [],
}
},
'types': {
type: 'string',
short: 't',
multiple: true,
default: [],
},
},
});

let {
'bootstrap-servers': bootstrapServers,
states: matchConsumerGroupStates,
types: matchConsumerGroupTypes,
timeout,
} = args.values;

Expand All @@ -36,6 +43,9 @@ async function adminStart() {
matchConsumerGroupStates = matchConsumerGroupStates.map(
state => ConsumerGroupStates[state]);

matchConsumerGroupTypes = matchConsumerGroupTypes.map(
type => ConsumerGroupTypes[type]);

const kafka = new Kafka({
kafkaJS: {
brokers: [bootstrapServers],
Expand All @@ -48,13 +58,15 @@ async function adminStart() {
try {
const groupOverview = await admin.listGroups({
timeout,
matchConsumerGroupStates
matchConsumerGroupStates,
matchConsumerGroupTypes
});
for (const group of groupOverview.groups) {
console.log(`Group id: ${group.groupId}`);
console.log(`\tType: ${group.protocolType}`);
console.log(`\tIs simple: ${group.isSimpleConsumerGroup}`);
console.log(`\tState: ${group.state}`);
console.log(`\tType: ${group.type}`);
}
} catch(err) {
console.log('List topics failed', err);
Expand Down
2 changes: 2 additions & 0 deletions lib/admin.js
Original file line number Diff line number Diff line change
Expand Up @@ -373,6 +373,8 @@ AdminClient.prototype.createPartitions = function (topic, totalPartitions, timeo
* May be unset (default: 5000).
* @param {Array<RdKafka.ConsumerGroupStates>?} options.matchConsumerGroupStates -
* A list of consumer group states to match. May be unset, fetches all states (default: unset).
* @param {Array<RdKafka.ConsumerGroupTypes>?} options.matchConsumerGroupTypes -
* A list of consumer group types to match. May be unset, fetches all types (default: unset).
* @param {function} cb - The callback to be executed when finished.
* @example
* // Valid ways to call this function:
Expand Down
4 changes: 3 additions & 1 deletion lib/kafkajs/_admin.js
Original file line number Diff line number Diff line change
Expand Up @@ -387,7 +387,9 @@ class Admin {
* May be unset (default: 5000).
* @param {Array<KafkaJS.ConsumerGroupStates>?} options.matchConsumerGroupStates -
* A list of consumer group states to match. May be unset, fetches all states (default: unset).
* @returns {Promise<{ groups: Array<{groupId: string, protocolType: string, isSimpleConsumerGroup: boolean, state: KafkaJS.ConsumerGroupStates}>, errors: Array<RdKafka.LibrdKafkaError> }>}
* @param {Array<KafkaJS.ConsumerGroupTypes>?} options.matchConsumerGroupTypes -
* A list of consumer group types to match. May be unset, fetches all types (default: unset).
* @returns {Promise<{ groups: Array<{groupId: string, protocolType: string, isSimpleConsumerGroup: boolean, state: KafkaJS.ConsumerGroupStates, type: KafkaJS.ConsumerGroupTypes}>, errors: Array<RdKafka.LibrdKafkaError> }>}
* Resolves with the list of consumer groups, rejects on error.
*/
async listGroups(options = {}) {
Expand Down
32 changes: 30 additions & 2 deletions src/admin.cc
Original file line number Diff line number Diff line change
Expand Up @@ -483,7 +483,9 @@ Baton AdminClient::CreatePartitions(

Baton AdminClient::ListGroups(
bool is_match_states_set,
std::vector<rd_kafka_consumer_group_state_t> &match_states, int timeout_ms,
std::vector<rd_kafka_consumer_group_state_t> &match_states,
bool is_match_types_set,
std::vector<rd_kafka_consumer_group_type_t> &match_types, int timeout_ms,
/* out */ rd_kafka_event_t **event_response) {
if (!IsConnected()) {
return Baton(RdKafka::ERR__STATE);
Expand Down Expand Up @@ -515,6 +517,15 @@ Baton AdminClient::ListGroups(
}
}

if (is_match_types_set) {
rd_kafka_error_t *error =
rd_kafka_AdminOptions_set_match_consumer_group_types(
options, &match_types[0], match_types.size());
if (error) {
return Baton::BatonFromErrorAndDestroy(error);
}
}

// Create queue just for this operation.
rd_kafka_queue_t *rkqu = rd_kafka_queue_new(m_client->c_ptr());

Expand Down Expand Up @@ -1195,9 +1206,26 @@ NAN_METHOD(AdminClient::NodeListGroups) {
}
}

std::vector<rd_kafka_consumer_group_type_t> match_types;
v8::Local<v8::String> match_consumer_group_types_key =
Nan::New("matchConsumerGroupTypes").ToLocalChecked();
bool is_match_types_set =
Nan::Has(config, match_consumer_group_types_key).FromMaybe(false);
v8::Local<v8::Array> match_types_array = Nan::New<v8::Array>();

if (is_match_types_set) {
match_types_array = GetParameter<v8::Local<v8::Array>>(
config, "matchConsumerGroupTypes", match_types_array);
if (match_types_array->Length()) {
match_types = Conversion::Admin::FromV8GroupTypeArray(
match_types_array);
}
}

// Queue the work.
Nan::AsyncQueueWorker(new Workers::AdminClientListGroups(
callback, client, is_match_states_set, match_states, timeout_ms));
callback, client, is_match_states_set, match_states, is_match_types_set,
match_types, timeout_ms));
}

/**
Expand Down
2 changes: 2 additions & 0 deletions src/admin.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ class AdminClient : public Connection {
// Baton DescribeConfig(rd_kafka_NewTopic_t* topic, int timeout_ms);
Baton ListGroups(bool is_match_states_set,
std::vector<rd_kafka_consumer_group_state_t>& match_states,
bool is_match_types_set,
std::vector<rd_kafka_consumer_group_type_t>& match_types,
int timeout_ms,
rd_kafka_event_t** event_response);
Baton DescribeGroups(std::vector<std::string>& groups,
Expand Down
33 changes: 33 additions & 0 deletions src/common.cc
Original file line number Diff line number Diff line change
Expand Up @@ -908,6 +908,35 @@ std::vector<rd_kafka_consumer_group_state_t> FromV8GroupStateArray(
return returnVec;
}

/**
* @brief Converts a v8 array of group types into a vector of
* rd_kafka_consumer_group_type_t.
*/
std::vector<rd_kafka_consumer_group_type_t> FromV8GroupTypeArray(
v8::Local<v8::Array> array) {
v8::Local<v8::Array> parameter = array.As<v8::Array>();
std::vector<rd_kafka_consumer_group_type_t> returnVec;
if (parameter->Length() >= 1) {
for (unsigned int i = 0; i < parameter->Length(); i++) {
v8::Local<v8::Value> v;
if (!Nan::Get(parameter, i).ToLocal(&v)) {
continue;
}
Nan::Maybe<int64_t> maybeT = Nan::To<int64_t>(v);
if (maybeT.IsNothing()) {
continue;
}
int64_t type_number = maybeT.FromJust();
if (type_number < 0 || type_number >= RD_KAFKA_CONSUMER_GROUP_TYPE__CNT) {
continue;
}
returnVec.push_back(
static_cast<rd_kafka_consumer_group_type_t>(type_number));
}
}
return returnVec;
}

/**
* @brief Converts a rd_kafka_ListConsumerGroups_result_t* into a v8 object.
*/
Expand All @@ -920,6 +949,7 @@ v8::Local<v8::Object> FromListConsumerGroupsResult(
protocolType: string,
isSimpleConsumerGroup: boolean,
state: ConsumerGroupState (internally a number)
type: ConsumerGroupType (internally a number)
}[],
errors: LibrdKafkaError[]
}
Expand Down Expand Up @@ -957,6 +987,9 @@ v8::Local<v8::Object> FromListConsumerGroupsResult(
Nan::Set(groupObject, Nan::New("state").ToLocalChecked(),
Nan::New<v8::Number>(rd_kafka_ConsumerGroupListing_state(group)));

Nan::Set(groupObject, Nan::New("type").ToLocalChecked(),
Nan::New<v8::Number>(rd_kafka_ConsumerGroupListing_type(group)));

Nan::Set(groups, i, groupObject);
}

Expand Down
4 changes: 4 additions & 0 deletions src/common.h
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,10 @@ rd_kafka_NewTopic_t **FromV8TopicObjectArray(v8::Local<v8::Array>);
std::vector<rd_kafka_consumer_group_state_t> FromV8GroupStateArray(
v8::Local<v8::Array>);

// ListGroups: request
std::vector<rd_kafka_consumer_group_type_t> FromV8GroupTypeArray(
v8::Local<v8::Array> array);

// ListGroups: response
v8::Local<v8::Object> FromListConsumerGroupsResult(
const rd_kafka_ListConsumerGroups_result_t *);
Expand Down
5 changes: 5 additions & 0 deletions src/workers.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1317,11 +1317,15 @@ void AdminClientCreatePartitions::HandleErrorCallback() {
AdminClientListGroups::AdminClientListGroups(
Nan::Callback* callback, AdminClient* client, bool is_match_states_set,
std::vector<rd_kafka_consumer_group_state_t>& match_states,
bool is_match_types_set,
std::vector<rd_kafka_consumer_group_type_t>& match_types,
const int& timeout_ms)
: ErrorAwareWorker(callback),
m_client(client),
m_is_match_states_set(is_match_states_set),
m_match_states(match_states),
m_is_match_types_set(is_match_types_set),
m_match_types(match_types),
m_timeout_ms(timeout_ms) {}

AdminClientListGroups::~AdminClientListGroups() {
Expand All @@ -1332,6 +1336,7 @@ AdminClientListGroups::~AdminClientListGroups() {

void AdminClientListGroups::Execute() {
Baton b = m_client->ListGroups(m_is_match_states_set, m_match_states,
m_is_match_types_set, m_match_types,
m_timeout_ms, &m_event_response);
if (b.err() != RdKafka::ERR_NO_ERROR) {
SetErrorBaton(b);
Expand Down
4 changes: 4 additions & 0 deletions src/workers.h
Original file line number Diff line number Diff line change
Expand Up @@ -532,6 +532,8 @@ class AdminClientListGroups : public ErrorAwareWorker {
public:
AdminClientListGroups(Nan::Callback *, NodeKafka::AdminClient *, bool,
std::vector<rd_kafka_consumer_group_state_t> &,
bool,
std::vector<rd_kafka_consumer_group_type_t> &,
const int &);
~AdminClientListGroups();

Expand All @@ -543,6 +545,8 @@ class AdminClientListGroups : public ErrorAwareWorker {
NodeKafka::AdminClient *m_client;
const bool m_is_match_states_set;
std::vector<rd_kafka_consumer_group_state_t> m_match_states;
const bool m_is_match_types_set;
std::vector<rd_kafka_consumer_group_type_t> m_match_types;
const int m_timeout_ms;
rd_kafka_event_t *m_event_response;
};
Expand Down
17 changes: 16 additions & 1 deletion test/promisified/admin/list_groups.spec.js
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
jest.setTimeout(30000);

const {
testConsumerGroupProtocolClassic,
createConsumer,
secureRandom,
createTopic,
waitFor,
createAdmin,
} = require('../testhelpers');
const { ConsumerGroupStates, ErrorCodes } = require('../../../lib').KafkaJS;
const { ConsumerGroupStates, ConsumerGroupTypes, ErrorCodes } = require('../../../lib').KafkaJS;

describe('Admin > listGroups', () => {
let topicName, groupId, consumer, admin;
Expand All @@ -19,6 +20,7 @@ describe('Admin > listGroups', () => {
consumer = createConsumer({
groupId,
fromBeginning: true,
autoCommit: true,
});

await createTopic({ topic: topicName, partitions: 2 });
Expand Down Expand Up @@ -46,10 +48,12 @@ describe('Admin > listGroups', () => {
await consumer.run({ eachMessage: async () => {} });

await waitFor(() => consumer.assignment().length > 0, () => null, 1000);
const groupType = testConsumerGroupProtocolClassic() ? ConsumerGroupTypes.CLASSIC : ConsumerGroupTypes.CONSUMER;

await admin.connect();
let listGroupsResult = await admin.listGroups({
matchConsumerGroupStates: undefined,
matchConsumerGroupTypes: undefined,
});
expect(listGroupsResult.errors).toEqual([]);
expect(listGroupsResult.groups).toEqual(
Expand All @@ -59,10 +63,20 @@ describe('Admin > listGroups', () => {
isSimpleConsumerGroup: false,
protocolType: 'consumer',
state: ConsumerGroupStates.STABLE,
type: groupType,
}),
])
);

// Consumer group should not show up when filtering for opposite group type.
let oppositeGroupType = testConsumerGroupProtocolClassic() ? ConsumerGroupTypes.CONSUMER : ConsumerGroupTypes.CLASSIC;
listGroupsResult = await admin.listGroups({
matchConsumerGroupTypes: [ oppositeGroupType ],
});
expect(listGroupsResult.errors).toEqual([]);
expect(listGroupsResult.groups.map(group => group.groupId)).not.toContain(groupId);


// Disconnect the consumer to make the group EMPTY.
await consumer.disconnect();
consumer = null;
Expand All @@ -76,6 +90,7 @@ describe('Admin > listGroups', () => {
isSimpleConsumerGroup: false,
protocolType: 'consumer',
state: ConsumerGroupStates.EMPTY,
type: groupType,
}),
])
);
Expand Down
7 changes: 5 additions & 2 deletions types/kafkajs.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@ import {
Node,
AclOperationTypes,
Uuid,
IsolationLevel
IsolationLevel,
ConsumerGroupTypes
} from './rdkafka'

import {
Expand All @@ -34,6 +35,7 @@ export {
AclOperationTypes,
Uuid,
IsolationLevel,
ConsumerGroupTypes
} from './rdkafka'

export interface OauthbearerProviderResponse {
Expand Down Expand Up @@ -413,7 +415,8 @@ export type Admin = {
listTopics(options?: { timeout?: number }): Promise<string[]>
listGroups(options?: {
timeout?: number,
matchConsumerGroupStates?: ConsumerGroupStates[]
matchConsumerGroupStates?: ConsumerGroupStates[],
matchConsumerGroupTypes?: ConsumerGroupTypes[]
}): Promise<{ groups: GroupOverview[], errors: LibrdKafkaError[] }>
describeGroups(
groups: string[],
Expand Down
3 changes: 2 additions & 1 deletion types/rdkafka.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -363,6 +363,7 @@ export interface GroupOverview {
protocolType: string;
isSimpleConsumerGroup: boolean;
state: ConsumerGroupStates;
type: ConsumerGroupTypes;
}

export enum AclOperationTypes {
Expand Down Expand Up @@ -504,7 +505,7 @@ export interface IAdminClient {
listTopics(options?: { timeout?: number }, cb?: (err: LibrdKafkaError, topics: string[]) => any): void;

listGroups(cb?: (err: LibrdKafkaError, result: { groups: GroupOverview[], errors: LibrdKafkaError[] }) => any): void;
listGroups(options?: { timeout?: number, matchConsumerGroupStates?: ConsumerGroupStates[] },
listGroups(options?: { timeout?: number, matchConsumerGroupStates?: ConsumerGroupStates[], matchConsumerGroupTypes?: ConsumerGroupTypes[] },
cb?: (err: LibrdKafkaError, result: { groups: GroupOverview[], errors: LibrdKafkaError[] }) => any): void;

describeGroups(groupIds: string[], cb?: (err: LibrdKafkaError, result: GroupDescriptions) => any): void;
Expand Down