Skip to content

Commit 30250f4

Browse files
authored
[Durable] Implement retry policy support (#590)
1 parent e387058 commit 30250f4

33 files changed

+1051
-37
lines changed

.gitignore

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -63,8 +63,10 @@ PowerShell.sln.DotSettings.user
6363
StyleCop.Cache
6464

6565
examples/PSCoreApp/Modules
66-
src/Modules
67-
!src/Modules/Microsoft.Azure.Functions.PowerShellWorker
66+
src/Modules/Microsoft.PowerShell.*
67+
src/Modules/PackageManagement
68+
src/Modules/PowerShellGet
69+
src/Modules/ThreadJob
6870

6971
# protobuf
7072
protobuf/*
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
{
2+
"bindings": [
3+
{
4+
"name": "name",
5+
"type": "activityTrigger",
6+
"direction": "in"
7+
}
8+
]
9+
}
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
param($name)
2+
3+
# Intentional intermittent error
4+
$random = Get-Random -Minimum 0.0 -Maximum 1.0
5+
if ($random -gt 0.2) {
6+
throw 'Nope, no luck this time...'
7+
}
8+
9+
"Hello $name"
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
{
2+
"bindings": [
3+
{
4+
"name": "Context",
5+
"type": "orchestrationTrigger",
6+
"direction": "in"
7+
}
8+
]
9+
}
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
using namespace System.Net
2+
3+
param($Context)
4+
5+
$ErrorActionPreference = 'Stop'
6+
7+
$output = @()
8+
9+
$retryOptions = New-DurableRetryOptions `
10+
-FirstRetryInterval (New-Timespan -Seconds 1) `
11+
-MaxNumberOfAttempts 7
12+
13+
$output += Invoke-ActivityFunction -FunctionName 'FlakyActivity' -Input 'Tokyo' -RetryOptions $retryOptions
14+
$output += Invoke-ActivityFunction -FunctionName 'FlakyActivity' -Input 'Seattle' -RetryOptions $retryOptions
15+
$output += Invoke-ActivityFunction -FunctionName 'FlakyActivity' -Input 'London' -RetryOptions $retryOptions
16+
17+
$output
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
{
2+
"bindings": [
3+
{
4+
"authLevel": "function",
5+
"type": "httpTrigger",
6+
"direction": "in",
7+
"name": "Request",
8+
"methods": [
9+
"get",
10+
"post"
11+
]
12+
},
13+
{
14+
"type": "http",
15+
"direction": "out",
16+
"name": "Response"
17+
},
18+
{
19+
"name": "starter",
20+
"type": "durableClient",
21+
"direction": "in"
22+
}
23+
]
24+
}
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
using namespace System.Net
2+
3+
param($Request, $TriggerMetadata)
4+
5+
Write-Host 'FunctionChainingWithRetriesStart started'
6+
7+
$InstanceId = Start-NewOrchestration -FunctionName 'FunctionChainingWithRetriesOrchestrator' -InputObject 'Hello'
8+
Write-Host "Started orchestration with ID = '$InstanceId'"
9+
10+
$Response = New-OrchestrationCheckStatusResponse -Request $Request -InstanceId $InstanceId
11+
Push-OutputBinding -Name Response -Value $Response
12+
13+
Write-Host 'FunctionChainingWithRetriesStart completed'
Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
//
2+
// Copyright (c) Microsoft. All rights reserved.
3+
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
4+
//
5+
6+
using System;
7+
using System.Collections.Generic;
8+
9+
namespace Microsoft.Azure.Functions.PowerShellWorker.Durable.Actions
10+
{
11+
/// <summary>
12+
/// An orchestration action that represents calling an activity function with retry.
13+
/// </summary>
14+
internal class CallActivityWithRetryAction : OrchestrationAction
15+
{
16+
/// <summary>
17+
/// The activity function name.
18+
/// </summary>
19+
public readonly string FunctionName;
20+
21+
/// <summary>
22+
/// The input to the activity function.
23+
/// </summary>
24+
public readonly object Input;
25+
26+
/// <summary>
27+
/// Retry options.
28+
/// </summary>
29+
public readonly Dictionary<string, object> RetryOptions;
30+
31+
public CallActivityWithRetryAction(string functionName, object input, RetryOptions retryOptions)
32+
: base(ActionType.CallActivityWithRetry)
33+
{
34+
FunctionName = functionName;
35+
Input = input;
36+
RetryOptions = ToDictionary(retryOptions);
37+
}
38+
39+
private static Dictionary<string, object> ToDictionary(RetryOptions retryOptions)
40+
{
41+
var result = new Dictionary<string, object>()
42+
{
43+
{ "firstRetryIntervalInMilliseconds", ToIntMilliseconds(retryOptions.FirstRetryInterval) },
44+
{ "maxNumberOfAttempts", retryOptions.MaxNumberOfAttempts }
45+
};
46+
47+
AddOptionalValue(result, "backoffCoefficient", retryOptions.BackoffCoefficient, x => x);
48+
AddOptionalValue(result, "maxRetryIntervalInMilliseconds", retryOptions.MaxRetryInterval, ToIntMilliseconds);
49+
AddOptionalValue(result, "retryTimeoutInMilliseconds", retryOptions.RetryTimeout, ToIntMilliseconds);
50+
51+
return result;
52+
}
53+
54+
private static void AddOptionalValue<T>(
55+
Dictionary<string, object> dictionary,
56+
string name,
57+
T? nullable,
58+
Func<T, object> transformValue) where T : struct
59+
{
60+
if (nullable.HasValue)
61+
{
62+
dictionary.Add(name, transformValue(nullable.Value));
63+
}
64+
}
65+
66+
private static object ToIntMilliseconds(TimeSpan timespan)
67+
{
68+
return (int)timespan.TotalMilliseconds;
69+
}
70+
}
71+
}

src/Durable/Commands/InvokeActivityFunctionCommand.cs

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,10 @@ public class InvokeActivityFunctionCommand : PSCmdlet
3333
[Parameter]
3434
public SwitchParameter NoWait { get; set; }
3535

36+
[Parameter]
37+
[ValidateNotNull]
38+
public RetryOptions RetryOptions { get; set; }
39+
3640
private readonly DurableTaskHandler _durableTaskHandler = new DurableTaskHandler();
3741

3842
protected override void EndProcessing()
@@ -41,11 +45,14 @@ protected override void EndProcessing()
4145
var context = (OrchestrationContext)privateData[SetFunctionInvocationContextCommand.ContextKey];
4246
var loadedFunctions = FunctionLoader.GetLoadedFunctions();
4347

44-
var task = new ActivityInvocationTask(FunctionName, Input);
48+
var task = new ActivityInvocationTask(FunctionName, Input, RetryOptions);
4549
ActivityInvocationTask.ValidateTask(task, loadedFunctions);
4650

4751
_durableTaskHandler.StopAndInitiateDurableTaskOrReplay(
48-
task, context, NoWait.IsPresent, WriteObject, failureReason => DurableActivityErrorHandler.Handle(this, failureReason));
52+
task, context, NoWait.IsPresent,
53+
output: WriteObject,
54+
onFailure: failureReason => DurableActivityErrorHandler.Handle(this, failureReason),
55+
retryOptions: RetryOptions);
4956
}
5057

5158
protected override void StopProcessing()

src/Durable/DurableTaskHandler.cs

Lines changed: 34 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,8 @@ public void StopAndInitiateDurableTaskOrReplay(
2020
OrchestrationContext context,
2121
bool noWait,
2222
Action<object> output,
23-
Action<string> onFailure)
23+
Action<string> onFailure,
24+
RetryOptions retryOptions = null)
2425
{
2526
context.OrchestrationActionCollector.Add(task.CreateOrchestrationAction());
2627

@@ -30,6 +31,8 @@ public void StopAndInitiateDurableTaskOrReplay(
3031
}
3132
else
3233
{
34+
context.OrchestrationActionCollector.NextBatch();
35+
3336
var scheduledHistoryEvent = task.GetScheduledHistoryEvent(context);
3437
var completedHistoryEvent = task.GetCompletedHistoryEvent(context, scheduledHistoryEvent);
3538

@@ -56,7 +59,32 @@ public void StopAndInitiateDurableTaskOrReplay(
5659
break;
5760

5861
case HistoryEventType.TaskFailed:
59-
onFailure(completedHistoryEvent.Reason);
62+
if (retryOptions == null)
63+
{
64+
onFailure(completedHistoryEvent.Reason);
65+
}
66+
else
67+
{
68+
// Reset IsProcessed, let RetryProcessor handle these events instead.
69+
scheduledHistoryEvent.IsProcessed = false;
70+
completedHistoryEvent.IsProcessed = false;
71+
72+
var shouldContinueProcessing =
73+
RetryProcessor.Process(
74+
context.History,
75+
scheduledHistoryEvent,
76+
retryOptions.MaxNumberOfAttempts,
77+
onSuccess:
78+
result => {
79+
output(TypeExtensions.ConvertFromJson(result));
80+
},
81+
onFailure);
82+
83+
if (shouldContinueProcessing)
84+
{
85+
InitiateAndWaitForStop(context);
86+
}
87+
}
6088
break;
6189
}
6290
}
@@ -73,6 +101,8 @@ public void WaitAll(
73101
OrchestrationContext context,
74102
Action<object> output)
75103
{
104+
context.OrchestrationActionCollector.NextBatch();
105+
76106
var completedEvents = new List<HistoryEvent>();
77107
foreach (var task in tasksToWaitFor)
78108
{
@@ -118,6 +148,8 @@ public void WaitAny(
118148
OrchestrationContext context,
119149
Action<object> output)
120150
{
151+
context.OrchestrationActionCollector.NextBatch();
152+
121153
var completedTasks = new List<DurableTask>();
122154
DurableTask firstCompletedTask = null;
123155
int firstCompletedHistoryEventIndex = -1;

0 commit comments

Comments
 (0)