Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 15 additions & 1 deletion package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 3 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@
"blocked-at": "^1.2.0",
"fs-extra": "^10.0.1",
"globby": "^11.0.0",
"minimist": "^1.2.5"
"minimist": "^1.2.5",
"uuid": "^8.3.2"
},
"devDependencies": {
"@types/blocked-at": "^1.0.1",
Expand All @@ -25,6 +26,7 @@
"@types/node": "^16.9.6",
"@types/semver": "^7.3.9",
"@types/sinon": "^7.0.0",
"@types/uuid": "^8.3.4",
"@typescript-eslint/eslint-plugin": "^5.12.1",
"@typescript-eslint/parser": "^5.12.1",
"chai": "^4.2.0",
Expand Down
33 changes: 12 additions & 21 deletions src/FunctionLoader.ts → src/LegacyFunctionLoader.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,21 +7,19 @@ import { loadScriptFile } from './loadScriptFile';
import { PackageJson } from './parsers/parsePackageJson';
import { InternalException } from './utils/InternalException';
import { nonNullProp } from './utils/nonNull';
import { RegisteredFunction } from './WorkerChannel';

export interface IFunctionLoader {
export interface ILegacyFunctionLoader {
load(functionId: string, metadata: rpc.IRpcFunctionMetadata, packageJson: PackageJson): Promise<void>;
getRpcMetadata(functionId: string): rpc.IRpcFunctionMetadata;
getCallback(functionId: string): FunctionCallback;
getFunction(functionId: string): RegisteredFunction;
}

interface LoadedFunction {
metadata: rpc.IRpcFunctionMetadata;
callback: FunctionCallback;
interface LegacyRegisteredFunction extends RegisteredFunction {
thisArg: unknown;
}

export class FunctionLoader implements IFunctionLoader {
#loadedFunctions: { [k: string]: LoadedFunction | undefined } = {};
export class LegacyFunctionLoader implements ILegacyFunctionLoader {
#loadedFunctions: { [k: string]: LegacyRegisteredFunction | undefined } = {};

async load(functionId: string, metadata: rpc.IRpcFunctionMetadata, packageJson: PackageJson): Promise<void> {
if (metadata.isProxy === true) {
Expand All @@ -33,21 +31,14 @@ export class FunctionLoader implements IFunctionLoader {
this.#loadedFunctions[functionId] = { metadata, callback, thisArg };
}

getRpcMetadata(functionId: string): rpc.IRpcFunctionMetadata {
const loadedFunction = this.#getLoadedFunction(functionId);
return loadedFunction.metadata;
}

getCallback(functionId: string): FunctionCallback {
const loadedFunction = this.#getLoadedFunction(functionId);
// `bind` is necessary to set the `this` arg, but it's also nice because it makes a clone of the function, preventing this invocation from affecting future invocations
return loadedFunction.callback.bind(loadedFunction.thisArg);
}

#getLoadedFunction(functionId: string): LoadedFunction {
getFunction(functionId: string): RegisteredFunction {
const loadedFunction = this.#loadedFunctions[functionId];
if (loadedFunction) {
return loadedFunction;
return {
metadata: loadedFunction.metadata,
// `bind` is necessary to set the `this` arg, but it's also nice because it makes a clone of the function, preventing this invocation from affecting future invocations
callback: loadedFunction.callback.bind(loadedFunction.thisArg),
};
} else {
throw new InternalException(`Function code for '${functionId}' is not loaded and cannot be invoked.`);
}
Expand Down
4 changes: 2 additions & 2 deletions src/Worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@
// Licensed under the MIT License.

import * as parseArgs from 'minimist';
import { FunctionLoader } from './FunctionLoader';
import { CreateGrpcEventStream } from './GrpcClient';
import { LegacyFunctionLoader } from './LegacyFunctionLoader';
import { setupCoreModule } from './setupCoreModule';
import { setupEventStream } from './setupEventStream';
import { startBlockedMonitor } from './utils/blockedMonitor';
Expand Down Expand Up @@ -43,7 +43,7 @@ export function startNodeWorker(args) {
throw error;
}

const channel = new WorkerChannel(eventStream, new FunctionLoader());
const channel = new WorkerChannel(eventStream, new LegacyFunctionLoader());
setupEventStream(workerId, channel);
setupCoreModule(channel);

Expand Down
17 changes: 12 additions & 5 deletions src/WorkerChannel.ts
Original file line number Diff line number Diff line change
@@ -1,19 +1,24 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the MIT License.

import { HookCallback, HookContext, HookData, ProgrammingModel } from '@azure/functions-core';
import { FunctionCallback, HookCallback, HookContext, HookData, ProgrammingModel } from '@azure/functions-core';
import { AzureFunctionsRpcMessages as rpc } from '../azure-functions-language-worker-protobuf/src/rpc';
import { Disposable } from './Disposable';
import { IFunctionLoader } from './FunctionLoader';
import { IEventStream } from './GrpcClient';
import { ILegacyFunctionLoader } from './LegacyFunctionLoader';
import { PackageJson, parsePackageJson } from './parsers/parsePackageJson';
import { ensureErrorType } from './utils/ensureErrorType';
import LogLevel = rpc.RpcLog.Level;
import LogCategory = rpc.RpcLog.RpcLogCategory;

export interface RegisteredFunction {
metadata: rpc.IRpcFunctionMetadata;
callback: FunctionCallback;
}

export class WorkerChannel {
eventStream: IEventStream;
functionLoader: IFunctionLoader;
legacyFunctionLoader: ILegacyFunctionLoader;
packageJson: PackageJson;
/**
* This will only be set after worker init request is received
Expand All @@ -40,10 +45,12 @@ export class WorkerChannel {
#preInvocationHooks: HookCallback[] = [];
#postInvocationHooks: HookCallback[] = [];
#appStartHooks: HookCallback[] = [];
functions: { [id: string]: RegisteredFunction } = {};
hasIndexedFunctions = false;

constructor(eventStream: IEventStream, functionLoader: IFunctionLoader) {
constructor(eventStream: IEventStream, legacyFunctionLoader: ILegacyFunctionLoader) {
this.eventStream = eventStream;
this.functionLoader = functionLoader;
this.legacyFunctionLoader = legacyFunctionLoader;
this.packageJson = {};
}

Expand Down
56 changes: 56 additions & 0 deletions src/coreApi/registerFunction.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the MIT License.

import { FunctionCallback, FunctionMetadata, RpcBindingInfo } from '@azure/functions-core';
import { v4 as uuid } from 'uuid';
import { AzureFunctionsRpcMessages as rpc } from '../../azure-functions-language-worker-protobuf/src/rpc';
import { Disposable } from '../Disposable';
import { InternalException } from '../utils/InternalException';
import { WorkerChannel } from '../WorkerChannel';

export function registerFunction(
channel: WorkerChannel,
metadata: FunctionMetadata,
callback: FunctionCallback
): Disposable {
if (channel.hasIndexedFunctions) {
throw new InternalException('A function can only be registered during app startup.');
}
const functionId = uuid();

const rpcMetadata: rpc.IRpcFunctionMetadata = metadata;
rpcMetadata.functionId = functionId;
// `rawBindings` is what's actually used by the host
// `bindings` is used by the js library in both the old host indexing and the new worker indexing
rpcMetadata.rawBindings = Object.entries(metadata.bindings).map(([name, binding]) => {
return convertToRawBinding(name, binding);
});
// The host validates that the `scriptFile` property is defined even though neither the host nor the worker needs it
// Long term we should adjust the host to remove that unnecessary validation, but for now we'll just set it to 'n/a'
rpcMetadata.scriptFile = 'n/a';
channel.functions[functionId] = { metadata: rpcMetadata, callback };

return new Disposable(() => {
if (channel.hasIndexedFunctions) {
throw new InternalException('A function can only be disposed during app startup.');
} else {
delete channel.functions[functionId];
}
});
}

function convertToRawBinding(name: string, binding: RpcBindingInfo): string {
const rawBinding: any = { ...binding, name };
switch (binding.direction) {
case rpc.BindingInfo.Direction.in:
rawBinding.direction = 'in';
break;
case rpc.BindingInfo.Direction.out:
rawBinding.direction = 'out';
break;
case rpc.BindingInfo.Direction.inout:
rawBinding.direction = 'inout';
break;
}
return JSON.stringify(rawBinding);
}
6 changes: 4 additions & 2 deletions src/eventHandlers/EventHandler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,16 @@ export type SupportedRequestName =
| 'functionEnvironmentReloadRequest'
| 'functionLoadRequest'
| 'invocationRequest'
| 'workerInitRequest';
| 'workerInitRequest'
| 'functionsMetadataRequest';
export type SupportedRequest = rpc.StreamingMessage[SupportedRequestName];

export type SupportedResponseName =
| 'functionEnvironmentReloadResponse'
| 'functionLoadResponse'
| 'invocationResponse'
| 'workerInitResponse';
| 'workerInitResponse'
| 'functionMetadataResponse';
export type SupportedResponse = rpc.StreamingMessage[SupportedResponseName];

export abstract class EventHandler<
Expand Down
16 changes: 9 additions & 7 deletions src/eventHandlers/FunctionLoadHandler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,15 @@ export class FunctionLoadHandler extends EventHandler<'functionLoadRequest', 'fu

const functionId = nonNullProp(msg, 'functionId');
const metadata = nonNullProp(msg, 'metadata');
try {
await channel.functionLoader.load(functionId, metadata, channel.packageJson);
} catch (err) {
const error = ensureErrorType(err);
error.isAzureFunctionsInternalException = true;
error.message = `Worker was unable to load function ${metadata.name}: '${error.message}'`;
throw error;
if (!channel.functions[functionId]) {
try {
await channel.legacyFunctionLoader.load(functionId, metadata, channel.packageJson);
} catch (err) {
const error = ensureErrorType(err);
error.isAzureFunctionsInternalException = true;
error.message = `Worker was unable to load function ${metadata.name}: '${error.message}'`;
throw error;
}
}

return response;
Expand Down
41 changes: 41 additions & 0 deletions src/eventHandlers/FunctionsMetadataHandler.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the MIT License.

import { AzureFunctionsRpcMessages as rpc } from '../../azure-functions-language-worker-protobuf/src/rpc';
import { WorkerChannel } from '../WorkerChannel';
import { EventHandler } from './EventHandler';
import LogCategory = rpc.RpcLog.RpcLogCategory;
import LogLevel = rpc.RpcLog.Level;

export class FunctionsMetadataHandler extends EventHandler<'functionsMetadataRequest', 'functionMetadataResponse'> {
readonly responseName = 'functionMetadataResponse';

getDefaultResponse(_msg: rpc.IFunctionsMetadataRequest): rpc.IFunctionMetadataResponse {
return {
useDefaultMetadataIndexing: true,
};
}

async handleEvent(
channel: WorkerChannel,
msg: rpc.IFunctionsMetadataRequest
): Promise<rpc.IFunctionMetadataResponse> {
const response = this.getDefaultResponse(msg);

channel.log({
message: 'Received FunctionsMetadataRequest',
level: LogLevel.Debug,
logCategory: LogCategory.System,
});

const functions = Object.values(channel.functions);
if (functions.length > 0) {
response.useDefaultMetadataIndexing = false;
response.functionMetadataResults = functions.map((f) => f.metadata);
}

channel.hasIndexedFunctions = true;

return response;
}
}
4 changes: 2 additions & 2 deletions src/eventHandlers/InvocationHandler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@ export class InvocationHandler extends EventHandler<'invocationRequest', 'invoca

async handleEvent(channel: WorkerChannel, msg: rpc.IInvocationRequest): Promise<rpc.IInvocationResponse> {
const functionId = nonNullProp(msg, 'functionId');
const metadata = channel.functionLoader.getRpcMetadata(functionId);
let { metadata, callback } =
channel.functions[functionId] || channel.legacyFunctionLoader.getFunction(functionId);
const msgCategory = `${nonNullProp(metadata, 'name')}.Invocation`;
const coreCtx = new CoreInvocationContext(channel, msg, metadata, msgCategory);

Expand All @@ -43,7 +44,6 @@ export class InvocationHandler extends EventHandler<'invocationRequest', 'invoca

const hookData: HookData = {};
let { context, inputs } = await invocModel.getArguments();
let callback = channel.functionLoader.getCallback(functionId);

const preInvocContext: PreInvocationContext = {
hookData,
Expand Down
6 changes: 5 additions & 1 deletion src/setupCoreModule.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the MIT License.

import { HookCallback, ProgrammingModel } from '@azure/functions-core';
import { FunctionCallback, FunctionMetadata, HookCallback, ProgrammingModel } from '@azure/functions-core';
import { AzureFunctionsRpcMessages as rpc } from '../azure-functions-language-worker-protobuf/src/rpc';
import { version } from './constants';
import { registerFunction } from './coreApi/registerFunction';
import { Disposable } from './Disposable';
import { WorkerChannel } from './WorkerChannel';
import Module = require('module');
Expand Down Expand Up @@ -36,6 +37,9 @@ export function setupCoreModule(channel: WorkerChannel): void {
getProgrammingModel: () => {
return channel.programmingModel;
},
registerFunction: (metadata: FunctionMetadata, callback: FunctionCallback) => {
return registerFunction(channel, metadata, callback);
},
Disposable,
// NOTE: We have to pass along any and all enums used in the RPC api to the core api
RpcLog: rpc.RpcLog,
Expand Down
5 changes: 4 additions & 1 deletion src/setupEventStream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import { AzureFunctionsRpcMessages as rpc } from '../azure-functions-language-wo
import { EventHandler, SupportedRequest } from './eventHandlers/EventHandler';
import { FunctionEnvironmentReloadHandler } from './eventHandlers/FunctionEnvironmentReloadHandler';
import { FunctionLoadHandler } from './eventHandlers/FunctionLoadHandler';
import { FunctionsMetadataHandler } from './eventHandlers/FunctionsMetadataHandler';
import { InvocationHandler } from './eventHandlers/InvocationHandler';
import { WorkerInitHandler } from './eventHandlers/WorkerInitHandler';
import { ensureErrorType } from './utils/ensureErrorType';
Expand Down Expand Up @@ -71,10 +72,12 @@ async function handleMessage(workerId: string, channel: WorkerChannel, inMsg: rp
outMsg.workerStatusResponse = {};
channel.eventStream.write(outMsg);
return;
case 'functionsMetadataRequest':
eventHandler = new FunctionsMetadataHandler();
break;
case 'closeSharedMemoryResourcesRequest':
case 'fileChangeEventRequest':
case 'functionLoadRequestCollection':
case 'functionsMetadataRequest':
case 'invocationCancel':
case 'startStream':
case 'workerHeartbeat':
Expand Down
Loading