Skip to content
11 changes: 9 additions & 2 deletions src/services/data-targets/base-data-target.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import { Logger } from "@nestjs/common";

import { SendStatus } from "@enum/send-status.enum";
import { DataTargetSendStatus } from "@interfaces/data-target-send-status.interface";
import { DataTarget } from "@entities/data-target.entity";

/**
* This class exposes general functionality used for the DataTarget
Expand All @@ -14,8 +15,14 @@ export abstract class BaseDataTargetService {
return { status: SendStatus.OK };
}

failure(receiver: string, errorMessage: string): DataTargetSendStatus {
this.baseLogger.error(`Send to ${receiver} failed with error ${errorMessage}`);
failure(
receiver: string,
errorMessage: string,
dataTarget: DataTarget
): DataTargetSendStatus {
this.baseLogger.error(
`Datatarget {Id: ${dataTarget.id}, Name: ${dataTarget.name}} Send to ${receiver} failed with error ${errorMessage}`
);
return {
status: SendStatus.ERROR,
errorMessage: errorMessage.toString(),
Expand Down
52 changes: 31 additions & 21 deletions src/services/data-targets/fiware-data-target.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,10 @@ import { SendStatus } from "../../entities/enum/send-status.enum";

@Injectable()
export class FiwareDataTargetService extends BaseDataTargetService {
constructor(private httpService: HttpService, private authenticationTokenProvider: AuthenticationTokenProvider) {
constructor(
private httpService: HttpService,
private authenticationTokenProvider: AuthenticationTokenProvider
) {
super();
}

Expand All @@ -25,20 +28,24 @@ export class FiwareDataTargetService extends BaseDataTargetService {
datatarget: DataTarget,
dto: TransformedPayloadDto
): Promise<DataTargetSendStatus> {

const config: FiwareDataTargetConfiguration = (datatarget as FiwareDataTarget).toConfiguration();
const config: FiwareDataTargetConfiguration = (
datatarget as FiwareDataTarget
).toConfiguration();

// NOTE: For context broker secured with OAuth2 we want to have extra retry in case the cached token is expired.
const retries = config.tokenEndpoint ? 1 : 0
const retries = config.tokenEndpoint ? 1 : 0;

return this.retry(async () => this.sendInternal(config, dto), retries)
return this.retry(
async () => this.sendInternal(config, dto, datatarget),
retries
);
}

async sendInternal(
config: FiwareDataTargetConfiguration,
dto: TransformedPayloadDto
dto: TransformedPayloadDto,
dataTarget: DataTarget
): Promise<DataTargetSendStatus> {

const endpointUrl = `${config.url}/ngsi-ld/v1/entityOperations/upsert/`;
const target = `FiwareDataTarget(${endpointUrl})`;

Expand All @@ -56,42 +63,45 @@ export class FiwareDataTargetService extends BaseDataTargetService {
);
if (!result.status.toString().startsWith("2")) {
this.logger.warn(
`Got a non-2xx status-code: ${result.status.toString()} and message: ${result.statusText
`Got a non-2xx status-code: ${result.status.toString()} and message: ${
result.statusText
}`
);
}
return this.success(target);
} catch (err) {
this.logger.error(`FiwareDataTarget got error: ${err}`);
await this.authenticationTokenProvider.clearConfig(config);
return this.failure(target, err);
return this.failure(target, err, dataTarget);
}
}

async retry(action: () => Promise<DataTargetSendStatus>, retries: number): Promise<DataTargetSendStatus> {
async retry(
action: () => Promise<DataTargetSendStatus>,
retries: number
): Promise<DataTargetSendStatus> {
do {
const result = await action()
const result = await action();
if (result.status === SendStatus.ERROR && retries > 0) {
this.logger.warn('Sending request to Fiware failed. Retrying...')
this.logger.warn("Sending request to Fiware failed. Retrying...");
retries--;
continue
continue;
} else {
return result
return result;
}
}
while (true)
} while (true);
}

async makeAxiosConfiguration(
config: FiwareDataTargetConfiguration
): Promise<AxiosRequestConfig> {

const axiosConfig: AxiosRequestConfig = {
timeout: config.timeout,
headers: this.getHeaders(config),
};

if (config.authorizationType !== null &&
if (
config.authorizationType !== null &&
config.authorizationType !== AuthorizationType.NO_AUTHORIZATION
) {
if (config.authorizationType === AuthorizationType.HTTP_BASIC_AUTHORIZATION) {
Expand All @@ -106,20 +116,20 @@ export class FiwareDataTargetService extends BaseDataTargetService {
} else if (
config.authorizationType === AuthorizationType.OAUTH_AUTHORIZATION
) {
const token = await this.authenticationTokenProvider.getToken(config)
const token = await this.authenticationTokenProvider.getToken(config);
axiosConfig.headers["Authorization"] = `Bearer ${token}`;
}
}
return axiosConfig;
}

getHeaders(config: FiwareDataTargetConfiguration): any {
let headers: any = {}
let headers: any = {};

if (config.context) {
headers = {
"Content-Type": "application/json",
Link: `<${config.context}>; rel="http://www.w3.org/ns/json-ld#context"; type="application/ld+json"`
Link: `<${config.context}>; rel="http://www.w3.org/ns/json-ld#context"; type="application/ld+json"`,
};
} else {
headers = {
Expand Down
6 changes: 4 additions & 2 deletions src/services/data-targets/http-push-data-target.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,9 @@ export class HttpPushDataTargetService extends BaseDataTargetService {
rawBody: JSON.stringify(dto.payload),
mimeType: "application/json",
};
const config: HttpPushDataTargetConfiguration = (datatarget as HttpPushDataTarget).toConfiguration();
const config: HttpPushDataTargetConfiguration = (
datatarget as HttpPushDataTarget
).toConfiguration();

// Setup HTTP client
const axiosConfig = HttpPushDataTargetService.makeAxiosConfiguration(
Expand Down Expand Up @@ -55,7 +57,7 @@ export class HttpPushDataTargetService extends BaseDataTargetService {
} catch (err) {
// TODO: Error handling for common errors
this.logger.error(`HttpPushDataTarget got error: ${err}`);
return this.failure(target, err);
return this.failure(target, err, datatarget);
}
}

Expand Down
10 changes: 8 additions & 2 deletions src/services/data-targets/mqtt-data-target.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@ export class MqttDataTargetService extends BaseDataTargetService {
dto: TransformedPayloadDto,
onDone: (status: DataTargetSendStatus, targetType: DataTargetType) => void
): void {
const config: MqttDataTargetConfiguration = (datatarget as MqttDataTarget).toConfiguration();
const config: MqttDataTargetConfiguration = (
datatarget as MqttDataTarget
).toConfiguration();

// Setup client
const client = mqtt.connect(config.url, {
Expand All @@ -46,7 +48,11 @@ export class MqttDataTargetService extends BaseDataTargetService {
(err, packet) => {
try {
if (err) {
const status = this.failure(targetForLogging, err?.message);
const status = this.failure(
targetForLogging,
err?.message,
datatarget
);
onDone(status, DataTargetType.MQTT);
} else {
this.logger.debug(
Expand Down