Skip to content

Commit 9599e30

Browse files
committed
Implemented KIP 848 changes
1 parent 7c59a2f commit 9599e30

File tree

12 files changed

+105
-8
lines changed

12 files changed

+105
-8
lines changed

examples/kafkajs/admin/list-groups.js

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,13 +20,20 @@ async function adminStart() {
2020
short: 's',
2121
multiple: true,
2222
default: [],
23-
}
23+
},
24+
'types': {
25+
type: 'string',
26+
short: 't',
27+
multiple: true,
28+
default: [],
29+
},
2430
},
2531
});
2632

2733
let {
2834
'bootstrap-servers': bootstrapServers,
2935
states: matchConsumerGroupStates,
36+
types: matchConsumerGroupTypes,
3037
timeout,
3138
} = args.values;
3239

@@ -36,6 +43,9 @@ async function adminStart() {
3643
matchConsumerGroupStates = matchConsumerGroupStates.map(
3744
state => ConsumerGroupStates[state]);
3845

46+
matchConsumerGroupTypes = matchConsumerGroupTypes.map(
47+
type => ConsumerGroupTypes[type]);
48+
3949
const kafka = new Kafka({
4050
kafkaJS: {
4151
brokers: [bootstrapServers],
@@ -55,6 +65,7 @@ async function adminStart() {
5565
console.log(`\tType: ${group.protocolType}`);
5666
console.log(`\tIs simple: ${group.isSimpleConsumerGroup}`);
5767
console.log(`\tState: ${group.state}`);
68+
console.log(`\tType: ${group.type}`);
5869
}
5970
} catch(err) {
6071
console.log('List topics failed', err);

lib/admin.js

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -373,6 +373,8 @@ AdminClient.prototype.createPartitions = function (topic, totalPartitions, timeo
373373
* May be unset (default: 5000).
374374
* @param {Array<RdKafka.ConsumerGroupStates>?} options.matchConsumerGroupStates -
375375
* A list of consumer group states to match. May be unset, fetches all states (default: unset).
376+
* @param {Array<RdKafka.ConsumerGroupTypes>?} options.matchConsumerGroupTypes -
377+
* A list of consumer group types to match. May be unset, fetches all types (default: unset).
376378
* @param {function} cb - The callback to be executed when finished.
377379
* @example
378380
* // Valid ways to call this function:

lib/kafkajs/_admin.js

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -387,7 +387,9 @@ class Admin {
387387
* May be unset (default: 5000).
388388
* @param {Array<KafkaJS.ConsumerGroupStates>?} options.matchConsumerGroupStates -
389389
* A list of consumer group states to match. May be unset, fetches all states (default: unset).
390-
* @returns {Promise<{ groups: Array<{groupId: string, protocolType: string, isSimpleConsumerGroup: boolean, state: KafkaJS.ConsumerGroupStates}>, errors: Array<RdKafka.LibrdKafkaError> }>}
390+
* @param {Array<KafkaJS.ConsumerGroupTypes>?} options.matchConsumerGroupTypes -
391+
* A list of consumer group types to match. May be unset, fetches all types (default: unset).
392+
* @returns {Promise<{ groups: Array<{groupId: string, protocolType: string, isSimpleConsumerGroup: boolean, state: KafkaJS.ConsumerGroupStates, type: KafkaJS.ConsumerGroupTypes}>, errors: Array<RdKafka.LibrdKafkaError> }>}
391393
* Resolves with the list of consumer groups, rejects on error.
392394
*/
393395
async listGroups(options = {}) {

src/admin.cc

Lines changed: 29 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -483,7 +483,9 @@ Baton AdminClient::CreatePartitions(
483483

484484
Baton AdminClient::ListGroups(
485485
bool is_match_states_set,
486-
std::vector<rd_kafka_consumer_group_state_t> &match_states, int timeout_ms,
486+
std::vector<rd_kafka_consumer_group_state_t> &match_states,
487+
bool is_match_types_set,
488+
std::vector<rd_kafka_consumer_group_type_t> &match_types, int timeout_ms,
487489
/* out */ rd_kafka_event_t **event_response) {
488490
if (!IsConnected()) {
489491
return Baton(RdKafka::ERR__STATE);
@@ -515,6 +517,15 @@ Baton AdminClient::ListGroups(
515517
}
516518
}
517519

520+
if (is_match_types_set) {
521+
rd_kafka_error_t *error =
522+
rd_kafka_AdminOptions_set_match_consumer_group_types(
523+
options, &match_types[0], match_types.size());
524+
if (error) {
525+
return Baton::BatonFromErrorAndDestroy(error);
526+
}
527+
}
528+
518529
// Create queue just for this operation.
519530
rd_kafka_queue_t *rkqu = rd_kafka_queue_new(m_client->c_ptr());
520531

@@ -1195,9 +1206,25 @@ NAN_METHOD(AdminClient::NodeListGroups) {
11951206
}
11961207
}
11971208

1209+
std::vector<rd_kafka_consumer_group_type_t> match_types;
1210+
v8::Local<v8::String> match_consumer_group_types_key =
1211+
Nan::New("matchConsumerGroupTypes").ToLocalChecked();
1212+
bool is_match_types_set =
1213+
Nan::Has(config, match_consumer_group_types_key).FromMaybe(false);
1214+
v8::Local<v8::Array> match_types_array = Nan::New<v8::Array>();
1215+
1216+
if (is_match_types_set) {
1217+
match_types_array = GetParameter<v8::Local<v8::Array>>(
1218+
config, "matchConsumerGroupTypes", match_types_array);
1219+
if (match_types_array->Length()) {
1220+
match_types = Conversion::Admin::FromV8GroupTypeArray(
1221+
match_types_array);
1222+
}
1223+
}
1224+
11981225
// Queue the work.
11991226
Nan::AsyncQueueWorker(new Workers::AdminClientListGroups(
1200-
callback, client, is_match_states_set, match_states, timeout_ms));
1227+
callback, client, is_match_states_set, match_states, is_match_types_set, match_types, timeout_ms));
12011228
}
12021229

12031230
/**

src/admin.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,8 @@ class AdminClient : public Connection {
5454
// Baton DescribeConfig(rd_kafka_NewTopic_t* topic, int timeout_ms);
5555
Baton ListGroups(bool is_match_states_set,
5656
std::vector<rd_kafka_consumer_group_state_t>& match_states,
57+
bool is_match_types_set,
58+
std::vector<rd_kafka_consumer_group_type_t>& match_types,
5759
int timeout_ms,
5860
rd_kafka_event_t** event_response);
5961
Baton DescribeGroups(std::vector<std::string>& groups,

src/common.cc

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -908,6 +908,35 @@ std::vector<rd_kafka_consumer_group_state_t> FromV8GroupStateArray(
908908
return returnVec;
909909
}
910910

911+
/**
912+
* @brief Converts a v8 array of group types into a vector of
913+
* rd_kafka_consumer_group_type_t.
914+
*/
915+
std::vector<rd_kafka_consumer_group_type_t> FromV8GroupTypeArray(
916+
v8::Local<v8::Array> array) {
917+
v8::Local<v8::Array> parameter = array.As<v8::Array>();
918+
std::vector<rd_kafka_consumer_group_type_t> returnVec;
919+
if (parameter->Length() >= 1) {
920+
for (unsigned int i = 0; i < parameter->Length(); i++) {
921+
v8::Local<v8::Value> v;
922+
if (!Nan::Get(parameter, i).ToLocal(&v)) {
923+
continue;
924+
}
925+
Nan::Maybe<int64_t> maybeT = Nan::To<int64_t>(v);
926+
if (maybeT.IsNothing()) {
927+
continue;
928+
}
929+
int64_t type_number = maybeT.FromJust();
930+
if (type_number >= RD_KAFKA_CONSUMER_GROUP_TYPE__CNT) {
931+
continue;
932+
}
933+
returnVec.push_back(
934+
static_cast<rd_kafka_consumer_group_type_t>(type_number));
935+
}
936+
}
937+
return returnVec;
938+
}
939+
911940
/**
912941
* @brief Converts a rd_kafka_ListConsumerGroups_result_t* into a v8 object.
913942
*/
@@ -920,6 +949,7 @@ v8::Local<v8::Object> FromListConsumerGroupsResult(
920949
protocolType: string,
921950
isSimpleConsumerGroup: boolean,
922951
state: ConsumerGroupState (internally a number)
952+
type: ConsumerGroupType (internally a number)
923953
}[],
924954
errors: LibrdKafkaError[]
925955
}
@@ -957,6 +987,9 @@ v8::Local<v8::Object> FromListConsumerGroupsResult(
957987
Nan::Set(groupObject, Nan::New("state").ToLocalChecked(),
958988
Nan::New<v8::Number>(rd_kafka_ConsumerGroupListing_state(group)));
959989

990+
Nan::Set(groupObject, Nan::New("type").ToLocalChecked(),
991+
Nan::New<v8::Number>(rd_kafka_ConsumerGroupListing_type(group)));
992+
960993
Nan::Set(groups, i, groupObject);
961994
}
962995

src/common.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -116,6 +116,10 @@ rd_kafka_NewTopic_t **FromV8TopicObjectArray(v8::Local<v8::Array>);
116116
std::vector<rd_kafka_consumer_group_state_t> FromV8GroupStateArray(
117117
v8::Local<v8::Array>);
118118

119+
// ListGroups: request
120+
std::vector<rd_kafka_consumer_group_type_t> FromV8GroupTypeArray(
121+
v8::Local<v8::Array> array);
122+
119123
// ListGroups: response
120124
v8::Local<v8::Object> FromListConsumerGroupsResult(
121125
const rd_kafka_ListConsumerGroups_result_t *);

src/workers.cc

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1317,11 +1317,15 @@ void AdminClientCreatePartitions::HandleErrorCallback() {
13171317
AdminClientListGroups::AdminClientListGroups(
13181318
Nan::Callback* callback, AdminClient* client, bool is_match_states_set,
13191319
std::vector<rd_kafka_consumer_group_state_t>& match_states,
1320+
bool is_match_types_set,
1321+
std::vector<rd_kafka_consumer_group_type_t>& match_types,
13201322
const int& timeout_ms)
13211323
: ErrorAwareWorker(callback),
13221324
m_client(client),
13231325
m_is_match_states_set(is_match_states_set),
13241326
m_match_states(match_states),
1327+
m_is_match_types_set(is_match_types_set),
1328+
m_match_types(match_types),
13251329
m_timeout_ms(timeout_ms) {}
13261330

13271331
AdminClientListGroups::~AdminClientListGroups() {
@@ -1332,6 +1336,7 @@ AdminClientListGroups::~AdminClientListGroups() {
13321336

13331337
void AdminClientListGroups::Execute() {
13341338
Baton b = m_client->ListGroups(m_is_match_states_set, m_match_states,
1339+
m_is_match_types_set, m_match_types,
13351340
m_timeout_ms, &m_event_response);
13361341
if (b.err() != RdKafka::ERR_NO_ERROR) {
13371342
SetErrorBaton(b);

src/workers.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -532,6 +532,8 @@ class AdminClientListGroups : public ErrorAwareWorker {
532532
public:
533533
AdminClientListGroups(Nan::Callback *, NodeKafka::AdminClient *, bool,
534534
std::vector<rd_kafka_consumer_group_state_t> &,
535+
bool,
536+
std::vector<rd_kafka_consumer_group_type_t> &,
535537
const int &);
536538
~AdminClientListGroups();
537539

@@ -543,6 +545,8 @@ class AdminClientListGroups : public ErrorAwareWorker {
543545
NodeKafka::AdminClient *m_client;
544546
const bool m_is_match_states_set;
545547
std::vector<rd_kafka_consumer_group_state_t> m_match_states;
548+
const bool m_is_match_types_set;
549+
std::vector<rd_kafka_consumer_group_type_t> m_match_types;
546550
const int m_timeout_ms;
547551
rd_kafka_event_t *m_event_response;
548552
};

test/promisified/admin/list_groups.spec.js

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ const {
77
waitFor,
88
createAdmin,
99
} = require('../testhelpers');
10-
const { ConsumerGroupStates, ErrorCodes } = require('../../../lib').KafkaJS;
10+
const { ConsumerGroupStates, ConsumerGroupTypes, ErrorCodes } = require('../../../lib').KafkaJS;
1111

1212
describe('Admin > listGroups', () => {
1313
let topicName, groupId, consumer, admin;
@@ -50,6 +50,7 @@ describe('Admin > listGroups', () => {
5050
await admin.connect();
5151
let listGroupsResult = await admin.listGroups({
5252
matchConsumerGroupStates: undefined,
53+
matchConsumerGroupTypes: undefined,
5354
});
5455
expect(listGroupsResult.errors).toEqual([]);
5556
expect(listGroupsResult.groups).toEqual(
@@ -59,6 +60,7 @@ describe('Admin > listGroups', () => {
5960
isSimpleConsumerGroup: false,
6061
protocolType: 'consumer',
6162
state: ConsumerGroupStates.STABLE,
63+
type: ConsumerGroupTypes.CLASSIC,
6264
}),
6365
])
6466
);
@@ -76,6 +78,7 @@ describe('Admin > listGroups', () => {
7678
isSimpleConsumerGroup: false,
7779
protocolType: 'consumer',
7880
state: ConsumerGroupStates.EMPTY,
81+
type: ConsumerGroupTypes.CLASSIC,
7982
}),
8083
])
8184
);

0 commit comments

Comments
 (0)