Skip to content

Commit a7374ef

Browse files
committed
Durable functions - 'Function chaining' scenario support
1 parent 45b6099 commit a7374ef

File tree

10 files changed

+682
-8
lines changed

10 files changed

+682
-8
lines changed

src/Durable/Actions.cs

Lines changed: 139 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,139 @@
1+
//
2+
// Copyright (c) Microsoft. All rights reserved.
3+
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
4+
//
5+
6+
using System;
7+
using System.Collections.Generic;
8+
9+
namespace Microsoft.Azure.Functions.PowerShellWorker.Action
10+
{
11+
/// <summary>
12+
/// Action types
13+
/// </summary>
14+
public enum ActionType
15+
{
16+
/// <summary>
17+
/// Call an activity function.
18+
/// </summary>
19+
CallActivity = 0,
20+
21+
/// <summary>
22+
/// Call an activity function with retry.
23+
/// </summary>
24+
CallActivityWithRetry = 1,
25+
26+
/// <summary>
27+
/// Call a sub-orchestration function.
28+
/// </summary>
29+
CallSubOrchestrator = 2,
30+
31+
/// <summary>
32+
/// Call a sub-orchestration function with retry.
33+
/// </summary>
34+
CallSubOrchestratorWithRetry = 3,
35+
36+
/// <summary>
37+
/// Run the orchestration function as a loop.
38+
/// </summary>
39+
ContinueAsNew = 4,
40+
41+
/// <summary>
42+
/// Create a timer.
43+
/// </summary>
44+
CreateTimer = 5,
45+
46+
/// <summary>
47+
/// Wait for an external event.
48+
/// </summary>
49+
WaitForExternalEvent = 6,
50+
}
51+
52+
/// <summary>
53+
/// Base class that represents an orchestration action.
54+
/// </summary>
55+
public abstract class AzAction
56+
{
57+
/// <summary>
58+
/// Base constructor for creating an action.
59+
/// </summary>
60+
protected AzAction(ActionType actionType)
61+
{
62+
ActionType = actionType;
63+
}
64+
65+
/// <summary>
66+
/// Action type.
67+
/// </summary>
68+
public readonly ActionType ActionType;
69+
}
70+
71+
/// <summary>
72+
/// An orchestration action that represents calling an activity function.
73+
/// </summary>
74+
public class CallActivityAction : AzAction
75+
{
76+
/// <summary>
77+
/// The activity function name.
78+
/// </summary>
79+
public readonly string FunctionName;
80+
81+
/// <summary>
82+
/// The input to the activity function.
83+
/// </summary>
84+
public readonly object Input;
85+
86+
/// <summary>
87+
/// Constructor
88+
/// </summary>
89+
internal CallActivityAction(string functionName, object input) : base(ActionType.CallActivity)
90+
{
91+
FunctionName = functionName;
92+
Input = input;
93+
}
94+
}
95+
96+
/// <summary>
97+
/// An orchestration action that represents creating a timer.
98+
/// </summary>
99+
public class CreateTimerAction : AzAction
100+
{
101+
/// <summary>
102+
/// Time to fire the timer.
103+
/// </summary>
104+
public readonly DateTime FireAt;
105+
106+
/// <summary>
107+
/// Indicate if the timer is cancelled.
108+
/// </summary>
109+
public readonly bool IsCanceled;
110+
111+
/// <summary>
112+
/// Constructor.
113+
/// </summary>
114+
internal CreateTimerAction(DateTime fireAt, bool isCanceled) : base(ActionType.CreateTimer)
115+
{
116+
FireAt = fireAt;
117+
IsCanceled = isCanceled;
118+
}
119+
}
120+
121+
/// <summary>
122+
/// An orchestration action that represents waiting for an external event.
123+
/// </summary>
124+
public class WaitForExternalEventAction : AzAction
125+
{
126+
/// <summary>
127+
/// Name of the external event.
128+
/// </summary>
129+
public readonly string ExternalEventName;
130+
131+
/// <summary>
132+
/// Constructor.
133+
/// </summary>
134+
internal WaitForExternalEventAction(string externalEventName) : base(ActionType.WaitForExternalEvent)
135+
{
136+
ExternalEventName = externalEventName;
137+
}
138+
}
139+
}

src/Durable/HistoryEvent.cs

