Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion packages/grpc-js-core/src/call-credentials-filter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ export class CallCredentialsFilter extends BaseFilter implements Filter {

async sendMetadata(metadata: Promise<Metadata>): Promise<Metadata> {
// TODO(murgatroid99): pass real options to generateMetadata
let credsMetadata = this.credentials.generateMetadata.bind({});
let credsMetadata = this.credentials.generateMetadata({});
let resultMetadata = await metadata;
resultMetadata.merge(await credsMetadata);
return resultMetadata;
Expand Down
6 changes: 3 additions & 3 deletions packages/grpc-js-core/src/call-credentials.ts
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ class ComposedCallCredentials implements CallCredentials {
class SingleCallCredentials implements CallCredentials {
constructor(private metadataGenerator: CallMetadataGenerator) {}

async generateMetadata(options: {}): Promise<Metadata> {
generateMetadata(options: {}): Promise<Metadata> {
return new Promise<Metadata>((resolve, reject) => {
this.metadataGenerator(options, (err, metadata) => {
if (metadata !== undefined) {
Expand All @@ -64,8 +64,8 @@ class SingleCallCredentials implements CallCredentials {
}

class EmptyCallCredentials implements CallCredentials {
async generateMetadata(options: {}): Promise<Metadata> {
return new Metadata();
generateMetadata(options: {}): Promise<Metadata> {
return Promise.resolve(new Metadata());
}

compose(other: CallCredentials): CallCredentials {
Expand Down
3 changes: 3 additions & 0 deletions packages/grpc-js-core/src/call-stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,9 @@ export class Http2CallStream extends Duplex implements CallStream {
let code: Status;
let details = '';
switch (errorCode) {
case http2.constants.NGHTTP2_NO_ERROR:
code = Status.OK;
break;
case http2.constants.NGHTTP2_REFUSED_STREAM:
code = Status.UNAVAILABLE;
break;
Expand Down
28 changes: 17 additions & 11 deletions packages/grpc-js-core/src/call.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,17 @@ export interface Call extends EventEmitter {
event: 'metadata', listener: (metadata: Metadata) => void): this;
removeListener(event: 'metadata', listener: (metadata: Metadata) => void):
this;

addListener(event: 'status', listener: (status: StatusObject) => void): this;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The changes to this file do not match the existing implementation, which only emits the status event for calls with response streaming.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems like that's not the case -- this test depends on 'status' being emitted for a unary call:

it('should be present when a unary call succeeds', function(done) {
var call = client.unary({error: false}, function(err, data) {
assert.ifError(err);
});
call.on('status', function(status) {
assert.deepEqual(status.metadata.get('trailer-present'), ['yes']);
done();
});
});

emit(event: 'status', status: StatusObject): boolean;
on(event: 'status', listener: (status: StatusObject) => void): this;
once(event: 'status', listener: (status: StatusObject) => void): this;
prependListener(event: 'status', listener: (status: StatusObject) => void):
this;
prependOnceListener(
event: 'status', listener: (status: StatusObject) => void): this;
removeListener(event: 'status', listener: (status: StatusObject) => void):
this;
}

export interface ClientUnaryCall extends Call {}
Expand All @@ -48,6 +59,9 @@ export class ClientUnaryCallImpl extends EventEmitter implements Call {
call.on('metadata', (metadata: Metadata) => {
this.emit('metadata', metadata);
});
call.on('status', (status: StatusObject) => {
this.emit('status', status);
});
}

cancel(): void {
Expand All @@ -70,17 +84,6 @@ export interface ClientReadableStream<ResponseType> extends
prependListener(event: string, listener: Function): this;
prependOnceListener(event: string, listener: Function): this;
removeListener(event: string, listener: Function): this;

addListener(event: 'status', listener: (status: StatusObject) => void): this;
emit(event: 'status', status: StatusObject): boolean;
on(event: 'status', listener: (status: StatusObject) => void): this;
once(event: 'status', listener: (status: StatusObject) => void): this;
prependListener(event: 'status', listener: (status: StatusObject) => void):
this;
prependOnceListener(
event: 'status', listener: (status: StatusObject) => void): this;
removeListener(event: 'status', listener: (status: StatusObject) => void):
this;
}

export interface ClientWritableStream<RequestType> extends
Expand Down Expand Up @@ -190,6 +193,9 @@ export class ClientWritableStreamImpl<RequestType> extends Writable implements
call.on('metadata', (metadata: Metadata) => {
this.emit('metadata', metadata);
});
call.on('status', (status: StatusObject) => {
this.emit('status', status);
});
}

cancel(): void {
Expand Down
10 changes: 10 additions & 0 deletions packages/grpc-js-core/src/channel-credentials.ts
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,14 @@ class SecureChannelCredentialsImpl extends ChannelCredentialsImpl {
}
}

function verifyIsBufferOrNull(obj: any, friendlyName: string): void {
if (obj && !(obj instanceof Buffer)) {
throw new TypeError(`${friendlyName}, if provided, must be a Buffer.`);
}
}

export namespace ChannelCredentials {

/**
* Return a new ChannelCredentials instance with a given set of credentials.
* The resulting instance can be used to construct a Channel that communicates
Expand All @@ -91,6 +98,9 @@ export namespace ChannelCredentials {
export function createSsl(
rootCerts?: Buffer|null, privateKey?: Buffer|null,
certChain?: Buffer|null): ChannelCredentials {
verifyIsBufferOrNull(rootCerts, 'Root certificate');
verifyIsBufferOrNull(privateKey, 'Private key');
verifyIsBufferOrNull(certChain, 'Certificate chain');
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why are these not redundant with the argument type annotations?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TS doesn't auto-generate sentinel checks in JS, its typings are only enforced for other TS files. The credentials test depends on these checks existing, and is written in JS.

if (privateKey && !certChain) {
throw new Error(
'Private key must be given with accompanying certificate chain');
Expand Down
93 changes: 54 additions & 39 deletions packages/grpc-js-core/src/channel.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import {EventEmitter} from 'events';
import * as http2 from 'http2';
import {SecureContext} from 'tls';
import {checkServerIdentity, SecureContext, PeerCertificate} from 'tls';
import * as url from 'url';

import {CallCredentials} from './call-credentials';
Expand Down Expand Up @@ -57,7 +57,7 @@ function uniformRandom(min:number, max: number) {
export interface Channel extends EventEmitter {
createStream(methodName: string, metadata: Metadata, options: CallOptions):
CallStream;
connect(callback: () => void): void;
connect(): Promise<void>;
getConnectivityState(): ConnectivityState;
close(): void;

Expand Down Expand Up @@ -110,7 +110,7 @@ export class Http2Channel extends EventEmitter implements Channel {
break;
case ConnectivityState.IDLE:
case ConnectivityState.SHUTDOWN:
if (this.subChannel !== null) {
if (this.subChannel) {
this.subChannel.shutdown({graceful: true});
this.subChannel.removeListener('connect', this.subChannelConnectCallback);
this.subChannel.removeListener('close', this.subChannelCloseCallback);
Expand All @@ -137,7 +137,19 @@ export class Http2Channel extends EventEmitter implements Channel {
if (secureContext === null) {
subChannel = http2.connect(this.authority);
} else {
subChannel = http2.connect(this.authority, {secureContext});
const connectionOptions: http2.SecureClientSessionOptions = {
secureContext,
}
// If provided, the value of grpc.ssl_target_name_override should be used
// to override the target hostname when checking server identity.
// This option is used for testing only.
if (this.options['grpc.ssl_target_name_override']) {
const sslTargetNameOverride = this.options['grpc.ssl_target_name_override'] as string;
connectionOptions.checkServerIdentity = (host: string, cert: PeerCertificate): Error | undefined => {
return checkServerIdentity(sslTargetNameOverride, cert);
}
}
subChannel = http2.connect(this.authority, connectionOptions);
}
this.subChannel = subChannel;
let now = new Date();
Expand Down Expand Up @@ -190,34 +202,35 @@ export class Http2Channel extends EventEmitter implements Channel {
methodName: string, stream: Http2CallStream, metadata: Metadata) {
let finalMetadata: Promise<Metadata> =
stream.filterStack.sendMetadata(Promise.resolve(metadata));
this.connect(() => {
finalMetadata.then(
(metadataValue) => {
let headers = metadataValue.toHttp2Headers();
headers[HTTP2_HEADER_AUTHORITY] = this.authority.hostname;
headers[HTTP2_HEADER_CONTENT_TYPE] = 'application/grpc';
headers[HTTP2_HEADER_METHOD] = 'POST';
headers[HTTP2_HEADER_PATH] = methodName;
headers[HTTP2_HEADER_TE] = 'trailers';
if (stream.getStatus() === null) {
if (this.connectivityState === ConnectivityState.READY) {
let session: http2.ClientHttp2Session =
(this.subChannel as http2.ClientHttp2Session);
stream.attachHttp2Stream(session.request(headers));
} else {
/* In this case, we lost the connection while finalizing
* metadata. That should be very unusual */
setImmediate(() => {
this.startHttp2Stream(methodName, stream, metadata);
});
}
}
},
(error) => {
stream.cancelWithStatus(
Status.UNKNOWN, 'Failed to generate metadata');
});
});
Promise.all([finalMetadata, this.connect()])
.then(([metadataValue]) => {
let headers = metadataValue.toHttp2Headers();
headers[HTTP2_HEADER_AUTHORITY] = this.authority.hostname;
headers[HTTP2_HEADER_CONTENT_TYPE] = 'application/grpc';
headers[HTTP2_HEADER_METHOD] = 'POST';
headers[HTTP2_HEADER_PATH] = methodName;
headers[HTTP2_HEADER_TE] = 'trailers';
if (stream.getStatus() === null) {
if (this.connectivityState === ConnectivityState.READY) {
const session: http2.ClientHttp2Session = this.subChannel!;
// Prevent the HTTP/2 session from keeping the process alive.
// TODO(kjin): Monitor nodejs/node#17620, which adds unref
// directly to the Http2Session object.
session.socket.unref();
stream.attachHttp2Stream(session.request(headers));
} else {
/* In this case, we lost the connection while finalizing
* metadata. That should be very unusual */
setImmediate(() => {
this.startHttp2Stream(methodName, stream, metadata);
});
}
}
}).catch((error: Error & { code: number }) => {
// We assume the error code isn't 0 (Status.OK)
stream.cancelWithStatus(error.code || Status.UNKNOWN,
`Getting metadata from plugin failed with error: ${error.message}`);
});
}

createStream(methodName: string, metadata: Metadata, options: CallOptions):
Expand All @@ -236,13 +249,15 @@ export class Http2Channel extends EventEmitter implements Channel {
return stream;
}

connect(callback: () => void): void {
this.transitionToState([ConnectivityState.IDLE], ConnectivityState.CONNECTING);
if (this.connectivityState === ConnectivityState.READY) {
setImmediate(callback);
} else {
this.once('connect', callback);
}
connect(): Promise<void> {
return new Promise((resolve) => {
this.transitionToState([ConnectivityState.IDLE], ConnectivityState.CONNECTING);
if (this.connectivityState === ConnectivityState.READY) {
setImmediate(resolve);
} else {
this.once('connect', resolve);
}
});
}

getConnectivityState(): ConnectivityState {
Expand Down
2 changes: 1 addition & 1 deletion packages/grpc-js-core/src/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ export class Client {
void {
let cb: (error: Error|null) => void = once(callback);
let callbackCalled = false;
this.channel.connect(() => {
this.channel.connect().then(() => {
cb(null);
});
if (deadline !== Infinity) {
Expand Down
32 changes: 16 additions & 16 deletions packages/grpc-js-core/src/deadline-filter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,18 @@ import {Metadata} from './metadata';
const units: [string, number][] =
[['m', 1], ['S', 1000], ['M', 60 * 1000], ['H', 60 * 60 * 1000]];

function getDeadline(deadline: number) {
let now = (new Date()).getTime();
let timeoutMs = Math.max(deadline - now, 0);
for (let [unit, factor] of units) {
let amount = timeoutMs / factor;
if (amount < 1e8) {
return String(Math.ceil(amount)) + unit;
}
}
throw new Error('Deadline is too far in the future');
}

export class DeadlineFilter extends BaseFilter implements Filter {
private deadline: number;
constructor(
Expand Down Expand Up @@ -36,22 +48,10 @@ export class DeadlineFilter extends BaseFilter implements Filter {
if (this.deadline === Infinity) {
return await metadata;
}
let timeoutString: Promise<string> =
new Promise<string>((resolve, reject) => {
this.channel.connect(() => {
let now = (new Date()).getTime();
let timeoutMs = this.deadline - now;
for (let [unit, factor] of units) {
let amount = timeoutMs / factor;
if (amount < 1e8) {
resolve(String(Math.ceil(amount)) + unit);
return;
}
}
});
});
let finalMetadata = await metadata;
finalMetadata.set('grpc-timeout', await timeoutString);
await this.channel.connect();
const timeoutString = getDeadline(this.deadline);
const finalMetadata = await metadata;
finalMetadata.set('grpc-timeout', timeoutString);
return finalMetadata;
}
}
Expand Down