Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "azure-iothub",
"version": "1.16.1",
"version": "1.17.0-preview.1",
"description": "Azure IoT SDK - IoT Hub",
"author": "Microsoft Corporation",
"license": "MIT",
Expand Down
59 changes: 57 additions & 2 deletions src/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,18 @@

import { EventEmitter } from 'events';
import { Agent } from 'https';
import { anHourFromNow, errors, results, Message, Receiver, SharedAccessSignature } from 'azure-iot-common';
import { anHourFromNow, errors, results, Message, Receiver, SharedAccessSignature, callbackToPromise, encodeUriComponentStrict } from 'azure-iot-common';
import { RetryOperation, RetryPolicy, ExponentialBackOffWithJitter } from 'azure-iot-common';
import * as ConnectionString from './connection_string';
import { Amqp } from './amqp';
import { DeviceMethod } from './device_method';
import { RestApiClient } from 'azure-iot-http-base';
import { DeviceMethodParams, IncomingMessageCallback, createResultWithIncomingMessage, ResultWithIncomingMessage } from './interfaces';
import { DeviceMethodParams, IncomingMessageCallback, createResultWithIncomingMessage, ResultWithIncomingMessage, StreamInitiation } from './interfaces';
import { StreamInitiationResult } from './stream_initiation_result';
import { Callback, tripleValueCallbackToPromise } from 'azure-iot-common';
import { IncomingMessage } from 'http';
import { TokenCredential } from '@azure/core-http';
import { versionQueryString } from './version';

// eslint-disable-next-line @typescript-eslint/no-var-requires
const packageJson = require('../package.json');
Expand Down Expand Up @@ -395,6 +397,59 @@ export class Client extends EventEmitter {
this._retryPolicy = policy;
}

/**
* Initializes a new cloud-to-device stream.
*
* @param deviceId target device identifier.
* @param streamInitiation stream initialization request.
* @param callback function called when the stream has been successfully initialized.
*/
initiateStream(deviceId: string, streamInitiation: StreamInitiation): Promise<StreamInitiationResult>;
initiateStream(deviceId: string, streamInitiation: StreamInitiation, callback: (err: Error, result?: StreamInitiationResult) => void): void;
initiateStream(deviceId: string, streamInitiation: StreamInitiation, callback?: (err: Error, result?: StreamInitiationResult) => void): Promise<StreamInitiationResult> | void {
/*Codes_SRS_NODE_IOTHUB_CLIENT_16_031: [The `initiateStream` method shall throw a `ReferenceError` if the `deviceId` is argument falsy.]*/
if (!deviceId) {
throw new ReferenceError('\'deviceId\' cannot be \'' + deviceId + '\'');
}

/*Codes_SRS_NODE_IOTHUB_CLIENT_16_032: [The `initiateStream` method shall throw a `ReferenceError` if the `streamInitiation` argument is falsy.]*/
if (!streamInitiation) {
throw new ReferenceError('\'streamInitiation\' cannot be \'' + streamInitiation + '\'');
}

return callbackToPromise((_callback) => {
/*Codes_SRS_NODE_IOTHUB_CLIENT_16_033: [The `initiateStream` method shall send an HTTP request formatted as follows:
```
POST /twins/encodeUriComponentStrict(<deviceId>)/streams/encodeUriComponentStrict(streamInitiation.streamName)

iothub-streaming-connect-timeout-in-seconds: <streamInitiation.connectTimeoutInSeconds>
iothub-streaming-response-timeout-in-seconds: <streamInitiation.responseTimeoutInSeconds>
```]*/
const path = '/twins/' + encodeUriComponentStrict(deviceId) + '/streams/' + streamInitiation.streamName + versionQueryString();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good job on getting the encodeuricomponentstring in there. I forgot about that.

const httpHeaders = {
'iothub-streaming-connect-timeout-in-seconds': streamInitiation.connectTimeoutInSeconds,
'iothub-streaming-response-timeout-in-seconds': streamInitiation.responseTimeoutInSeconds
};

/*Codes_SRS_NODE_IOTHUB_CLIENT_16_034: [The `initiateStream` method shall have a custom timeout set to the value in milliseconds of the sum of the streamInitiation.connectTimeoutInSeconds and streamInitiation.responseTimeoutInSeconds.]*/
const requestTimeout = 1000 * (streamInitiation.connectTimeoutInSeconds + streamInitiation.responseTimeoutInSeconds);

this._restApiClient.executeApiCall('POST', path, httpHeaders, undefined, requestTimeout, (err, result, response) => {
if (err) {
/*Codes_SRS_NODE_IOTHUB_CLIENT_16_035: [The `initiateStream` method shall call its callback with an error if the RestApiClient fails to execute the API call.]*/
_callback(err);
} else {
/*Codes_SRS_NODE_IOTHUB_CLIENT_16_036: [The `initiateStream` method shall create a `StreamInitiationResult` object from the received HTTP response as follows:
streamInitiationResult.authorizationToken: response.headers['iothub-streaming-auth-token']
streamInitiationResult.uri: response.headers['iothub-streaming-url']
streamInitiationResult.isAccepted: true if response.headers['iothub-streaming-is-accepted'] is 'True', false otherwise.]*/
const streamInitResult = StreamInitiationResult.fromHttpResponse(response.headers, result);
_callback(null, streamInitResult);
}
});
}, callback);
}