Lines changed: 145 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,145 @@
1+
//
2+
// Copyright (c) Microsoft. All rights reserved.
3+
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
4+
//
5+
6+
using System;
7+
using System.Runtime.Serialization;
8+
9+
namespace Microsoft.Azure.Functions.PowerShellWorker.History
10+
{
11+
[DataContract]
12+
internal class HistoryEvent
13+
{
14+
#region Common_Fields
15+
16+
[DataMember]
17+
internal int EventId { get; set; }
18+
[DataMember]
19+
internal bool IsPlayed { get; set; }
20+
[DataMember]
21+
internal DateTime Timestamp { get; set; }
22+
[DataMember]
23+
internal EventType EventType { get; set; }
24+
25+
#endregion
26+
27+
#region Timer_Event_Fields
28+
29+
[DataMember]
30+
internal DateTime FireAt { get; set; }
31+
[DataMember]
32+
internal int TimerId { get; set; }
33+
34+
#endregion
35+
36+
#region Overloaded_Fields
37+
38+
[DataMember]
39+
internal int TaskScheduledId { get; set; }
40+
[DataMember]
41+
internal string Input { get; set; }
42+
[DataMember]
43+
internal string Name { get; set; }
44+
[DataMember]
45+
internal string Result { get; set; }
46+
47+
#endregion
48+
49+
// Internal used only
50+
internal bool IsProcessed { get; set; }
51+
}
52+
53+
internal enum EventType
54+
{
55+
/// <summary>
56+
/// Orchestration execution has started event
57+
/// </summary>
58+
ExecutionStarted,
59+
60+
/// <summary>
61+
/// Orchestration execution has completed event
62+
/// </summary>
63+
ExecutionCompleted,
64+
65+
/// <summary>
66+
/// Orchestration execution has failed event
67+
/// </summary>
68+
ExecutionFailed,
69+
70+
/// <summary>
71+
/// Orchestration was terminated event
72+
/// </summary>
73+
ExecutionTerminated,
74+
75+
/// <summary>
76+
/// Task Activity scheduled event
77+
/// </summary>
78+
TaskScheduled,
79+
80+
/// <summary>
81+
/// Task Activity completed event
82+
/// </summary>
83+
TaskCompleted,
84+
85+
/// <summary>
86+
/// Task Activity failed event
87+
/// </summary>
88+
TaskFailed,
89+
90+
/// <summary>
91+
/// Sub Orchestration instance created event
92+
/// </summary>
93+
SubOrchestrationInstanceCreated,
94+
95+
/// <summary>
96+
/// Sub Orchestration instance completed event
97+
/// </summary>
98+
SubOrchestrationInstanceCompleted,
99+
100+
/// <summary>
101+
/// Sub Orchestration instance failed event
102+
/// </summary>
103+
SubOrchestrationInstanceFailed,
104+
105+
/// <summary>
106+
/// Timer created event
107+
/// </summary>
108+
TimerCreated,
109+
110+
/// <summary>
111+
/// Timer fired event
112+
/// </summary>
113+
TimerFired,
114+
115+
/// <summary>
116+
/// Orchestration has started event
117+
/// </summary>
118+
OrchestratorStarted,
119+
120+
/// <summary>
121+
/// Orchestration has completed event
122+
/// </summary>
123+
OrchestratorCompleted,
124+
125+
/// <summary>
126+
/// External Event raised to orchestration event
127+
/// </summary>
128+
EventRaised,
129+
130+
/// <summary>
131+
/// Orchestration Continued as new event
132+
/// </summary>
133+
ContinueAsNew,
134+
135+
/// <summary>
136+
/// Generic event for tracking event existence
137+
/// </summary>
138+
GenericEvent,
139+
140+
/// <summary>
141+
/// Orchestration state history event
142+
/// </summary>
143+
HistoryState,
144+
}
145+
}
Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
//
2+
// Copyright (c) Microsoft. All rights reserved.
3+
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
4+
//
5+
6+
using System.Collections;
7+
using System.Collections.Generic;
8+
using System.Linq;
9+
using System.Management.Automation;
10+
using System.Threading;
11+
12+
using Newtonsoft.Json;
13+
using Microsoft.Azure.Functions.PowerShellWorker.Action;
14+
using Microsoft.Azure.Functions.PowerShellWorker.History;
15+
16+
namespace Microsoft.Azure.Functions.PowerShellWorker.Commands
17+
{
18+
/// <summary>
19+
/// Invoke a function asynchronously.
20+
/// </summary>
21+
[Cmdlet("Invoke", "ActivityFunctionAsync")]
22+
public class InvokeActivityFunctionCommand : PSCmdlet
23+
{
24+
/// <summary>
25+
/// Gets and sets the activity function name.
26+
/// </summary>
27+
[Parameter(Mandatory = true)]
28+
public string FunctionName { get; set; }
29+
30+
/// <summary>
31+
/// Gets and sets the input for an activity function.
32+
/// </summary>
33+
/// <remarks>
34+
/// Copy the default value from durable-js, in case that it's a magic value for specifying no input value.
35+
/// </remarks>
36+
[Parameter]
37+
[ValidateNotNull]
38+
public object Input { get; set; } = "__activity__default";
39+
40+
// Used for waiting on the pipeline to be stopped.
41+
private ManualResetEvent waitForStop = new ManualResetEvent(initialState: false);
42+
private OrchestrationContext context;
43+
44+
/// <summary>
45+
/// Implement the EndProcessing method.
46+
/// </summary>
47+
protected override void EndProcessing()
48+
{
49+
var privateData = (Hashtable)this.MyInvocation.MyCommand.Module.PrivateData;
50+
context = (OrchestrationContext)privateData[SetFunctionInvocationContextCommand.ContextKey];
51+
52+
context.Actions.Add(new List<AzAction>() { new CallActivityAction(FunctionName, Input) });
53+
54+
HistoryEvent taskScheduled = context.History
55+
.FirstOrDefault(e => e.EventType == EventType.TaskScheduled &&
56+
e.Name == FunctionName &&
57+
!e.IsProcessed);
58+
59+
HistoryEvent taskCompleted = taskScheduled == null ? null : context.History
60+
.FirstOrDefault(e => e.EventType == EventType.TaskCompleted &&
61+
e.TaskScheduledId == taskScheduled.EventId);
62+
63+
if (taskCompleted != null)
64+
{
65+
taskScheduled.IsProcessed = true;
66+
taskCompleted.IsProcessed = true;
67+
WriteObject(taskCompleted.Result);
68+
}
69+
else
70+
{
71+
context.ActionEvent.Set();
72+
waitForStop.WaitOne();
73+
}
74+
}
75+
76+
/// <summary>
77+
/// Implement the StopProcessing method.
78+
/// </summary>
79+
protected override void StopProcessing()
80+
{
81+
waitForStop.Set();
82+
}
83+
}
84+
}

0 commit comments

Comments
 (0)