Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ v1.0.0 is a feature release. It is supported for all usage.

1. Add support for an Admin API to fetch topic offsets (#156).
2. Add support for Node v23 pre-built binaries (#158).
3. Add KafkaJS-compatible errors to promisified Admin API (createTopics, deleteGroups, deleteTopicRecords) (#159).

## Fixes

Expand Down
8 changes: 4 additions & 4 deletions MIGRATION.md
Original file line number Diff line number Diff line change
Expand Up @@ -331,7 +331,7 @@ The admin-client only has support for a limited subset of methods, with more to
* The `describeGroups` method is supported with additional `timeout` and `includeAuthorizedOperations` options.
A number of additional properties have been added to the returned groups.
* The `deleteGroups` method is supported with an additional `timeout` option.
* The `fetchOffsets` method is supported with additional `timeout` and
* The `fetchOffsets` method is supported with additional `timeout` and
`requireStableOffsets` option but `resolveOffsets` option is not yet supported.
* The `deleteTopicRecords` method is supported with additional `timeout`
and `operationTimeout` option.
Expand Down Expand Up @@ -374,8 +374,11 @@ An example is made available [here](./examples/kafkajs/sr.js).
For compatibility, as many error types as possible have been retained, but it is
better to switch to checking the `error.code`.

Note that `KafkaJSAggregateError` remains as before. Check the `.errors` array
for the individual errors when checking the error code.

Exhaustive list of error types and error fields removed:

| Error | Change |
|-------------------------------------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| `KafkaJSNonRetriableError` | Removed. Retriable errors are automatically retried by librdkafka, so there's no need for this type. Note that `error.retriable` still exists, but it's applicable only for transactional producer, where users are expected to retry an action themselves. All error types using this as a superclass now use `KafkaJSError` as their superclass. |
Expand All @@ -386,16 +389,13 @@ An example is made available [here](./examples/kafkajs/sr.js).
| `KafkaJSMetadataNotLoaded` | Removed. Metadata is automatically reloaded by librdkafka. |
| `KafkaJSTopicMetadataNotLoaded` | Removed. Topic metadata is automatically reloaded by librdkafka. |
| `KafkaJSStaleTopicMetadataAssignment` | removed as it's automatically refreshed by librdkafka. |
| `KafkaJSDeleteGroupsError` | Removed, as the Admin Client doesn't have this yet. May be added back again, or changed. |
| `KafkaJSServerDoesNotSupportApiKey` | Removed, as this error isn't generally exposed to user in librdkafka. If raised, it is subsumed into `KafkaJSError` where `error.code === Kafka.ErrorCode.ERR_UNSUPPORTED_VERSION`. |
| `KafkaJSBrokerNotFound` | Removed. This error isn't exposed directly to the user in librdkafka. |
| `KafkaJSLockTimeout` | Removed. This error is not applicable while using librdkafka. |
| `KafkaJSUnsupportedMagicByteInMessageSet` | Removed. It is subsumed into `KafkaJSError` where `error.code === Kafka.ErrorCode.ERR_UNSUPPORTED_VERSION`. |
| `KafkaJSDeleteTopicRecordsError` | Removed, as the Admin Client doesn't have this yet. May be added back again, or changed. |
| `KafkaJSInvariantViolation` | Removed, as it's not applicable to librdkafka. Errors in internal state are subsumed into `KafkaJSError` where `error.code === Kafka.ErrorCode.ERR__STATE`. |
| `KafkaJSInvalidVarIntError` | Removed. This error isn't exposed directly to the user in librdkafka. |
| `KafkaJSInvalidLongError` | Removed. This error isn't exposed directly to the user in librdkafka. |
| `KafkaJSCreateTopicError` | Removed, as the Admin Client doesn't have this yet. May be added back again, or changed.. |
| `KafkaJSAlterPartitionReassignmentsError` | removed, as the RPC is not used in librdkafka. |
| `KafkaJSFetcherRebalanceError` | Removed. This error isn't exposed directly to the user in librdkafka. |
| `KafkaJSConnectionError` | `broker` is removed from this object. |
Expand Down
63 changes: 56 additions & 7 deletions lib/kafkajs/_admin.js
Original file line number Diff line number Diff line change
Expand Up @@ -325,25 +325,33 @@ class Admin {

/* Convert each topic to a format suitable for node-rdkafka, and dispatch the call. */
let allTopicsCreated = true;
const errors = [];
const ret =
options.topics
.map(this.#topicConfigToRdKafka)
.map(topicConfig => new Promise((resolve, reject) => {
.map(topicConfig => new Promise(resolve => {
this.#internalClient.createTopic(topicConfig, options.timeout ?? 5000, (err) => {
if (err) {
if (err.code === error.ErrorCodes.ERR_TOPIC_ALREADY_EXISTS) {
allTopicsCreated = false;
resolve();
return;
}
reject(createKafkaJsErrorFromLibRdKafkaError(err));
const e = createKafkaJsErrorFromLibRdKafkaError(err);
const createTopicError = new error.KafkaJSCreateTopicError(e, topicConfig.topic, e /* includes the properties */);
errors.push(createTopicError);
resolve(); // Don't reject this promise, instead, look at the errors array later.
} else {
resolve();
}
});
}));

return Promise.all(ret).then(() => allTopicsCreated);
await Promise.allSettled(ret);
if (errors.length > 0) {
throw new error.KafkaJSAggregateError("Topic creation errors", errors);
}
return allTopicsCreated;
}

/**
Expand Down Expand Up @@ -442,9 +450,24 @@ class Admin {
this.#internalClient.deleteGroups(groups, options, (err, reports) => {
if (err) {
reject(createKafkaJsErrorFromLibRdKafkaError(err));
} else {
resolve(reports);
return;
}

/* Convert group-level errors to KafkaJS errors if required. */
let errorsPresent = false;
reports = reports.map(groupReport => {
if (groupReport.error) {
errorsPresent = true;
groupReport.error = createKafkaJsErrorFromLibRdKafkaError(groupReport.error);
}
return groupReport;
});

if (errorsPresent) {
reject(new error.KafkaJSDeleteGroupsError('Error in DeleteGroups', reports));
return;
}
resolve(reports);
});
});
}
Expand Down Expand Up @@ -645,9 +668,35 @@ class Admin {
this.#internalClient.deleteRecords(topicPartitionOffsets, options, (err, results) => {
if (err) {
reject(createKafkaJsErrorFromLibRdKafkaError(err));
} else {
resolve(results);
return;
}

let errorsPresent = false;
results = results.map(result => {
if (result.error) {
errorsPresent = true;
result.error = createKafkaJsErrorFromLibRdKafkaError(result.error);
}
return result;
});

if (errorsPresent) {
const partitionsWithError =
{
/* Note that, for API compatibility, we must filter out partitions
* without errors, even though it is more useful to return all of
* them so the user can check offsets. */
partitions:
results.filter(result => result.error).map(result => ({
partition: result.partition,
offset: String(result.lowWatermark),
error: result.error,
}))
};
reject(new error.KafkaJSDeleteTopicRecordsError(partitionsWithError));
return;
}
resolve(results);
});
});
}
Expand Down
34 changes: 34 additions & 0 deletions lib/kafkajs/_error.js
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,37 @@ class KafkaJSLockTimeout extends KafkaJSTimeout {
}
}