private _disconnectHandler(reason: string): void {
/*Codes_SRS_NODE_IOTHUB_CLIENT_16_004: [** The `disconnect` event shall be emitted when the client is disconnected from the server.]*/
const evt = new results.Disconnected();
Expand Down
35 changes: 35 additions & 0 deletions src/interfaces.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,3 +38,38 @@ export interface DeviceMethodParams {
*/
connectTimeoutInSeconds?: number; // default is 0
}

/**
* Describes the initial request sent by the service to the device to initiate a cloud-to-device TCP streaming connection.
*/
export interface StreamInitiation {
/**
* Name of the requested stream (unique in case multiple streams are used)
*/
streamName: string;

/**
* Timeout (in seconds) to wait for a connected device to respond.
*/
responseTimeoutInSeconds: number;

/**
* Timeout (in seconds) to wait for a disconnected device to connect.
*/
connectTimeoutInSeconds: number;

/**
* ???
*/
contentType: string;

/**
* ???
*/
contentEncoding: string;

/**
* ???
*/
payload: any;
}
39 changes: 39 additions & 0 deletions src/stream_initiation_result.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.

'use strict';
import { IncomingHttpHeaders } from 'http';

/**
* Describes the response sent by the service when the device has accepted (or rejected) the cloud-to-device TCP streaming request
*/
export class StreamInitiationResult {
/**
* Token used in the Authorization header to authenticate the websocket connection
*/
authorizationToken: string;

/**
* Boolean indicating whether the TCP streaming request has been accepted by the device
*/
isAccepted: boolean;

/**
* URI of the websocket used to connect to the device
*/
uri: string;

/**
* Creates a StreamInitiationResult from the response to the StreamInitiation request.
*
* @param headers headers received in the HTTP response to the stream initiation request
* @param _body body of the response to the stream initiation request
*/
static fromHttpResponse(headers: IncomingHttpHeaders, _body: string): StreamInitiationResult {
const result = new StreamInitiationResult();
result.authorizationToken = headers['iothub-streaming-auth-token'] as string;
result.isAccepted = headers['iothub-streaming-is-accepted'] === 'True';
result.uri = headers['iothub-streaming-url'] as string;
return result;
}
}
2 changes: 1 addition & 1 deletion src/version.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

'use strict';

export const apiVersion = '2021-04-12';
export const apiVersion = '2018-10-20-preview';

export function versionQueryString(): string {
return '?api-version=' + apiVersion;
Expand Down
155 changes: 155 additions & 0 deletions test/_client_test.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ const Amqp = require('../dist/amqp.js').Amqp;
const Client = require('../dist/client.js').Client;
const Message = require('azure-iot-common').Message;
const errors = require('azure-iot-common').errors;
const versionQueryString = require('../dist/version').versionQueryString;
const SimulatedAmqp = require('./amqp_simulated.js');
const transportSpecificTests = require('./_client_common_testrun.js');

Expand Down Expand Up @@ -613,6 +614,160 @@ describe('Client', function () {
client.open(function () {});
});
});

