From 39c682c6b17628e1c002a36b634fe3886598ac59 Mon Sep 17 00:00:00 2001 From: Anatoli Beliaev Date: Mon, 25 Jan 2021 20:08:13 -0800 Subject: [PATCH] Implement retry policy support --- .gitignore | 6 +- .../DurableApp/FlakyActivity/function.json | 9 + .../durable/DurableApp/FlakyActivity/run.ps1 | 9 + .../function.json | 9 + .../run.ps1 | 17 ++ .../function.json | 24 ++ .../FunctionChainingWithRetriesStart/run.ps1 | 13 + .../Actions/CallActivityWithRetryAction.cs | 71 ++++++ .../Commands/InvokeActivityFunctionCommand.cs | 11 +- src/Durable/DurableTaskHandler.cs | 36 ++- src/Durable/HistoryEvent.cs | 7 + src/Durable/OrchestrationActionCollector.cs | 20 +- src/Durable/OrchestrationFailureException.cs | 6 +- src/Durable/OrchestrationInvoker.cs | 4 +- src/Durable/RetryOptions.cs | 38 +++ src/Durable/RetryProcessor.cs | 130 ++++++++++ src/Durable/Tasks/ActivityInvocationTask.cs | 14 +- ...soft.Azure.Functions.PowerShellWorker.psd1 | 4 +- ...soft.Azure.Functions.PowerShellWorker.psm1 | 28 +- src/Utility/TypeExtensions.cs | 2 +- .../DurableEndToEndTests.cs | 3 +- .../DurableActivityFlaky/function.json | 9 + .../DurableActivityFlaky/run.ps1 | 9 + .../DurableOrchestrator/run.ps1 | 5 + .../Durable/ActivityInvocationTaskTests.cs | 2 +- .../CallActivityWithRetryActionTests.cs | 65 +++++ test/Unit/Durable/DurableTaskHandlerTests.cs | 113 +++++++++ test/Unit/Durable/DurableTestUtilities.cs | 2 +- test/Unit/Durable/DurableTimerTaskTests.cs | 2 +- .../OrchestrationActionCollectorTests.cs | 149 +++++++++++ .../OrchestrationFailureExceptionTests.cs | 19 +- .../Unit/Durable/OrchestrationInvokerTests.cs | 13 +- test/Unit/Durable/RetryProcessorTests.cs | 239 ++++++++++++++++++ 33 files changed, 1051 insertions(+), 37 deletions(-) create mode 100644 examples/durable/DurableApp/FlakyActivity/function.json create mode 100644 examples/durable/DurableApp/FlakyActivity/run.ps1 create mode 100644 examples/durable/DurableApp/FunctionChainingWithRetriesOrchestrator/function.json create mode 100644 examples/durable/DurableApp/FunctionChainingWithRetriesOrchestrator/run.ps1 create mode 100644 examples/durable/DurableApp/FunctionChainingWithRetriesStart/function.json create mode 100644 examples/durable/DurableApp/FunctionChainingWithRetriesStart/run.ps1 create mode 100644 src/Durable/Actions/CallActivityWithRetryAction.cs create mode 100644 src/Durable/RetryOptions.cs create mode 100644 src/Durable/RetryProcessor.cs create mode 100644 test/E2E/TestFunctionApp/DurableActivityFlaky/function.json create mode 100644 test/E2E/TestFunctionApp/DurableActivityFlaky/run.ps1 create mode 100644 test/Unit/Durable/CallActivityWithRetryActionTests.cs create mode 100644 test/Unit/Durable/OrchestrationActionCollectorTests.cs create mode 100644 test/Unit/Durable/RetryProcessorTests.cs diff --git a/.gitignore b/.gitignore index c0e90cb4..b807c462 100644 --- a/.gitignore +++ b/.gitignore @@ -63,8 +63,10 @@ PowerShell.sln.DotSettings.user StyleCop.Cache examples/PSCoreApp/Modules -src/Modules -!src/Modules/Microsoft.Azure.Functions.PowerShellWorker +src/Modules/Microsoft.PowerShell.* +src/Modules/PackageManagement +src/Modules/PowerShellGet +src/Modules/ThreadJob # protobuf protobuf/* diff --git a/examples/durable/DurableApp/FlakyActivity/function.json b/examples/durable/DurableApp/FlakyActivity/function.json new file mode 100644 index 00000000..806ce0f8 --- /dev/null +++ b/examples/durable/DurableApp/FlakyActivity/function.json @@ -0,0 +1,9 @@ +{ + "bindings": [ + { + "name": "name", + "type": "activityTrigger", + "direction": "in" + } + ] +} diff --git a/examples/durable/DurableApp/FlakyActivity/run.ps1 b/examples/durable/DurableApp/FlakyActivity/run.ps1 new file mode 100644 index 00000000..d5f86deb --- /dev/null +++ b/examples/durable/DurableApp/FlakyActivity/run.ps1 @@ -0,0 +1,9 @@ +param($name) + +# Intentional intermittent error +$random = Get-Random -Minimum 0.0 -Maximum 1.0 +if ($random -gt 0.2) { + throw 'Nope, no luck this time...' +} + +"Hello $name" diff --git a/examples/durable/DurableApp/FunctionChainingWithRetriesOrchestrator/function.json b/examples/durable/DurableApp/FunctionChainingWithRetriesOrchestrator/function.json new file mode 100644 index 00000000..0c950e30 --- /dev/null +++ b/examples/durable/DurableApp/FunctionChainingWithRetriesOrchestrator/function.json @@ -0,0 +1,9 @@ +{ + "bindings": [ + { + "name": "Context", + "type": "orchestrationTrigger", + "direction": "in" + } + ] +} diff --git a/examples/durable/DurableApp/FunctionChainingWithRetriesOrchestrator/run.ps1 b/examples/durable/DurableApp/FunctionChainingWithRetriesOrchestrator/run.ps1 new file mode 100644 index 00000000..ec1eef1c --- /dev/null +++ b/examples/durable/DurableApp/FunctionChainingWithRetriesOrchestrator/run.ps1 @@ -0,0 +1,17 @@ +using namespace System.Net + +param($Context) + +$ErrorActionPreference = 'Stop' + +$output = @() + +$retryOptions = New-DurableRetryOptions ` + -FirstRetryInterval (New-Timespan -Seconds 1) ` + -MaxNumberOfAttempts 7 + +$output += Invoke-ActivityFunction -FunctionName 'FlakyActivity' -Input 'Tokyo' -RetryOptions $retryOptions +$output += Invoke-ActivityFunction -FunctionName 'FlakyActivity' -Input 'Seattle' -RetryOptions $retryOptions +$output += Invoke-ActivityFunction -FunctionName 'FlakyActivity' -Input 'London' -RetryOptions $retryOptions + +$output diff --git a/examples/durable/DurableApp/FunctionChainingWithRetriesStart/function.json b/examples/durable/DurableApp/FunctionChainingWithRetriesStart/function.json new file mode 100644 index 00000000..54e2a634 --- /dev/null +++ b/examples/durable/DurableApp/FunctionChainingWithRetriesStart/function.json @@ -0,0 +1,24 @@ +{ + "bindings": [ + { + "authLevel": "function", + "type": "httpTrigger", + "direction": "in", + "name": "Request", + "methods": [ + "get", + "post" + ] + }, + { + "type": "http", + "direction": "out", + "name": "Response" + }, + { + "name": "starter", + "type": "durableClient", + "direction": "in" + } + ] +} diff --git a/examples/durable/DurableApp/FunctionChainingWithRetriesStart/run.ps1 b/examples/durable/DurableApp/FunctionChainingWithRetriesStart/run.ps1 new file mode 100644 index 00000000..5e5154fa --- /dev/null +++ b/examples/durable/DurableApp/FunctionChainingWithRetriesStart/run.ps1 @@ -0,0 +1,13 @@ +using namespace System.Net + +param($Request, $TriggerMetadata) + +Write-Host 'FunctionChainingWithRetriesStart started' + +$InstanceId = Start-NewOrchestration -FunctionName 'FunctionChainingWithRetriesOrchestrator' -InputObject 'Hello' +Write-Host "Started orchestration with ID = '$InstanceId'" + +$Response = New-OrchestrationCheckStatusResponse -Request $Request -InstanceId $InstanceId +Push-OutputBinding -Name Response -Value $Response + +Write-Host 'FunctionChainingWithRetriesStart completed' diff --git a/src/Durable/Actions/CallActivityWithRetryAction.cs b/src/Durable/Actions/CallActivityWithRetryAction.cs new file mode 100644 index 00000000..a65168f4 --- /dev/null +++ b/src/Durable/Actions/CallActivityWithRetryAction.cs @@ -0,0 +1,71 @@ +// +// Copyright (c) Microsoft. All rights reserved. +// Licensed under the MIT license. See LICENSE file in the project root for full license information. +// + +using System; +using System.Collections.Generic; + +namespace Microsoft.Azure.Functions.PowerShellWorker.Durable.Actions +{ + /// + /// An orchestration action that represents calling an activity function with retry. + /// + internal class CallActivityWithRetryAction : OrchestrationAction + { + /// + /// The activity function name. + /// + public readonly string FunctionName; + + /// + /// The input to the activity function. + /// + public readonly object Input; + + /// + /// Retry options. + /// + public readonly Dictionary RetryOptions; + + public CallActivityWithRetryAction(string functionName, object input, RetryOptions retryOptions) + : base(ActionType.CallActivityWithRetry) + { + FunctionName = functionName; + Input = input; + RetryOptions = ToDictionary(retryOptions); + } + + private static Dictionary ToDictionary(RetryOptions retryOptions) + { + var result = new Dictionary() + { + { "firstRetryIntervalInMilliseconds", ToIntMilliseconds(retryOptions.FirstRetryInterval) }, + { "maxNumberOfAttempts", retryOptions.MaxNumberOfAttempts } + }; + + AddOptionalValue(result, "backoffCoefficient", retryOptions.BackoffCoefficient, x => x); + AddOptionalValue(result, "maxRetryIntervalInMilliseconds", retryOptions.MaxRetryInterval, ToIntMilliseconds); + AddOptionalValue(result, "retryTimeoutInMilliseconds", retryOptions.RetryTimeout, ToIntMilliseconds); + + return result; + } + + private static void AddOptionalValue( + Dictionary dictionary, + string name, + T? nullable, + Func transformValue) where T : struct + { + if (nullable.HasValue) + { + dictionary.Add(name, transformValue(nullable.Value)); + } + } + + private static object ToIntMilliseconds(TimeSpan timespan) + { + return (int)timespan.TotalMilliseconds; + } + } +} diff --git a/src/Durable/Commands/InvokeActivityFunctionCommand.cs b/src/Durable/Commands/InvokeActivityFunctionCommand.cs index ac0384d2..7dda9e35 100644 --- a/src/Durable/Commands/InvokeActivityFunctionCommand.cs +++ b/src/Durable/Commands/InvokeActivityFunctionCommand.cs @@ -33,6 +33,10 @@ public class InvokeActivityFunctionCommand : PSCmdlet [Parameter] public SwitchParameter NoWait { get; set; } + [Parameter] + [ValidateNotNull] + public RetryOptions RetryOptions { get; set; } + private readonly DurableTaskHandler _durableTaskHandler = new DurableTaskHandler(); protected override void EndProcessing() @@ -41,11 +45,14 @@ protected override void EndProcessing() var context = (OrchestrationContext)privateData[SetFunctionInvocationContextCommand.ContextKey]; var loadedFunctions = FunctionLoader.GetLoadedFunctions(); - var task = new ActivityInvocationTask(FunctionName, Input); + var task = new ActivityInvocationTask(FunctionName, Input, RetryOptions); ActivityInvocationTask.ValidateTask(task, loadedFunctions); _durableTaskHandler.StopAndInitiateDurableTaskOrReplay( - task, context, NoWait.IsPresent, WriteObject, failureReason => DurableActivityErrorHandler.Handle(this, failureReason)); + task, context, NoWait.IsPresent, + output: WriteObject, + onFailure: failureReason => DurableActivityErrorHandler.Handle(this, failureReason), + retryOptions: RetryOptions); } protected override void StopProcessing() diff --git a/src/Durable/DurableTaskHandler.cs b/src/Durable/DurableTaskHandler.cs index c47e043b..3981b686 100644 --- a/src/Durable/DurableTaskHandler.cs +++ b/src/Durable/DurableTaskHandler.cs @@ -20,7 +20,8 @@ public void StopAndInitiateDurableTaskOrReplay( OrchestrationContext context, bool noWait, Action output, - Action onFailure) + Action onFailure, + RetryOptions retryOptions = null) { context.OrchestrationActionCollector.Add(task.CreateOrchestrationAction()); @@ -30,6 +31,8 @@ public void StopAndInitiateDurableTaskOrReplay( } else { + context.OrchestrationActionCollector.NextBatch(); + var scheduledHistoryEvent = task.GetScheduledHistoryEvent(context); var completedHistoryEvent = task.GetCompletedHistoryEvent(context, scheduledHistoryEvent); @@ -56,7 +59,32 @@ public void StopAndInitiateDurableTaskOrReplay( break; case HistoryEventType.TaskFailed: - onFailure(completedHistoryEvent.Reason); + if (retryOptions == null) + { + onFailure(completedHistoryEvent.Reason); + } + else + { + // Reset IsProcessed, let RetryProcessor handle these events instead. + scheduledHistoryEvent.IsProcessed = false; + completedHistoryEvent.IsProcessed = false; + + var shouldContinueProcessing = + RetryProcessor.Process( + context.History, + scheduledHistoryEvent, + retryOptions.MaxNumberOfAttempts, + onSuccess: + result => { + output(TypeExtensions.ConvertFromJson(result)); + }, + onFailure); + + if (shouldContinueProcessing) + { + InitiateAndWaitForStop(context); + } + } break; } } @@ -73,6 +101,8 @@ public void WaitAll( OrchestrationContext context, Action output) { + context.OrchestrationActionCollector.NextBatch(); + var completedEvents = new List(); foreach (var task in tasksToWaitFor) { @@ -118,6 +148,8 @@ public void WaitAny( OrchestrationContext context, Action output) { + context.OrchestrationActionCollector.NextBatch(); + var completedTasks = new List(); DurableTask firstCompletedTask = null; int firstCompletedHistoryEventIndex = -1; diff --git a/src/Durable/HistoryEvent.cs b/src/Durable/HistoryEvent.cs index 86899ee2..ed16c4f9 100644 --- a/src/Durable/HistoryEvent.cs +++ b/src/Durable/HistoryEvent.cs @@ -58,5 +58,12 @@ internal class HistoryEvent // Internal used only public bool IsProcessed { get; set; } + + public override string ToString() + { + var relatedEventId = EventType == HistoryEventType.TimerFired ? TimerId : TaskScheduledId; + var processedMarker = IsProcessed ? "X" : " "; + return $"[{EventId}] {EventType} '{Name}' ({relatedEventId}) [{processedMarker}]"; + } } } diff --git a/src/Durable/OrchestrationActionCollector.cs b/src/Durable/OrchestrationActionCollector.cs index b5337c39..b62fbc4b 100644 --- a/src/Durable/OrchestrationActionCollector.cs +++ b/src/Durable/OrchestrationActionCollector.cs @@ -7,22 +7,36 @@ namespace Microsoft.Azure.Functions.PowerShellWorker.Durable { using System; using System.Collections.Generic; + using System.Linq; using System.Threading; using Microsoft.Azure.Functions.PowerShellWorker.Durable.Actions; internal class OrchestrationActionCollector { - private readonly List _actions = new List(); + private readonly List> _actions = new(); private readonly AutoResetEvent _stopEvent = new AutoResetEvent(initialState: false); + private bool _nextBatch = true; + public void Add(OrchestrationAction action) { - _actions.Add(action); + if (_nextBatch) + { + _actions.Add(new List()); + _nextBatch = false; + } + + _actions.Last().Add(action); + } + + public void NextBatch() + { + _nextBatch = true; } - public Tuple> WaitForActions(WaitHandle completionWaitHandle) + public Tuple>> WaitForActions(WaitHandle completionWaitHandle) { var waitHandles = new[] { _stopEvent, completionWaitHandle }; var signaledHandleIndex = WaitHandle.WaitAny(waitHandles); diff --git a/src/Durable/OrchestrationFailureException.cs b/src/Durable/OrchestrationFailureException.cs index 4b33a8c5..66203531 100644 --- a/src/Durable/OrchestrationFailureException.cs +++ b/src/Durable/OrchestrationFailureException.cs @@ -24,17 +24,17 @@ public OrchestrationFailureException() { } - public OrchestrationFailureException(List actions, Exception innerException) + public OrchestrationFailureException(List> actions, Exception innerException) : base(FormatOrchestrationFailureMessage(actions, innerException), innerException) { } - private static string FormatOrchestrationFailureMessage(List actions, Exception exception) + private static string FormatOrchestrationFailureMessage(List> actions, Exception exception) { // For more details on why this message looks like this, see: // - https://github.com/Azure/azure-functions-durable-js/pull/145 // - https://github.com/Azure/azure-functions-durable-extension/pull/1171 - var orchestrationMessage = new OrchestrationMessage(isDone: false, new List> { actions }, output: null, exception.Message); + var orchestrationMessage = new OrchestrationMessage(isDone: false, actions, output: null, exception.Message); var message = $"{exception.Message}{OutOfProcDataLabel}{JsonConvert.SerializeObject(orchestrationMessage)}"; return message; } diff --git a/src/Durable/OrchestrationInvoker.cs b/src/Durable/OrchestrationInvoker.cs index 79113203..1d3721bb 100644 --- a/src/Durable/OrchestrationInvoker.cs +++ b/src/Durable/OrchestrationInvoker.cs @@ -67,10 +67,10 @@ public Hashtable Invoke(OrchestrationBindingInfo orchestrationBindingInfo, IPowe private static Hashtable CreateOrchestrationResult( bool isDone, - List actions, + List> actions, object output) { - var orchestrationMessage = new OrchestrationMessage(isDone, new List> { actions }, output); + var orchestrationMessage = new OrchestrationMessage(isDone, actions, output); return new Hashtable { { AzFunctionInfo.DollarReturn, orchestrationMessage } }; } } diff --git a/src/Durable/RetryOptions.cs b/src/Durable/RetryOptions.cs new file mode 100644 index 00000000..d36d1b06 --- /dev/null +++ b/src/Durable/RetryOptions.cs @@ -0,0 +1,38 @@ +// +// Copyright (c) Microsoft. All rights reserved. +// Licensed under the MIT license. See LICENSE file in the project root for full license information. +// + +#pragma warning disable 1591 // Missing XML comment for publicly visible type or member 'member' + +using System; + +namespace Microsoft.Azure.Functions.PowerShellWorker.Durable +{ + public class RetryOptions + { + public TimeSpan FirstRetryInterval { get; } + + public int MaxNumberOfAttempts { get; } + + public double? BackoffCoefficient { get; } + + public TimeSpan? MaxRetryInterval { get; } + + public TimeSpan? RetryTimeout { get; } + + public RetryOptions( + TimeSpan firstRetryInterval, + int maxNumberOfAttempts, + double? backoffCoefficient, + TimeSpan? maxRetryInterval, + TimeSpan? retryTimeout) + { + this.FirstRetryInterval = firstRetryInterval; + this.MaxNumberOfAttempts = maxNumberOfAttempts; + this.BackoffCoefficient = backoffCoefficient; + this.MaxRetryInterval = maxRetryInterval; + this.RetryTimeout = retryTimeout; + } + } +} diff --git a/src/Durable/RetryProcessor.cs b/src/Durable/RetryProcessor.cs new file mode 100644 index 00000000..783ab870 --- /dev/null +++ b/src/Durable/RetryProcessor.cs @@ -0,0 +1,130 @@ +// +// Copyright (c) Microsoft. All rights reserved. +// Licensed under the MIT license. See LICENSE file in the project root for full license information. +// + +namespace Microsoft.Azure.Functions.PowerShellWorker.Durable +{ + using System; + + internal class RetryProcessor + { + // Returns true to indicate that processing this activity invocation should continue. + public static bool Process( + HistoryEvent[] history, + HistoryEvent firstTaskScheduledEvent, + int maxNumberOfAttempts, + Action onSuccess, + Action onFinalFailure) + { + var firstTaskScheduledEventIndex = FindEventIndex(history, firstTaskScheduledEvent); + + // Inspired by https://github.com/Azure/azure-functions-durable-js/commit/d789181234ace85df51ce8a849f15b7c8ae2a4f1 + var attempt = 1; + HistoryEvent taskScheduled = null; + HistoryEvent taskFailed = null; + HistoryEvent taskRetryTimer = null; + for (var i = firstTaskScheduledEventIndex; i < history.Length; i++) + { + var historyEvent = history[i]; + if (historyEvent.IsProcessed) + { + continue; + } + + if (taskScheduled == null) + { + if (historyEvent.EventType == HistoryEventType.TaskScheduled) + { + taskScheduled = historyEvent; + } + continue; + } + + if (historyEvent.EventType == HistoryEventType.TaskCompleted) + { + if (historyEvent.TaskScheduledId == taskScheduled.EventId) + { + taskScheduled.IsProcessed = true; + historyEvent.IsProcessed = true; + onSuccess(historyEvent.Result); + return false; + } + else + { + continue; + } + } + + if (taskFailed == null) + { + if (historyEvent.EventType == HistoryEventType.TaskFailed) + { + if (historyEvent.TaskScheduledId == taskScheduled.EventId) + { + taskFailed = historyEvent; + } + } + continue; + } + + if (taskRetryTimer == null) + { + if (historyEvent.EventType == HistoryEventType.TimerCreated) + { + taskRetryTimer = historyEvent; + } + else + { + continue; + } + } + + if (historyEvent.EventType == HistoryEventType.TimerFired) + { + if (historyEvent.TimerId == taskRetryTimer.EventId) + { + taskScheduled.IsProcessed = true; + taskFailed.IsProcessed = true; + taskRetryTimer.IsProcessed = true; + historyEvent.IsProcessed = true; + if (attempt >= maxNumberOfAttempts) + { + onFinalFailure(taskFailed.Reason); + return false; + } + else + { + attempt++; + taskScheduled = null; + taskFailed = null; + taskRetryTimer = null; + } + } + else + { + continue; + } + } + } + + return true; + } + + private static int FindEventIndex(HistoryEvent[] orchestrationHistory, HistoryEvent historyEvent) + { + var result = 0; + foreach (var e in orchestrationHistory) + { + if (ReferenceEquals(historyEvent, e)) + { + return result; + } + + result++; + } + + return -1; + } + } +} diff --git a/src/Durable/Tasks/ActivityInvocationTask.cs b/src/Durable/Tasks/ActivityInvocationTask.cs index 3722ab6b..87bf61c2 100644 --- a/src/Durable/Tasks/ActivityInvocationTask.cs +++ b/src/Durable/Tasks/ActivityInvocationTask.cs @@ -23,10 +23,18 @@ public class ActivityInvocationTask : DurableTask private object Input { get; } - internal ActivityInvocationTask(string functionName, object functionInput) + private RetryOptions RetryOptions { get; } + + internal ActivityInvocationTask(string functionName, object functionInput, RetryOptions retryOptions) { FunctionName = functionName; Input = functionInput; + RetryOptions = retryOptions; + } + + internal ActivityInvocationTask(string functionName, object functionInput) + : this(functionName, functionInput, retryOptions: null) + { } internal override HistoryEvent GetScheduledHistoryEvent(OrchestrationContext context) @@ -49,7 +57,9 @@ internal override HistoryEvent GetCompletedHistoryEvent(OrchestrationContext con internal override OrchestrationAction CreateOrchestrationAction() { - return new CallActivityAction(FunctionName, Input); + return RetryOptions == null + ? new CallActivityAction(FunctionName, Input) + : new CallActivityWithRetryAction(FunctionName, Input, RetryOptions); } internal static void ValidateTask(ActivityInvocationTask task, IEnumerable loadedFunctions) diff --git a/src/Modules/Microsoft.Azure.Functions.PowerShellWorker/Microsoft.Azure.Functions.PowerShellWorker.psd1 b/src/Modules/Microsoft.Azure.Functions.PowerShellWorker/Microsoft.Azure.Functions.PowerShellWorker.psd1 index 96c45b84..7b24724b 100644 --- a/src/Modules/Microsoft.Azure.Functions.PowerShellWorker/Microsoft.Azure.Functions.PowerShellWorker.psd1 +++ b/src/Modules/Microsoft.Azure.Functions.PowerShellWorker/Microsoft.Azure.Functions.PowerShellWorker.psd1 @@ -49,14 +49,13 @@ NestedModules = @('Microsoft.Azure.Functions.PowerShellWorker.psm1', 'Microsoft. # Functions to export from this module, for best performance, do not use wildcards and do not delete the entry, use an empty array if there are no functions to export. FunctionsToExport = @( - + 'New-DurableRetryOptions', 'New-OrchestrationCheckStatusResponse', 'Send-DurableExternalEvent', 'Start-NewOrchestration') # Cmdlets to export from this module, for best performance, do not use wildcards and do not delete the entry, use an empty array if there are no cmdlets to export. CmdletsToExport = @( - 'Get-OutputBinding', 'Invoke-ActivityFunction', 'Push-OutputBinding', @@ -72,7 +71,6 @@ VariablesToExport = @() # Aliases to export from this module, for best performance, do not use wildcards and do not delete the entry, use an empty array if there are no aliases to export. AliasesToExport = @( - 'Wait-ActivityFunction') # Private data to pass to the module specified in RootModule/ModuleToProcess. This may also contain a PSData hashtable with additional module metadata used by PowerShell. diff --git a/src/Modules/Microsoft.Azure.Functions.PowerShellWorker/Microsoft.Azure.Functions.PowerShellWorker.psm1 b/src/Modules/Microsoft.Azure.Functions.PowerShellWorker/Microsoft.Azure.Functions.PowerShellWorker.psm1 index 913ec068..6c99198e 100644 --- a/src/Modules/Microsoft.Azure.Functions.PowerShellWorker/Microsoft.Azure.Functions.PowerShellWorker.psm1 +++ b/src/Modules/Microsoft.Azure.Functions.PowerShellWorker/Microsoft.Azure.Functions.PowerShellWorker.psm1 @@ -207,4 +207,30 @@ function GetRaiseEventUrl( } return $RequestUrl -} \ No newline at end of file +} + +function New-DurableRetryOptions( + [Parameter(Mandatory = $true)] + [timespan] + $FirstRetryInterval, + + [Parameter(Mandatory = $true)] + [int] + $MaxNumberOfAttempts, + + [double] + $BackoffCoefficient, + + [timespan] + $MaxRetryInterval, + + [timespan] + $RetryTimeout) { + + [Microsoft.Azure.Functions.PowerShellWorker.Durable.RetryOptions]::new( + $FirstRetryInterval, + $MaxNumberOfAttempts, + $PSBoundParameters.ContainsKey('BackoffCoefficient') ? $BackoffCoefficient : $null, + $MaxRetryInterval, + $RetryTimeout) +} diff --git a/src/Utility/TypeExtensions.cs b/src/Utility/TypeExtensions.cs index f997b3a6..112b3c52 100644 --- a/src/Utility/TypeExtensions.cs +++ b/src/Utility/TypeExtensions.cs @@ -142,7 +142,7 @@ public static object ConvertFromJson(string json) private static string ConvertToJson(object fromObj) { var context = new JsonObject.ConvertToJsonContext( - maxDepth: 3, + maxDepth: 4, enumsAsStrings: false, compressOutput: true); diff --git a/test/E2E/Azure.Functions.PowerShellWorker.E2E/Azure.Functions.PowerShellWorker.E2E/DurableEndToEndTests.cs b/test/E2E/Azure.Functions.PowerShellWorker.E2E/Azure.Functions.PowerShellWorker.E2E/DurableEndToEndTests.cs index 5fba8932..2fd4e132 100644 --- a/test/E2E/Azure.Functions.PowerShellWorker.E2E/Azure.Functions.PowerShellWorker.E2E/DurableEndToEndTests.cs +++ b/test/E2E/Azure.Functions.PowerShellWorker.E2E/Azure.Functions.PowerShellWorker.E2E/DurableEndToEndTests.cs @@ -42,7 +42,7 @@ public async Task DurableClientFollowsAsyncPattern() Assert.NotNull(initialResponseBodyObject.terminatePostUri); Assert.NotNull(initialResponseBodyObject.rewindPostUri); - var orchestrationCompletionTimeout = TimeSpan.FromSeconds(60); + var orchestrationCompletionTimeout = TimeSpan.FromSeconds(90); var startTime = DateTime.UtcNow; using (var httpClient = new HttpClient()) @@ -76,6 +76,7 @@ public async Task DurableClientFollowsAsyncPattern() Assert.Equal("Hello Tokyo", statusResponseBody.output[0].ToString()); Assert.Equal("Hello Seattle", statusResponseBody.output[1].ToString()); Assert.Equal("Hello London", statusResponseBody.output[2].ToString()); + Assert.Equal("Hello Toronto", statusResponseBody.output[3].ToString()); return; } diff --git a/test/E2E/TestFunctionApp/DurableActivityFlaky/function.json b/test/E2E/TestFunctionApp/DurableActivityFlaky/function.json new file mode 100644 index 00000000..f66b3aca --- /dev/null +++ b/test/E2E/TestFunctionApp/DurableActivityFlaky/function.json @@ -0,0 +1,9 @@ +{ + "bindings": [ + { + "name": "InputData", + "type": "activityTrigger", + "direction": "in" + } + ] +} diff --git a/test/E2E/TestFunctionApp/DurableActivityFlaky/run.ps1 b/test/E2E/TestFunctionApp/DurableActivityFlaky/run.ps1 new file mode 100644 index 00000000..993452aa --- /dev/null +++ b/test/E2E/TestFunctionApp/DurableActivityFlaky/run.ps1 @@ -0,0 +1,9 @@ +param([hashtable]$InputData) + +# Intentional intermittent error, eventually "self-healing" +$elapsedTime = (Get-Date).ToUniversalTime() - $InputData.StartTime +if ($elapsedTime.TotalSeconds -lt 3) { + throw 'Nope, no luck this time...' +} + +"Hello $($InputData.Name)" diff --git a/test/E2E/TestFunctionApp/DurableOrchestrator/run.ps1 b/test/E2E/TestFunctionApp/DurableOrchestrator/run.ps1 index 8071d852..11a7d588 100644 --- a/test/E2E/TestFunctionApp/DurableOrchestrator/run.ps1 +++ b/test/E2E/TestFunctionApp/DurableOrchestrator/run.ps1 @@ -16,6 +16,11 @@ $tasks += Invoke-ActivityFunction -FunctionName "DurableActivity" -Input "Seattl $tasks += Invoke-ActivityFunction -FunctionName "DurableActivity" -Input "London" -NoWait $output += Wait-DurableTask -Task $tasks +# Retries +$retryOptions = New-DurableRetryOptions -FirstRetryInterval (New-Timespan -Seconds 2) -MaxNumberOfAttempts 5 +$inputData = @{ Name = 'Toronto'; StartTime = $Context.CurrentUtcDateTime } +$output += Invoke-ActivityFunction -FunctionName "DurableActivityFlaky" -Input $inputData -RetryOptions $retryOptions + Write-Host "DurableOrchestrator: finished." return $output diff --git a/test/Unit/Durable/ActivityInvocationTaskTests.cs b/test/Unit/Durable/ActivityInvocationTaskTests.cs index 6ceb537d..c96cc755 100644 --- a/test/Unit/Durable/ActivityInvocationTaskTests.cs +++ b/test/Unit/Durable/ActivityInvocationTaskTests.cs @@ -289,7 +289,7 @@ private int GetUniqueEventId() private static void VerifyCallActivityActionAdded(OrchestrationContext orchestrationContext) { var actions = DurableTestUtilities.GetCollectedActions(orchestrationContext); - var action = (CallActivityAction) actions.Single(); + var action = (CallActivityAction)actions.Single().Single(); Assert.Equal(FunctionName, action.FunctionName); Assert.Equal(FunctionInput, action.Input); } diff --git a/test/Unit/Durable/CallActivityWithRetryActionTests.cs b/test/Unit/Durable/CallActivityWithRetryActionTests.cs new file mode 100644 index 00000000..dca80be3 --- /dev/null +++ b/test/Unit/Durable/CallActivityWithRetryActionTests.cs @@ -0,0 +1,65 @@ +// +// Copyright (c) Microsoft. All rights reserved. +// Licensed under the MIT license. See LICENSE file in the project root for full license information. +// + +namespace Microsoft.Azure.Functions.PowerShellWorker.Test.Durable +{ + using System; + using Microsoft.Azure.Functions.PowerShellWorker.Durable; + using Microsoft.Azure.Functions.PowerShellWorker.Durable.Actions; + using Xunit; + + public class CallActivityWithRetryActionTests + { + [Theory] + [InlineData(1, 1, null, null, null)] + [InlineData(5, 3, null, null, null)] + [InlineData(2, 3, 1.0, null, null)] + [InlineData(4, 3, null, 1, null)] + [InlineData(8, 3, null, null, 1)] + [InlineData(1, 3, 0.5, 6, 7)] + public void RetryOptionsContainsNonNullProperties( + int firstRetryIntervalInMilliseconds, + int maxNumberOfAttempts, + double? backoffCoefficient, + int? maxRetryIntervalInMilliseconds, + int? retryTimeoutInMilliseconds) + { + var retryOptions = new RetryOptions( + TimeSpan.FromMilliseconds(firstRetryIntervalInMilliseconds), + maxNumberOfAttempts, + backoffCoefficient, + CreateTimeSpanOrNull(maxRetryIntervalInMilliseconds), + CreateTimeSpanOrNull(retryTimeoutInMilliseconds)); + + var action = new CallActivityWithRetryAction("FunctionName", "input", retryOptions); + + Assert.Equal(firstRetryIntervalInMilliseconds, action.RetryOptions["firstRetryIntervalInMilliseconds"]); + Assert.Equal(maxNumberOfAttempts, action.RetryOptions["maxNumberOfAttempts"]); + AssertRetryOptionsEntry("backoffCoefficient", backoffCoefficient, action); + AssertRetryOptionsEntry("maxRetryIntervalInMilliseconds", maxRetryIntervalInMilliseconds, action); + AssertRetryOptionsEntry("retryTimeoutInMilliseconds", retryTimeoutInMilliseconds, action); + } + + private static void AssertRetryOptionsEntry( + string key, + T? expectedValue, + CallActivityWithRetryAction actualAction) where T : struct + { + if (expectedValue.HasValue) + { + Assert.Equal(expectedValue.Value, actualAction.RetryOptions[key]); + } + else + { + Assert.False(actualAction.RetryOptions.ContainsKey(key)); + } + } + + private static TimeSpan? CreateTimeSpanOrNull(double? milliseconds) + { + return milliseconds.HasValue ? TimeSpan.FromMilliseconds(milliseconds.Value) : (TimeSpan?)null; + } + } +} diff --git a/test/Unit/Durable/DurableTaskHandlerTests.cs b/test/Unit/Durable/DurableTaskHandlerTests.cs index c243c601..ef9ab809 100644 --- a/test/Unit/Durable/DurableTaskHandlerTests.cs +++ b/test/Unit/Durable/DurableTaskHandlerTests.cs @@ -8,7 +8,10 @@ namespace Microsoft.Azure.Functions.PowerShellWorker.Test.Durable using System; using System.Collections.Generic; using System.Collections.ObjectModel; + using System.Linq; + using System.Threading; using Microsoft.Azure.Functions.PowerShellWorker.Durable; + using Microsoft.Azure.Functions.PowerShellWorker.Durable.Actions; using Microsoft.Azure.Functions.PowerShellWorker.Durable.Tasks; using Xunit; @@ -191,6 +194,116 @@ public void WaitAny_WaitsForStop_WhenAllTasksAreNotCompleted(bool completed, boo }); } + [Theory] + [InlineData(false, 1, 1)] + [InlineData(false, 5, 5)] + [InlineData(true, 1, 1)] + [InlineData(true, 5, 1)] + public void StopAndInitiateDurableTaskOrReplay_AddsActivityBatch_UnlessNoWait(bool noWait, int numberOfActions, int expectedNumberOfBatches) + { + var orchestrationContext = new OrchestrationContext { History = new HistoryEvent[0] }; + var durableTaskHandler = new DurableTaskHandler(); + + for (var i = 0; i < numberOfActions; ++i) + { + durableTaskHandler.Stop(); // just to avoid the next call getting stuck waiting for a stop event + + durableTaskHandler.StopAndInitiateDurableTaskOrReplay( + new ActivityInvocationTask("Function", "Input"), + orchestrationContext, + noWait: noWait, + output: _ => {}, + onFailure: _ => {} + ); + } + + var (_, actions) = orchestrationContext.OrchestrationActionCollector.WaitForActions(new AutoResetEvent(initialState: true)); + Assert.Equal(expectedNumberOfBatches, actions.Count); + } + + [Theory] + [InlineData(false, false, 1)] + [InlineData(true, false, 2)] + [InlineData(false, true, 2)] + [InlineData(true, true, 2)] + public void WaitAll_And_WaitAny_StartNewActivityBatch(bool invokeWaitAll, bool invokeWaitAny, int expectedNumberOfBatches) + { + var orchestrationContext = new OrchestrationContext { History = new HistoryEvent[0] }; + var durableTaskHandler = new DurableTaskHandler(); + + durableTaskHandler.StopAndInitiateDurableTaskOrReplay( + new ActivityInvocationTask("Function", "Input"), + orchestrationContext, + noWait: true, + output: _ => {}, + onFailure: _ => {} + ); + + if (invokeWaitAll) + { + durableTaskHandler.Stop(); // just to avoid the next call getting stuck waiting for a stop event + durableTaskHandler.WaitAll(new DurableTask[0], orchestrationContext, output: _ => {}); + } + + if (invokeWaitAny) + { + durableTaskHandler.Stop(); // just to avoid the next call getting stuck waiting for a stop event + durableTaskHandler.WaitAny(new DurableTask[0], orchestrationContext, output: _ => {}); + } + + durableTaskHandler.StopAndInitiateDurableTaskOrReplay( + new ActivityInvocationTask("Function", "Input"), + orchestrationContext, + noWait: true, + output: _ => {}, + onFailure: _ => {} + ); + + var (_, actions) = orchestrationContext.OrchestrationActionCollector.WaitForActions(new AutoResetEvent(initialState: true)); + Assert.Equal(expectedNumberOfBatches, actions.Count); + } + + [Fact] + public void StopAndInitiateDurableTaskOrReplay_RetriesOnFailure() + { + const string FunctionName = "Function"; + const string FunctionInput = "Input"; + + var history = new[] + { + new HistoryEvent { EventType = HistoryEventType.TaskScheduled, Name = FunctionName, EventId = 1 }, + new HistoryEvent { EventType = HistoryEventType.TaskFailed, TaskScheduledId = 1 }, + new HistoryEvent { EventType = HistoryEventType.TimerCreated, EventId = 2 }, + new HistoryEvent { EventType = HistoryEventType.TimerFired, TimerId = 2 }, + new HistoryEvent { EventType = HistoryEventType.TaskScheduled, Name = FunctionName, EventId = 3 }, + new HistoryEvent { EventType = HistoryEventType.TaskCompleted, Result = "\"OK\"", TaskScheduledId = 3 }, + }; + + var orchestrationContext = new OrchestrationContext { History = history }; + var durableTaskHandler = new DurableTaskHandler(); + + var retryOptions = new RetryOptions(TimeSpan.FromSeconds(1), 2, null, null, null); + + object result = null; + + durableTaskHandler.StopAndInitiateDurableTaskOrReplay( + new ActivityInvocationTask(FunctionName, FunctionInput, retryOptions), + orchestrationContext, + noWait: false, + output: output => { result = output; }, + onFailure: _ => { Assert.True(false, "Unexpected failure"); }, + retryOptions: retryOptions + ); + + Assert.Equal("OK", result); + + var (_, actions) = orchestrationContext.OrchestrationActionCollector.WaitForActions(new AutoResetEvent(initialState: true)); + var action = (CallActivityWithRetryAction)actions.Single().Single(); + Assert.Equal(FunctionName, action.FunctionName); + Assert.Equal(FunctionInput, action.Input); + Assert.NotEmpty(action.RetryOptions); + } + private HistoryEvent[] CreateActivityHistory(string name, bool scheduled, bool completed, string output) { return CreateActivityHistory(name: name, scheduled: scheduled, restartTime: _restartTime, completed: completed, output: output, orchestratorStartedIsProcessed: false); } diff --git a/test/Unit/Durable/DurableTestUtilities.cs b/test/Unit/Durable/DurableTestUtilities.cs index 6676e501..97e0fcf5 100644 --- a/test/Unit/Durable/DurableTestUtilities.cs +++ b/test/Unit/Durable/DurableTestUtilities.cs @@ -86,7 +86,7 @@ public static void ExpectBeginInvoke( }); } - public static List GetCollectedActions(OrchestrationContext orchestrationContext) + public static List> GetCollectedActions(OrchestrationContext orchestrationContext) { var (_, actions) = orchestrationContext.OrchestrationActionCollector.WaitForActions(new ManualResetEvent(true)); return actions; diff --git a/test/Unit/Durable/DurableTimerTaskTests.cs b/test/Unit/Durable/DurableTimerTaskTests.cs index 93074cbb..7e583881 100644 --- a/test/Unit/Durable/DurableTimerTaskTests.cs +++ b/test/Unit/Durable/DurableTimerTaskTests.cs @@ -227,7 +227,7 @@ private int GetUniqueEventId() private void VerifyCreateDurableTimerActionAdded(OrchestrationContext context, DateTime fireAt) { var actions = DurableTestUtilities.GetCollectedActions(context); - var action = (CreateDurableTimerAction)actions.Last(); + var action = (CreateDurableTimerAction)actions.Last().Last(); Assert.Equal(action.FireAt, fireAt); } } diff --git a/test/Unit/Durable/OrchestrationActionCollectorTests.cs b/test/Unit/Durable/OrchestrationActionCollectorTests.cs new file mode 100644 index 00000000..e0cd3d29 --- /dev/null +++ b/test/Unit/Durable/OrchestrationActionCollectorTests.cs @@ -0,0 +1,149 @@ +// +// Copyright (c) Microsoft. All rights reserved. +// Licensed under the MIT license. See LICENSE file in the project root for full license information. +// + +namespace Microsoft.Azure.Functions.PowerShellWorker.Test.Durable +{ + using System; + using System.Collections.Generic; + using System.Linq; + using System.Threading; + using Microsoft.Azure.Functions.PowerShellWorker.Durable; + using Microsoft.Azure.Functions.PowerShellWorker.Durable.Actions; + using Xunit; + + public class OrchestrationActionCollectorTests + { + private readonly OrchestrationAction[] _expectedActions = + Enumerable.Range(0, 14).Select(i => new CallActivityAction($"Name{i}", $"Input{i}")).ToArray(); + + [Fact] + public void IndicatesShouldNotStopOnSignalledCompletionWaitHandle() + { + var collector = new OrchestrationActionCollector(); + var (shouldStop, _) = collector.WaitForActions(new AutoResetEvent(initialState: true)); + Assert.False(shouldStop); + } + + [Fact] + public void IndicatesShouldStopOnStopEvent() + { + var collector = new OrchestrationActionCollector(); + collector.Stop(); + var (shouldStop, _) = collector.WaitForActions(new AutoResetEvent(initialState: false)); + Assert.True(shouldStop); + } + + [Fact] + public void ReturnsNoActionsWhenNoneAdded() + { + var collector = new OrchestrationActionCollector(); + var (_, actions) = collector.WaitForActions(new AutoResetEvent(initialState: true)); + Assert.Empty(actions); + } + + [Fact] + public void ReturnsSingleAction() + { + var collector = new OrchestrationActionCollector(); + collector.Add(_expectedActions[0]); + var (_, actions) = collector.WaitForActions(new AutoResetEvent(initialState: true)); + + Assert.Single(actions); + Assert.Single(actions.Single()); + Assert.Same(_expectedActions[0], actions.Single().Single()); + } + + [Fact] + public void ReturnsSequentialActions() + { + var collector = new OrchestrationActionCollector(); + + collector.Add(_expectedActions[0]); + collector.NextBatch(); + collector.Add(_expectedActions[1]); + + var (_, actions) = collector.WaitForActions(new AutoResetEvent(initialState: true)); + + var expected = new[] { + new[] { _expectedActions[0] }, + new[] { _expectedActions[1] } + }; + + AssertExpectedActions(expected, actions); + } + + [Fact] + public void ReturnsParallelActions() + { + var collector = new OrchestrationActionCollector(); + + collector.Add(_expectedActions[0]); + collector.Add(_expectedActions[1]); + + var (_, actions) = collector.WaitForActions(new AutoResetEvent(initialState: true)); + + var expected = new[] { + new[] { _expectedActions[0], _expectedActions[1] } + }; + + AssertExpectedActions(expected, actions); + } + + [Fact] + public void ReturnsMixOfSequentialAndParallelActions() + { + var collector = new OrchestrationActionCollector(); + + collector.Add(_expectedActions[0]); + collector.NextBatch(); + collector.Add(_expectedActions[1]); + collector.Add(_expectedActions[2]); + collector.NextBatch(); + collector.Add(_expectedActions[3]); + collector.NextBatch(); + collector.Add(_expectedActions[4]); + collector.Add(_expectedActions[5]); + collector.Add(_expectedActions[6]); + collector.NextBatch(); + collector.Add(_expectedActions[7]); + collector.NextBatch(); + collector.Add(_expectedActions[8]); + collector.NextBatch(); + collector.Add(_expectedActions[9]); + collector.NextBatch(); + collector.Add(_expectedActions[10]); + collector.Add(_expectedActions[11]); + collector.Add(_expectedActions[12]); + collector.Add(_expectedActions[13]); + + var (_, actions) = collector.WaitForActions(new AutoResetEvent(initialState: true)); + + var expected = new[] { + new[] { _expectedActions[0] }, + new[] { _expectedActions[1], _expectedActions[2] }, + new[] { _expectedActions[3] }, + new[] { _expectedActions[4], _expectedActions[5], _expectedActions[6] }, + new[] { _expectedActions[7] }, + new[] { _expectedActions[8] }, + new[] { _expectedActions[9] }, + new[] { _expectedActions[10], _expectedActions[11], _expectedActions[12], _expectedActions[13] } + }; + + AssertExpectedActions(expected, actions); + } + + private void AssertExpectedActions(OrchestrationAction[][] expected, List> actual) + { + Assert.Equal(expected.Count(), actual.Count()); + for (var batchIndex = 0; batchIndex < expected.Count(); ++batchIndex) + { + for (var actionIndex = 0; actionIndex < expected[batchIndex].Count(); ++actionIndex) + { + Assert.Same(expected[batchIndex][actionIndex], actual[batchIndex][actionIndex]); + } + } + } + } +} diff --git a/test/Unit/Durable/OrchestrationFailureExceptionTests.cs b/test/Unit/Durable/OrchestrationFailureExceptionTests.cs index ee183981..af0fa95c 100644 --- a/test/Unit/Durable/OrchestrationFailureExceptionTests.cs +++ b/test/Unit/Durable/OrchestrationFailureExceptionTests.cs @@ -20,7 +20,7 @@ public class OrchestrationFailureExceptionTests [Fact] public void MessageContainsInnerExceptionMessage() { - var e = new OrchestrationFailureException(new List(), _innerException); + var e = new OrchestrationFailureException(new List>(), _innerException); var labelPos = e.Message.IndexOf(OrchestrationFailureException.OutOfProcDataLabel); Assert.Equal(_innerException.Message, e.Message.Substring(0, labelPos)); @@ -29,11 +29,13 @@ public void MessageContainsInnerExceptionMessage() [Fact] public void MessageContainsSerializedOrchestrationMessage() { - var actions = new List { - new CallActivityAction("activity1", "input1"), - new CallActivityAction("activity2", "input2") - }; - + var actions = new List> { + new List { + new CallActivityAction("activity1", "input1"), + new CallActivityAction("activity2", "input2") + } + }; + var e = new OrchestrationFailureException(actions, _innerException); var labelPos = e.Message.IndexOf(OrchestrationFailureException.OutOfProcDataLabel); @@ -44,10 +46,9 @@ public void MessageContainsSerializedOrchestrationMessage() Assert.Null(orchestrationMessage.Output.Value); Assert.Equal(_innerException.Message, (string)orchestrationMessage.Error); var deserializedActions = (IEnumerable)((IEnumerable)orchestrationMessage.Actions).Single(); - Assert.Equal(actions.Count(), deserializedActions.Count()); - for (var i = 0; i < actions.Count(); i++) + for (var i = 0; i < actions.Single().Count(); i++) { - AssertEqualAction((OrchestrationAction)actions[i], deserializedActions.ElementAt(i)); + AssertEqualAction((OrchestrationAction)actions.Single()[i], deserializedActions.ElementAt(i)); } } diff --git a/test/Unit/Durable/OrchestrationInvokerTests.cs b/test/Unit/Durable/OrchestrationInvokerTests.cs index 3d8cd3de..65c4b24d 100644 --- a/test/Unit/Durable/OrchestrationInvokerTests.cs +++ b/test/Unit/Durable/OrchestrationInvokerTests.cs @@ -96,9 +96,16 @@ public void ReturnsOrchestrationActions(bool completed, int actionCount) Assert.Single(result); var returnOrchestrationMessage = (OrchestrationMessage)result["$return"]; - Assert.Single(returnOrchestrationMessage.Actions); - Assert.Equal(actions.Length, returnOrchestrationMessage.Actions.Single().Count); - Assert.Equal(actions, returnOrchestrationMessage.Actions.Single()); + if (actionCount == 0) + { + Assert.Empty(returnOrchestrationMessage.Actions); + } + else + { + Assert.Single(returnOrchestrationMessage.Actions); + Assert.Equal(actions.Length, returnOrchestrationMessage.Actions.Single().Count); + Assert.Equal(actions, returnOrchestrationMessage.Actions.Single()); + } } [Fact] diff --git a/test/Unit/Durable/RetryProcessorTests.cs b/test/Unit/Durable/RetryProcessorTests.cs new file mode 100644 index 00000000..2abefd5c --- /dev/null +++ b/test/Unit/Durable/RetryProcessorTests.cs @@ -0,0 +1,239 @@ +// +// Copyright (c) Microsoft. All rights reserved. +// Licensed under the MIT license. See LICENSE file in the project root for full license information. +// + +namespace Microsoft.Azure.Functions.PowerShellWorker.Test.Durable +{ + using System; + using System.Linq; + using Microsoft.Azure.Functions.PowerShellWorker.Durable; + using Xunit; + + public class RetryProcessorTests + { + [Fact] + public void ContinuesAfterFirstFailure() + { + var history = new[] + { + new HistoryEvent { EventType = HistoryEventType.TaskScheduled, EventId = 1 }, + new HistoryEvent { EventType = HistoryEventType.TaskFailed, EventId = -1, TaskScheduledId = 1, Reason = "Failure 1" } + }; + + AssertRetryProcessorReportsContinue(history, firstEventIndex: 0, maxNumberOfAttempts: 2); + AssertNoEventsProcessed(history); + } + + [Fact] + public void ContinuesAfterSecondFailure() + { + var history = new[] + { + new HistoryEvent { EventType = HistoryEventType.TaskScheduled, EventId = 1 }, + new HistoryEvent { EventType = HistoryEventType.TaskFailed, EventId = -1, TaskScheduledId = 1, Reason = "Failure 1" }, + new HistoryEvent { EventType = HistoryEventType.TimerCreated, EventId = 2 }, + new HistoryEvent { EventType = HistoryEventType.TimerFired, EventId = -1, TimerId = 2 }, + + new HistoryEvent { EventType = HistoryEventType.TaskScheduled, EventId = 3 }, + new HistoryEvent { EventType = HistoryEventType.TaskFailed, EventId = -1, TaskScheduledId = 3, Reason = "Failure 2" }, + }; + + AssertRetryProcessorReportsContinue(history, firstEventIndex: 0, maxNumberOfAttempts: 2); + AssertEventsProcessed(history, 0, 1, 2, 3); // Don't expect the last Scheduled/Failed pair to be processed + } + + [Fact] + public void FailsOnMaxNumberOfAttempts() + { + var history = new[] + { + new HistoryEvent { EventType = HistoryEventType.TaskScheduled, EventId = 1 }, + new HistoryEvent { EventType = HistoryEventType.TaskFailed, EventId = -1, TaskScheduledId = 1, Reason = "Failure 1" }, + new HistoryEvent { EventType = HistoryEventType.TimerCreated, EventId = 2 }, + new HistoryEvent { EventType = HistoryEventType.TimerFired, EventId = -1, TimerId = 2 }, + + new HistoryEvent { EventType = HistoryEventType.TaskScheduled, EventId = 3 }, + new HistoryEvent { EventType = HistoryEventType.TaskFailed, EventId = -1, TaskScheduledId = 3, Reason = "Failure 2" }, + new HistoryEvent { EventType = HistoryEventType.TimerCreated, EventId = 4 }, + new HistoryEvent { EventType = HistoryEventType.TimerFired, EventId = -1, TimerId = 4 }, + }; + + AssertRetryProcessorReportsFailure(history, firstEventIndex: 0, maxNumberOfAttempts: 2, "Failure 2"); + AssertAllEventsProcessed(history); + } + + [Fact] + public void SucceedsOnRetry() + { + var history = new[] + { + new HistoryEvent { EventType = HistoryEventType.TaskScheduled, EventId = 1 }, + new HistoryEvent { EventType = HistoryEventType.TaskFailed, EventId = -1, TaskScheduledId = 1, Reason = "Failure 1" }, + new HistoryEvent { EventType = HistoryEventType.TimerCreated, EventId = 2 }, + new HistoryEvent { EventType = HistoryEventType.TimerFired, EventId = -1, TimerId = 2 }, + + new HistoryEvent { EventType = HistoryEventType.TaskScheduled, EventId = 3 }, + new HistoryEvent { EventType = HistoryEventType.TaskCompleted, EventId = -1, TaskScheduledId = 3, Result = "Success" }, + }; + + AssertRetryProcessorReportsSuccess(history, firstEventIndex: 0, maxNumberOfAttempts: 2, "Success"); + AssertAllEventsProcessed(history); + } + + [Fact] + public void IgnoresPreviousHistory() + { + var history = new[] + { + // From a previous activity invocation + new HistoryEvent { EventType = HistoryEventType.TaskScheduled, EventId = 1 }, + new HistoryEvent { EventType = HistoryEventType.TaskFailed, EventId = -1, TaskScheduledId = 1, Reason = "Failure 1" }, + new HistoryEvent { EventType = HistoryEventType.TimerCreated, EventId = 2 }, + new HistoryEvent { EventType = HistoryEventType.TimerFired, EventId = -1, TimerId = 2 }, + + new HistoryEvent { EventType = HistoryEventType.TaskScheduled, EventId = 3 }, + new HistoryEvent { EventType = HistoryEventType.TaskCompleted, EventId = -1, TaskScheduledId = 3, Result = "Success 1" }, + + // The current invocation starts here: + new HistoryEvent { EventType = HistoryEventType.TaskScheduled, EventId = 4 }, + new HistoryEvent { EventType = HistoryEventType.TaskFailed, EventId = -1, TaskScheduledId = 4, Reason = "Failure 2" }, + new HistoryEvent { EventType = HistoryEventType.TimerCreated, EventId = 5 }, + new HistoryEvent { EventType = HistoryEventType.TimerFired, EventId = -1, TimerId = 5 }, + + new HistoryEvent { EventType = HistoryEventType.TaskScheduled, EventId = 6 }, + new HistoryEvent { EventType = HistoryEventType.TaskCompleted, EventId = -1, TaskScheduledId = 6, Result = "Success 2" }, + }; + + AssertRetryProcessorReportsSuccess(history, firstEventIndex: 6, maxNumberOfAttempts: 2, "Success 2"); + AssertEventsProcessed(history, 6, 7, 8, 9, 10, 11); + } + + // This history emulates the situation when multiple activity invocations are scheduled at the same time + // ("fan-out" scenario): + // - Activity A failed on the first attempt and succeeded on the second attempt. + // - Activity B failed after two attempts. + // - Activity C failed on the first attempt and has not been retried yet. + private static HistoryEvent[] CreateInterleavingHistory() + { + return new[] + { + new HistoryEvent { EventType = HistoryEventType.TaskScheduled, EventId = 1 }, // 0: A + new HistoryEvent { EventType = HistoryEventType.TaskScheduled, EventId = 2 }, // 1: B + new HistoryEvent { EventType = HistoryEventType.TaskScheduled, EventId = 3 }, // 2: C + new HistoryEvent { EventType = HistoryEventType.TaskFailed, EventId = -1, TaskScheduledId = 1, Reason = "A1" }, // 3: A + new HistoryEvent { EventType = HistoryEventType.TimerCreated, EventId = 4 }, // 4: A + new HistoryEvent { EventType = HistoryEventType.TaskFailed, EventId = -1, TaskScheduledId = 2, Reason = "B1" }, // 5: B + new HistoryEvent { EventType = HistoryEventType.TimerCreated, EventId = 5 }, // 6: B + new HistoryEvent { EventType = HistoryEventType.TimerFired, EventId = -1, TimerId = 4 }, // 7: A + new HistoryEvent { EventType = HistoryEventType.TaskScheduled, EventId = 6 }, // 8: A + new HistoryEvent { EventType = HistoryEventType.TimerFired, EventId = -1, TimerId = 5 }, // 9: B + new HistoryEvent { EventType = HistoryEventType.TaskScheduled, EventId = 7 }, // 10: B + new HistoryEvent { EventType = HistoryEventType.TaskCompleted, EventId = -1, TaskScheduledId = 6, Result = "OK" }, // 11: A + new HistoryEvent { EventType = HistoryEventType.TaskFailed, EventId = -1, TaskScheduledId = 7, Reason = "B2" }, // 12: B + new HistoryEvent { EventType = HistoryEventType.TimerCreated, EventId = 8 }, // 13: B + new HistoryEvent { EventType = HistoryEventType.TimerFired, EventId = -1, TimerId = 8 }, // 14: B + new HistoryEvent { EventType = HistoryEventType.TaskFailed, EventId = -1, TaskScheduledId = 3, Reason = "C1" }, // 15: C + }; + } + + [Fact] + public void InterleavingRetries_ReportsSuccess() + { + var history = CreateInterleavingHistory(); + + // Activity A + AssertRetryProcessorReportsSuccess(history, firstEventIndex: 0, maxNumberOfAttempts: 2, "OK"); + AssertEventsProcessed(history, 0, 3, 4, 7, 8, 11); + } + + [Fact] + public void InterleavingRetries_ReportsFailure() + { + var history = CreateInterleavingHistory(); + + // Activity B + AssertRetryProcessorReportsFailure(history, firstEventIndex: 1, maxNumberOfAttempts: 2, "B2"); + AssertEventsProcessed(history, 1, 5, 6, 9, 10, 12, 13, 14); + } + + [Fact] + public void InterleavingRetries_ReportsContinue() + { + var history = CreateInterleavingHistory(); + + // Activity C + AssertRetryProcessorReportsContinue(history, firstEventIndex: 2, maxNumberOfAttempts: 2); + AssertNoEventsProcessed(history); + } + + private static void AssertRetryProcessorReportsContinue(HistoryEvent[] history, int firstEventIndex, int maxNumberOfAttempts) + { + var shouldRetry = RetryProcessor.Process( + history, + history[firstEventIndex], + maxNumberOfAttempts, + onSuccess: result => { Assert.True(false, $"Unexpected output: {result}"); }, + onFinalFailure: reason => { Assert.True(false, $"Unexpected failure: {reason}"); }); + + Assert.True(shouldRetry); + } + + private static void AssertRetryProcessorReportsFailure(HistoryEvent[] history, int firstEventIndex, int maxNumberOfAttempts, string expectedFailureReason) + { + string actualFailureReason = null; + + var shouldRetry = RetryProcessor.Process( + history, + history[firstEventIndex], + maxNumberOfAttempts, + onSuccess: result => { Assert.True(false, $"Unexpected output: {result}"); }, + onFinalFailure: reason => + { + Assert.Null(actualFailureReason); + actualFailureReason = reason; + }); + + Assert.False(shouldRetry); + Assert.Equal(expectedFailureReason, actualFailureReason); + } + + private static void AssertRetryProcessorReportsSuccess(HistoryEvent[] history, int firstEventIndex, int maxNumberOfAttempts, string expectedOutput) + { + string actualOutput = null; + + var shouldRetry = RetryProcessor.Process( + history, + history[firstEventIndex], + maxNumberOfAttempts, + onSuccess: result => + { + Assert.Null(actualOutput); + actualOutput = result; + }, + onFinalFailure: reason => { Assert.True(false, $"Unexpected failure: {reason}"); }); + + Assert.False(shouldRetry); + Assert.Equal(expectedOutput, actualOutput); + } + + private static void AssertEventsProcessed(HistoryEvent[] history, params int[] expectedProcessedIndexes) + { + for (var i = 0; i < history.Length; ++i) + { + var expectedProcessed = expectedProcessedIndexes.Contains(i); + Assert.Equal(expectedProcessed, history[i].IsProcessed); + } + } + + private static void AssertAllEventsProcessed(HistoryEvent[] history) + { + Assert.True(history.All(e => e.IsProcessed)); + } + + private static void AssertNoEventsProcessed(HistoryEvent[] history) + { + AssertEventsProcessed(history); // Note: passing nothing to expectedProcessedIndexes + } + } +}