class KafkaJSCreateTopicError extends KafkaJSProtocolError {
constructor(e, topicName, properties) {
super(e, properties);
this.topic = topicName;
this.name = 'KafkaJSCreateTopicError';
}
}

class KafkaJSDeleteGroupsError extends KafkaJSError {
constructor(e, groups) {
super(e);
this.groups = groups || [];
this.name = 'KafkaJSDeleteGroupsError';
}
}

class KafkaJSDeleteTopicRecordsError extends KafkaJSError {
constructor({ partitions }) {
/*
* This error is retriable if all the errors were retriable
*/
const retriable = partitions
.filter(({ error }) => error !== null)
.every(({ error }) => error.retriable === true);

super('Error while deleting records', { retriable });
this.name = 'KafkaJSDeleteTopicRecordsError';
this.partitions = partitions;
}
}

/**
* KafkaJSAggregateError represents an error raised when multiple errors occur at once.
* @extends Error
Expand Down Expand Up @@ -248,6 +279,9 @@ module.exports = {
KafkaJSNotImplemented,
KafkaJSTimeout,
KafkaJSLockTimeout,
KafkaJSCreateTopicError,
KafkaJSDeleteGroupsError,
KafkaJSDeleteTopicRecordsError,
KafkaJSAggregateError,
KafkaJSNoBrokerAvailableError,
isRebalancing,
Expand Down
103 changes: 103 additions & 0 deletions test/promisified/admin/create_topics.spec.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
jest.setTimeout(30000);

const {
secureRandom,
createAdmin,
sleep,
} = require('../testhelpers');
const { KafkaJSAggregateError, KafkaJSCreateTopicError, ErrorCodes } = require('../../../lib').KafkaJS;

describe('Admin > createTopics', () => {
let topicNames, admin;

beforeEach(async () => {
topicNames = [`test-topic-${secureRandom()}`, `test-topic-${secureRandom()}`];
admin = createAdmin({});
});

afterEach(async () => {
admin && (await admin.disconnect());
});

it('should create topics', async () => {
await admin.connect();
await expect(admin.createTopics({
topics: [{
topic: topicNames[0]
}, {
topic: topicNames[1],
replicationFactor: 1,
}]
})).resolves.toEqual(true);
await sleep(1000); /* wait for metadata propagation */

