Skip to content

Commit 86306ed

Browse files
committed
fixup: add conformance test
Signed-off-by: Lance Ball <[email protected]>
1 parent 3f91da0 commit 86306ed

File tree

3 files changed

+37
-7
lines changed

3 files changed

+37
-7
lines changed

package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
"lint:fix": "eslint 'src/**/*.{js,ts}' 'test/**/*.{js,ts}' --fix",
1313
"apretest": "npm run lint && npm run conformance",
1414
"test": "mocha --require ts-node/register ./test/integration/**/kafka_tests.ts",
15-
"conformance": "cucumber-js ./conformance/features/http-protocol-binding.feature -p default",
15+
"conformance": "cucumber-js ./conformance/features/*-protocol-binding.feature -p default",
1616
"coverage": "nyc --reporter=lcov --reporter=text npm run test",
1717
"coverage-publish": "wget -qO - https://coverage.codacy.com/get.sh | bash -s report -l JavaScript -r coverage/lcov.info",
1818
"generate-docs": "typedoc --excludeNotDocumented --out docs src",

src/message/kafka/index.ts

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -175,10 +175,15 @@ function parseBinary<T>(message: KafkaMessage<T>): CloudEvent<T> {
175175
const eventObj: { [key: string ]: unknown } = {};
176176
const headers = { ...message.headers };
177177

178+
eventObj.datacontenttype = headers[CONSTANTS.HEADER_CONTENT_TYPE];
179+
178180
for (const key in KAFKA_CE_HEADERS) {
179181
const h = KAFKA_CE_HEADERS[key];
180182
if (!!headers[h]) {
181183
eventObj[HEADER_MAP[h]] = headers[h];
184+
if (h === KAFKA_CE_HEADERS.TIME) {
185+
eventObj.time = new Date(eventObj.time as string).toISOString();
186+
}
182187
delete headers[h];
183188
}
184189
}
@@ -211,9 +216,10 @@ function parseStructured<T>(message: KafkaMessage<T>): CloudEvent<T> {
211216
if (!message.headers[CONSTANTS.HEADER_CONTENT_TYPE]?.startsWith(CONSTANTS.MIME_CE_JSON)) {
212217
throw new ValidationError(`Unsupported event encoding ${message.headers[CONSTANTS.HEADER_CONTENT_TYPE]}`);
213218
}
214-
219+
const eventObj = JSON.parse(message.value as string);
220+
eventObj.time = new Date(eventObj.time).toISOString();
215221
return new CloudEvent({
216-
...JSON.parse(message.value as string),
222+
...eventObj,
217223
partitionkey: message.key,
218224
}, false);
219225
}
@@ -242,7 +248,7 @@ function parseBatched<T>(message: KafkaMessage<T>): CloudEvent<T>[] {
242248
function extractBinaryData<T>(message: KafkaMessage<T>): T {
243249
let data = message.value as T;
244250
// If the event data is JSON, go ahead and parse it
245-
const datacontenttype = message.headers[KAFKA_CE_HEADERS.DATACONTENTTYPE] as string;
251+
const datacontenttype = message.headers[CONSTANTS.HEADER_CONTENT_TYPE] as string;
246252
if (!!datacontenttype && datacontenttype.startsWith(CONSTANTS.MIME_JSON)) {
247253
if (typeof message.value === "string") {
248254
data = JSON.parse(message.value);

test/conformance/steps.ts

Lines changed: 27 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,17 +3,41 @@
33
SPDX-License-Identifier: Apache-2.0
44
*/
55

6-
/* eslint-disable @typescript-eslint/ban-ts-comment */
7-
86
import { assert } from "chai";
97
import { Given, When, Then, World } from "@cucumber/cucumber";
10-
import { Message, Headers, HTTP } from "../../src";
8+
import { Message, Headers, HTTP, KafkaMessage, Kafka } from "../../src";
119

1210
// eslint-disable-next-line @typescript-eslint/no-var-requires
1311
const { HTTPParser } = require("http-parser-js");
1412

1513
const parser = new HTTPParser(HTTPParser.REQUEST);
1614

15+
Given("Kafka Protocol Binding is supported", function (this: World) {
16+
return true;
17+
});
18+
19+
Given("a Kafka message with payload:", function (request: string) {
20+
// Create a KafkaMessage from the incoming HTTP request
21+
const value = Buffer.from(request);
22+
const message: KafkaMessage = {
23+
key: "",
24+
headers: {},
25+
body: value,
26+
value,
27+
};
28+
this.message = message;
29+
return true;
30+
});
31+
32+
Then("Kafka headers:", function (attributes: { rawTable: [] }) {
33+
this.message.headers = tableToObject(attributes.rawTable);
34+
});
35+
36+
When("parsed as Kafka message", function () {
37+
this.cloudevent = Kafka.toEvent(this.message);
38+
return true;
39+
});
40+
1741
Given("HTTP Protocol Binding is supported", function (this: World) {
1842
return true;
1943
});

0 commit comments

Comments
 (0)