diff --git a/packages/grpc-js-core/src/call-credentials-filter.ts b/packages/grpc-js-core/src/call-credentials-filter.ts index 678124592..4633ffe88 100644 --- a/packages/grpc-js-core/src/call-credentials-filter.ts +++ b/packages/grpc-js-core/src/call-credentials-filter.ts @@ -13,7 +13,7 @@ export class CallCredentialsFilter extends BaseFilter implements Filter { async sendMetadata(metadata: Promise): Promise { // TODO(murgatroid99): pass real options to generateMetadata - let credsMetadata = this.credentials.generateMetadata.bind({}); + let credsMetadata = this.credentials.generateMetadata({}); let resultMetadata = await metadata; resultMetadata.merge(await credsMetadata); return resultMetadata; diff --git a/packages/grpc-js-core/src/call-credentials.ts b/packages/grpc-js-core/src/call-credentials.ts index 3be454d5f..278cb163b 100644 --- a/packages/grpc-js-core/src/call-credentials.ts +++ b/packages/grpc-js-core/src/call-credentials.ts @@ -46,7 +46,7 @@ class ComposedCallCredentials implements CallCredentials { class SingleCallCredentials implements CallCredentials { constructor(private metadataGenerator: CallMetadataGenerator) {} - async generateMetadata(options: {}): Promise { + generateMetadata(options: {}): Promise { return new Promise((resolve, reject) => { this.metadataGenerator(options, (err, metadata) => { if (metadata !== undefined) { @@ -64,8 +64,8 @@ class SingleCallCredentials implements CallCredentials { } class EmptyCallCredentials implements CallCredentials { - async generateMetadata(options: {}): Promise { - return new Metadata(); + generateMetadata(options: {}): Promise { + return Promise.resolve(new Metadata()); } compose(other: CallCredentials): CallCredentials { diff --git a/packages/grpc-js-core/src/call-stream.ts b/packages/grpc-js-core/src/call-stream.ts index f62e42afa..3c3b5cfa3 100644 --- a/packages/grpc-js-core/src/call-stream.ts +++ b/packages/grpc-js-core/src/call-stream.ts @@ -282,6 +282,9 @@ export class Http2CallStream extends Duplex implements CallStream { let code: Status; let details = ''; switch (errorCode) { + case http2.constants.NGHTTP2_NO_ERROR: + code = Status.OK; + break; case http2.constants.NGHTTP2_REFUSED_STREAM: code = Status.UNAVAILABLE; break; diff --git a/packages/grpc-js-core/src/call.ts b/packages/grpc-js-core/src/call.ts index ba38a41ca..a4ae44790 100644 --- a/packages/grpc-js-core/src/call.ts +++ b/packages/grpc-js-core/src/call.ts @@ -38,6 +38,17 @@ export interface Call extends EventEmitter { event: 'metadata', listener: (metadata: Metadata) => void): this; removeListener(event: 'metadata', listener: (metadata: Metadata) => void): this; + + addListener(event: 'status', listener: (status: StatusObject) => void): this; + emit(event: 'status', status: StatusObject): boolean; + on(event: 'status', listener: (status: StatusObject) => void): this; + once(event: 'status', listener: (status: StatusObject) => void): this; + prependListener(event: 'status', listener: (status: StatusObject) => void): + this; + prependOnceListener( + event: 'status', listener: (status: StatusObject) => void): this; + removeListener(event: 'status', listener: (status: StatusObject) => void): + this; } export interface ClientUnaryCall extends Call {} @@ -48,6 +59,9 @@ export class ClientUnaryCallImpl extends EventEmitter implements Call { call.on('metadata', (metadata: Metadata) => { this.emit('metadata', metadata); }); + call.on('status', (status: StatusObject) => { + this.emit('status', status); + }); } cancel(): void { @@ -70,17 +84,6 @@ export interface ClientReadableStream extends prependListener(event: string, listener: Function): this; prependOnceListener(event: string, listener: Function): this; removeListener(event: string, listener: Function): this; - - addListener(event: 'status', listener: (status: StatusObject) => void): this; - emit(event: 'status', status: StatusObject): boolean; - on(event: 'status', listener: (status: StatusObject) => void): this; - once(event: 'status', listener: (status: StatusObject) => void): this; - prependListener(event: 'status', listener: (status: StatusObject) => void): - this; - prependOnceListener( - event: 'status', listener: (status: StatusObject) => void): this; - removeListener(event: 'status', listener: (status: StatusObject) => void): - this; } export interface ClientWritableStream extends @@ -190,6 +193,9 @@ export class ClientWritableStreamImpl extends Writable implements call.on('metadata', (metadata: Metadata) => { this.emit('metadata', metadata); }); + call.on('status', (status: StatusObject) => { + this.emit('status', status); + }); } cancel(): void { diff --git a/packages/grpc-js-core/src/channel-credentials.ts b/packages/grpc-js-core/src/channel-credentials.ts index 0d87d5de1..419e3d17e 100644 --- a/packages/grpc-js-core/src/channel-credentials.ts +++ b/packages/grpc-js-core/src/channel-credentials.ts @@ -79,7 +79,14 @@ class SecureChannelCredentialsImpl extends ChannelCredentialsImpl { } } +function verifyIsBufferOrNull(obj: any, friendlyName: string): void { + if (obj && !(obj instanceof Buffer)) { + throw new TypeError(`${friendlyName}, if provided, must be a Buffer.`); + } +} + export namespace ChannelCredentials { + /** * Return a new ChannelCredentials instance with a given set of credentials. * The resulting instance can be used to construct a Channel that communicates @@ -91,6 +98,9 @@ export namespace ChannelCredentials { export function createSsl( rootCerts?: Buffer|null, privateKey?: Buffer|null, certChain?: Buffer|null): ChannelCredentials { + verifyIsBufferOrNull(rootCerts, 'Root certificate'); + verifyIsBufferOrNull(privateKey, 'Private key'); + verifyIsBufferOrNull(certChain, 'Certificate chain'); if (privateKey && !certChain) { throw new Error( 'Private key must be given with accompanying certificate chain'); diff --git a/packages/grpc-js-core/src/channel.ts b/packages/grpc-js-core/src/channel.ts index d28d8d592..9b28d8c1a 100644 --- a/packages/grpc-js-core/src/channel.ts +++ b/packages/grpc-js-core/src/channel.ts @@ -1,6 +1,6 @@ import {EventEmitter} from 'events'; import * as http2 from 'http2'; -import {SecureContext} from 'tls'; +import {checkServerIdentity, SecureContext, PeerCertificate} from 'tls'; import * as url from 'url'; import {CallCredentials} from './call-credentials'; @@ -57,7 +57,7 @@ function uniformRandom(min:number, max: number) { export interface Channel extends EventEmitter { createStream(methodName: string, metadata: Metadata, options: CallOptions): CallStream; - connect(callback: () => void): void; + connect(): Promise; getConnectivityState(): ConnectivityState; close(): void; @@ -110,7 +110,7 @@ export class Http2Channel extends EventEmitter implements Channel { break; case ConnectivityState.IDLE: case ConnectivityState.SHUTDOWN: - if (this.subChannel !== null) { + if (this.subChannel) { this.subChannel.shutdown({graceful: true}); this.subChannel.removeListener('connect', this.subChannelConnectCallback); this.subChannel.removeListener('close', this.subChannelCloseCallback); @@ -137,7 +137,19 @@ export class Http2Channel extends EventEmitter implements Channel { if (secureContext === null) { subChannel = http2.connect(this.authority); } else { - subChannel = http2.connect(this.authority, {secureContext}); + const connectionOptions: http2.SecureClientSessionOptions = { + secureContext, + } + // If provided, the value of grpc.ssl_target_name_override should be used + // to override the target hostname when checking server identity. + // This option is used for testing only. + if (this.options['grpc.ssl_target_name_override']) { + const sslTargetNameOverride = this.options['grpc.ssl_target_name_override'] as string; + connectionOptions.checkServerIdentity = (host: string, cert: PeerCertificate): Error | undefined => { + return checkServerIdentity(sslTargetNameOverride, cert); + } + } + subChannel = http2.connect(this.authority, connectionOptions); } this.subChannel = subChannel; let now = new Date(); @@ -190,34 +202,35 @@ export class Http2Channel extends EventEmitter implements Channel { methodName: string, stream: Http2CallStream, metadata: Metadata) { let finalMetadata: Promise = stream.filterStack.sendMetadata(Promise.resolve(metadata)); - this.connect(() => { - finalMetadata.then( - (metadataValue) => { - let headers = metadataValue.toHttp2Headers(); - headers[HTTP2_HEADER_AUTHORITY] = this.authority.hostname; - headers[HTTP2_HEADER_CONTENT_TYPE] = 'application/grpc'; - headers[HTTP2_HEADER_METHOD] = 'POST'; - headers[HTTP2_HEADER_PATH] = methodName; - headers[HTTP2_HEADER_TE] = 'trailers'; - if (stream.getStatus() === null) { - if (this.connectivityState === ConnectivityState.READY) { - let session: http2.ClientHttp2Session = - (this.subChannel as http2.ClientHttp2Session); - stream.attachHttp2Stream(session.request(headers)); - } else { - /* In this case, we lost the connection while finalizing - * metadata. That should be very unusual */ - setImmediate(() => { - this.startHttp2Stream(methodName, stream, metadata); - }); - } - } - }, - (error) => { - stream.cancelWithStatus( - Status.UNKNOWN, 'Failed to generate metadata'); - }); - }); + Promise.all([finalMetadata, this.connect()]) + .then(([metadataValue]) => { + let headers = metadataValue.toHttp2Headers(); + headers[HTTP2_HEADER_AUTHORITY] = this.authority.hostname; + headers[HTTP2_HEADER_CONTENT_TYPE] = 'application/grpc'; + headers[HTTP2_HEADER_METHOD] = 'POST'; + headers[HTTP2_HEADER_PATH] = methodName; + headers[HTTP2_HEADER_TE] = 'trailers'; + if (stream.getStatus() === null) { + if (this.connectivityState === ConnectivityState.READY) { + const session: http2.ClientHttp2Session = this.subChannel!; + // Prevent the HTTP/2 session from keeping the process alive. + // TODO(kjin): Monitor nodejs/node#17620, which adds unref + // directly to the Http2Session object. + session.socket.unref(); + stream.attachHttp2Stream(session.request(headers)); + } else { + /* In this case, we lost the connection while finalizing + * metadata. That should be very unusual */ + setImmediate(() => { + this.startHttp2Stream(methodName, stream, metadata); + }); + } + } + }).catch((error: Error & { code: number }) => { + // We assume the error code isn't 0 (Status.OK) + stream.cancelWithStatus(error.code || Status.UNKNOWN, + `Getting metadata from plugin failed with error: ${error.message}`); + }); } createStream(methodName: string, metadata: Metadata, options: CallOptions): @@ -236,13 +249,15 @@ export class Http2Channel extends EventEmitter implements Channel { return stream; } - connect(callback: () => void): void { - this.transitionToState([ConnectivityState.IDLE], ConnectivityState.CONNECTING); - if (this.connectivityState === ConnectivityState.READY) { - setImmediate(callback); - } else { - this.once('connect', callback); - } + connect(): Promise { + return new Promise((resolve) => { + this.transitionToState([ConnectivityState.IDLE], ConnectivityState.CONNECTING); + if (this.connectivityState === ConnectivityState.READY) { + setImmediate(resolve); + } else { + this.once('connect', resolve); + } + }); } getConnectivityState(): ConnectivityState { diff --git a/packages/grpc-js-core/src/client.ts b/packages/grpc-js-core/src/client.ts index 2b040e6be..13849ab72 100644 --- a/packages/grpc-js-core/src/client.ts +++ b/packages/grpc-js-core/src/client.ts @@ -35,7 +35,7 @@ export class Client { void { let cb: (error: Error|null) => void = once(callback); let callbackCalled = false; - this.channel.connect(() => { + this.channel.connect().then(() => { cb(null); }); if (deadline !== Infinity) { diff --git a/packages/grpc-js-core/src/deadline-filter.ts b/packages/grpc-js-core/src/deadline-filter.ts index 66b181076..c3ed045bf 100644 --- a/packages/grpc-js-core/src/deadline-filter.ts +++ b/packages/grpc-js-core/src/deadline-filter.ts @@ -7,6 +7,18 @@ import {Metadata} from './metadata'; const units: [string, number][] = [['m', 1], ['S', 1000], ['M', 60 * 1000], ['H', 60 * 60 * 1000]]; +function getDeadline(deadline: number) { + let now = (new Date()).getTime(); + let timeoutMs = Math.max(deadline - now, 0); + for (let [unit, factor] of units) { + let amount = timeoutMs / factor; + if (amount < 1e8) { + return String(Math.ceil(amount)) + unit; + } + } + throw new Error('Deadline is too far in the future'); +} + export class DeadlineFilter extends BaseFilter implements Filter { private deadline: number; constructor( @@ -36,22 +48,10 @@ export class DeadlineFilter extends BaseFilter implements Filter { if (this.deadline === Infinity) { return await metadata; } - let timeoutString: Promise = - new Promise((resolve, reject) => { - this.channel.connect(() => { - let now = (new Date()).getTime(); - let timeoutMs = this.deadline - now; - for (let [unit, factor] of units) { - let amount = timeoutMs / factor; - if (amount < 1e8) { - resolve(String(Math.ceil(amount)) + unit); - return; - } - } - }); - }); - let finalMetadata = await metadata; - finalMetadata.set('grpc-timeout', await timeoutString); + await this.channel.connect(); + const timeoutString = getDeadline(this.deadline); + const finalMetadata = await metadata; + finalMetadata.set('grpc-timeout', timeoutString); return finalMetadata; } }