const listTopicsResult = await admin.listTopics();
expect(listTopicsResult).toEqual(
expect.arrayContaining(topicNames)
);
});

it('should indicate if topics were already created', async () => {
await admin.connect();
await expect(admin.createTopics({
topics: [{
topic: topicNames[0]
}]
})).resolves.toEqual(true);
await sleep(1000); /* wait for metadata propagation */

await expect(admin.createTopics({
topics: [{
topic: topicNames[0]
}]
})).resolves.toEqual(false); /* topic already exists */
await sleep(1000); /* wait for metadata propagation */


await expect(admin.createTopics({
topics: [{
topic: topicNames[0]
}, {
topic: topicNames[1]
}]
})).resolves.toEqual(false); /* Even of one topic already exists */
});

it('should throw topic errors', async () => {
await admin.connect();

let storedErr;
await expect(admin.createTopics({
topics: [{
topic: topicNames[0] + '-invalid',
replicationFactor: 9090, /* unlikely that anyone has this many brokers in test env */
},
{
topic: topicNames[1] + '-invalid',
numPartitions: 0, /* 0 partitions is invalid */
}, {
topic: topicNames[0]
}]
}).catch(err => {
/* Store the error for checking contents later. */
storedErr = err;
throw err;
})).rejects.toThrow(KafkaJSAggregateError);
await sleep(1000); /* wait for metadata propagation */

expect(storedErr.message).toMatch(/Topic creation errors/);
expect(storedErr.errors).toHaveLength(2);

const replicationErr = storedErr.errors.find(e => e.topic === topicNames[0] + '-invalid');
expect(replicationErr).toBeInstanceOf(KafkaJSCreateTopicError);
expect(replicationErr.code).toEqual(ErrorCodes.ERR_INVALID_REPLICATION_FACTOR);

const partitionsErr = storedErr.errors.find(e => e.topic === topicNames[1]+ '-invalid');
expect(partitionsErr).toBeInstanceOf(KafkaJSCreateTopicError);
expect(partitionsErr.code).toEqual(ErrorCodes.ERR_INVALID_PARTITIONS);

/* Despite errors the valid topic should still be created. */
await expect(admin.listTopics()).resolves.toEqual(expect.arrayContaining([topicNames[0]]));
});
});

Loading