Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .semaphore/semaphore.yml
Original file line number Diff line number Diff line change
Expand Up @@ -107,9 +107,9 @@ blocks:
- docker compose up -d && sleep 30
- export NODE_OPTIONS='--max-old-space-size=1536'
- npx jest --no-colors --ci test/promisified/
- name: "ESLint"
- name: "Lint"
commands:
- npx eslint lib/kafkajs
- make lint
- name: "Docs"
commands:
- make docs
Expand Down
1 change: 1 addition & 0 deletions CHANGELOG.md
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not related to this PR, but there's a linting error to fix in

src/admin.cc:1001:  Lines should be <= 80 characters long  [whitespace/line_length] [2]

In semaphore.yml could you change name: "ESLint" to name: "Lint" and run make lint instead?

For the rest it looks good.

Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ v1.0.0 is a feature release. It is supported for all usage.
1. Add support for an Admin API to fetch topic offsets (#156).
2. Add support for Node v23 pre-built binaries (#158).
3. Add KafkaJS-compatible errors to promisified Admin API (createTopics, deleteGroups, deleteTopicRecords) (#159).
4. Include error types within Type definitions for promisified API (#210).

## Fixes

Expand Down
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ cpplint:
@$(PYTHON) $(CPPLINT) --filter=$(CPPLINT_FILTER) $(CPPLINT_FILES)

eslint: node_modules/.dirstamp
@./node_modules/.bin/eslint .
@./node_modules/.bin/eslint lib

lib: node_modules/.dirstamp $(CONFIG_OUTPUTS)
@PYTHONHTTPSVERIFY=0 $(NODE-GYP) build $(GYPBUILDARGS)
Expand Down
22 changes: 15 additions & 7 deletions lib/kafkajs/_error.js
Original file line number Diff line number Diff line change
Expand Up @@ -181,13 +181,12 @@ class KafkaJSTimeout extends KafkaJSError {
this.name = 'KafkaJSTimeout';
}
}
class KafkaJSLockTimeout extends KafkaJSTimeout {
constructor() {
super(...arguments);
this.name = 'KafkaJSLockTimeout';
}
}

/**
* KafkaJSCreateTopicError represents an error raised by the createTopics method for one topic.
* @extends KafkaJS.KafkaJSError
* @memberof KafkaJS
*/
class KafkaJSCreateTopicError extends KafkaJSProtocolError {
constructor(e, topicName, properties) {
super(e, properties);
Expand All @@ -196,6 +195,11 @@ class KafkaJSCreateTopicError extends KafkaJSProtocolError {
}
}

/**
* KafkaJSDeleteGroupsError represents an error raised by the deleteGroups method.
* @extends KafkaJS.KafkaJSError
* @memberof KafkaJS
*/
class KafkaJSDeleteGroupsError extends KafkaJSError {
constructor(e, groups) {
super(e);
Expand All @@ -204,6 +208,11 @@ class KafkaJSDeleteGroupsError extends KafkaJSError {
}
}

/**
* KafkaJSDeleteTopicRecordsError represents an error raised by the deleteTopicRecords method.
* @extends KafkaJS.KafkaJSError
* @memberof KafkaJS
*/
class KafkaJSDeleteTopicRecordsError extends KafkaJSError {
constructor({ partitions }) {
/*
Expand Down Expand Up @@ -278,7 +287,6 @@ module.exports = {
KafkaJSGroupCoordinatorNotFound,
KafkaJSNotImplemented,
KafkaJSTimeout,
KafkaJSLockTimeout,
KafkaJSCreateTopicError,
KafkaJSDeleteGroupsError,
KafkaJSDeleteTopicRecordsError,
Expand Down
4 changes: 2 additions & 2 deletions src/admin.cc
Original file line number Diff line number Diff line change
Expand Up @@ -998,8 +998,8 @@ NAN_METHOD(AdminClient::NodeConnect) {
// on the background thread.
// We will deactivate them if the connection fails.
// Because the Admin Client connect is synchronous, we can do this within
// AdminClient::Connect as well, but we do it here to keep the code similiar to
// the Producer and Consumer.
// AdminClient::Connect as well, but we do it here to keep the code similiar
// to the Producer and Consumer.
client->ActivateDispatchers();

Baton b = client->Connect();
Expand Down
104 changes: 103 additions & 1 deletion types/kafkajs.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,10 @@ import {
IsolationLevel
} from './rdkafka'

import {
CODES
} from './errors';

// Admin API related interfaces, types etc; and Error types are common, so
// just re-export them from here too.
export {
Expand All @@ -24,7 +28,7 @@ export {
Node,
AclOperationTypes,
Uuid,
IsolationLevel
IsolationLevel,
} from './rdkafka'

export interface OauthbearerProviderResponse {
Expand Down Expand Up @@ -427,3 +431,101 @@ export type Admin = {
isolationLevel: IsolationLevel
}): Promise<Array<SeekEntry & { high: string; low: string }>>
}


export function isKafkaJSError(error: Error): boolean;

export const ErrorCodes: typeof CODES.ERRORS;

export class KafkaJSError extends Error {
readonly message: Error['message']
readonly name: string
readonly retriable: boolean
readonly fatal: boolean
readonly abortable: boolean
readonly code: number
constructor(e: Error | string, metadata?: KafkaJSErrorMetadata)
}

export class KafkaJSProtocolError extends KafkaJSError {
constructor(e: Error | string)
}

export class KafkaJSCreateTopicError extends KafkaJSError {
readonly topic: string
constructor(e: Error | string, topicName: string, metadata?: KafkaJSErrorMetadata)
}

export class KafkaJSDeleteGroupsError extends KafkaJSError {
readonly groups: DeleteGroupsResult[]
constructor(e: Error | string, groups?: KafkaJSDeleteGroupsErrorGroups[])
}

export class KafkaJSDeleteTopicRecordsError extends KafkaJSError {
readonly partitions: KafkaJSDeleteTopicRecordsErrorPartition[]
constructor(metadata: KafkaJSDeleteTopicRecordsErrorTopic)
}

export interface KafkaJSDeleteGroupsErrorGroups {
groupId: string
errorCode: number
error: KafkaJSError
}

export interface KafkaJSDeleteTopicRecordsErrorTopic {
topic: string
partitions: KafkaJSDeleteTopicRecordsErrorPartition[]
}

export interface KafkaJSDeleteTopicRecordsErrorPartition {
partition: number
offset: string
error: KafkaJSError
}

export class KafkaJSAggregateError extends Error {
readonly errors: (Error | string)[]
constructor(message: Error | string, errors: (Error | string)[])
}

export class KafkaJSOffsetOutOfRange extends KafkaJSProtocolError {
readonly topic: string
readonly partition: number
constructor(e: Error | string, metadata?: KafkaJSErrorMetadata)
}

export class KafkaJSConnectionError extends KafkaJSError {
constructor(e: Error | string, metadata?: KafkaJSErrorMetadata)
}

export class KafkaJSRequestTimeoutError extends KafkaJSError {
constructor(e: Error | string, metadata?: KafkaJSErrorMetadata)
}

export class KafkaJSPartialMessageError extends KafkaJSError {
constructor()
}

export class KafkaJSSASLAuthenticationError extends KafkaJSError {
constructor()
}

export class KafkaJSGroupCoordinatorNotFound extends KafkaJSError {
constructor()
}

export class KafkaJSNotImplemented extends KafkaJSError {
constructor()
}

export class KafkaJSTimeout extends KafkaJSError {
constructor()
}

export interface KafkaJSErrorMetadata {
retriable?: boolean
fatal?: boolean
abortable?: boolean
stack?: string
code?: number
}