describe('initiateStream', function () {
/*Tests_SRS_NODE_IOTHUB_CLIENT_16_031: [The `initiateStream` method shall throw a `ReferenceError` if the `deviceId` is argument falsy.]*/
[undefined, null, ''].forEach(function (badDeviceId) {
it('throws a ReferenceError if \'deviceId\' is \'' + badDeviceId + '\'', function () {
let client = new Client(new EventEmitter(), {});
assert.throws(function () {
client.initiateStream(badDeviceId, {}, function () {});
}, ReferenceError);
});
});

/*Tests_SRS_NODE_IOTHUB_CLIENT_16_032: [The `initiateStream` method shall throw a `ReferenceError` if the `streamInitiation` argument is falsy.]*/
[undefined, null].forEach(function (badStreamInitiation) {
it('throws a ReferenceError if \'streamInitiation\' is \'' + badStreamInitiation + '\'', function () {
let client = new Client(new EventEmitter(), {});
assert.throws(function () {
client.initiateStream('deviceId', badStreamInitiation, function () {});
}, ReferenceError);
});
});

/*Tests_SRS_NODE_IOTHUB_CLIENT_16_033: [The `initiateStream` method shall send an HTTP request formatted as follows:
```
POST /twins/encodeUriComponentStrict(<deviceId>)/streams/encodeUriComponentStrict(streamInitiation.streamName)

iothub-streaming-connect-timeout-in-seconds: <streamInitiation.connectTimeoutInSeconds>
iothub-streaming-response-timeout-in-seconds: <streamInitiation.responseTimeoutInSeconds>
```]*/
it('sends a well-formatted HTTP request', function (testCallback) {
const fakeDeviceId = 'fakeDevice';
const fakeStreamInitiation = {
connectTimeoutInSeconds: 42,
responseTimeoutInSeconds: 1337,
streamName: 'streamName'
};
const fakeResponse = {
statusCode: 200,
headers: {
'iothub-streaming-is-accepted': 'True',
'iothub-streaming-url': 'wss://test',
'iothub-streaming-authToken': 'token',
}
};
let fakeResult = "";
let fakeRestClient = {
executeApiCall: function (method, path, headers, body, timeout, callback) {
assert.strictEqual(method, 'POST');
assert.strictEqual(path, '/twins/' + fakeDeviceId + '/streams/' + fakeStreamInitiation.streamName + versionQueryString());
assert.strictEqual(headers['iothub-streaming-connect-timeout-in-seconds'], fakeStreamInitiation.connectTimeoutInSeconds);
assert.strictEqual(headers['iothub-streaming-response-timeout-in-seconds'], fakeStreamInitiation.responseTimeoutInSeconds);
/*Tests_SRS_NODE_IOTHUB_CLIENT_16_034: [The `initiateStream` method shall have a custom timeout set to the value in milliseconds of the sum of the streamInitiation.connectTimeoutInSeconds and streamInitiation.responseTimeoutInSeconds.]*/
assert.strictEqual(timeout, 1000 * (fakeStreamInitiation.connectTimeoutInSeconds + fakeStreamInitiation.responseTimeoutInSeconds));
callback(null, fakeResult, fakeResponse);
}
};

let client = new Client(new EventEmitter(), fakeRestClient);
client.initiateStream(fakeDeviceId, fakeStreamInitiation, function (err) {
testCallback(err);
});
});

it('returns a promise if no callback is specified', function (testCallback) {
const fakeDeviceId = 'fakeDevice';
const fakeStreamInitiation = {
connectTimeoutInSeconds: 42,
responseTimeoutInSeconds: 1337,
streamName: 'streamName'
};
const fakeResponse = {
statusCode: 200,
headers: {
'iothub-streaming-is-accepted': 'True',
'iothub-streaming-url': 'wss://test',
'iothub-streaming-authToken': 'token',
}
};
let fakeRestClient = {
executeApiCall: function (method, path, headers, body, timeout, callback) {
callback(null, undefined, fakeResponse);
}
};

let client = new Client(new EventEmitter(), fakeRestClient);
let resultPromise = client.initiateStream(fakeDeviceId, fakeStreamInitiation);
assert.instanceOf(resultPromise, Promise);
resultPromise.then(function (result) {
assert.strictEqual(result.uri, fakeResponse.headers['iothub-streaming-url']);
assert.strictEqual(result.authorizationToken, fakeResponse.headers['iothub-streaming-auth-token']);
assert.strictEqual(result.isAccepted, true);
testCallback();
});
});

/*Tests_SRS_NODE_IOTHUB_CLIENT_16_036: [The `initiateStream` method shall create a `StreamInitiationResult` object from the received HTTP response as follows:
streamInitiationResult.authorizationToken: response.headers['iothub-streaming-auth-token']
streamInitiationResult.uri: response.headers['iothub-streaming-url']
streamInitiationResult.isAccepted: true if response.headers['iothub-streaming-is-accepted'] is 'True', false otherwise.]*/
it('calls the callback with a well-formed response', function (testCallback) {
const fakeDeviceId = 'fakeDevice';
const fakeStreamInitiation = {
connectTimeoutInSeconds: 42,
responseTimeoutInSeconds: 1337,
streamName: 'streamName'
};
const fakeResponse = {
statusCode: 200,
headers: {
'iothub-streaming-is-accepted': 'True',
'iothub-streaming-url': 'wss://test',
'iothub-streaming-authToken': 'token',
}
};
let fakeRestClient = {
executeApiCall: function (method, path, headers, body, timeout, callback) {
callback(null, undefined, fakeResponse);
}
};

let client = new Client(new EventEmitter(), fakeRestClient);
client.initiateStream(fakeDeviceId, fakeStreamInitiation, function (err, result) {
if (err) {
testCallback(err);
} else {
assert.strictEqual(result.uri, fakeResponse.headers['iothub-streaming-url']);
assert.strictEqual(result.authorizationToken, fakeResponse.headers['iothub-streaming-auth-token']);
assert.strictEqual(result.isAccepted, true);
testCallback();
}
});
});

/*Tests_SRS_NODE_IOTHUB_CLIENT_16_035: [The `initiateStream` method shall call its callback with an error if the RestApiClient fails to execute the API call.]*/
it('calls the callback with an error if the RestApiClient fails to execute the API call', function (testCallback) {
const fakeError = new Error('fake');
const fakeStreamInitiation = {
connectTimeoutInSeconds: 42,
responseTimeoutInSeconds: 1337,
streamName: 'streamName'
};
let fakeRestClient = {
executeApiCall: function (method, path, headers, body, timeout, callback) {
callback(fakeError);
}
};

let client = new Client(new EventEmitter(), fakeRestClient);
client.initiateStream('deviceId', fakeStreamInitiation, function (err) {
assert.strictEqual(err, fakeError);
testCallback();
});
});
});
});

const fakeRegistry = {
Expand Down