Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,10 @@ PowerShell.sln.DotSettings.user
StyleCop.Cache

examples/PSCoreApp/Modules
src/Modules
!src/Modules/Microsoft.Azure.Functions.PowerShellWorker
src/Modules/Microsoft.PowerShell.*
src/Modules/PackageManagement
src/Modules/PowerShellGet
src/Modules/ThreadJob

# protobuf
protobuf/*
Expand Down
9 changes: 9 additions & 0 deletions examples/durable/DurableApp/FlakyActivity/function.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
{
"bindings": [
{
"name": "name",
"type": "activityTrigger",
"direction": "in"
}
]
}
9 changes: 9 additions & 0 deletions examples/durable/DurableApp/FlakyActivity/run.ps1
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
param($name)

# Intentional intermittent error
$random = Get-Random -Minimum 0.0 -Maximum 1.0
if ($random -gt 0.2) {
throw 'Nope, no luck this time...'
}

"Hello $name"
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
{
"bindings": [
{
"name": "Context",
"type": "orchestrationTrigger",
"direction": "in"
}
]
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
using namespace System.Net

param($Context)

$ErrorActionPreference = 'Stop'

$output = @()

$retryOptions = New-DurableRetryOptions `
-FirstRetryInterval (New-Timespan -Seconds 1) `
-MaxNumberOfAttempts 7

$output += Invoke-ActivityFunction -FunctionName 'FlakyActivity' -Input 'Tokyo' -RetryOptions $retryOptions
$output += Invoke-ActivityFunction -FunctionName 'FlakyActivity' -Input 'Seattle' -RetryOptions $retryOptions
$output += Invoke-ActivityFunction -FunctionName 'FlakyActivity' -Input 'London' -RetryOptions $retryOptions

$output
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
{
"bindings": [
{
"authLevel": "function",
"type": "httpTrigger",
"direction": "in",
"name": "Request",
"methods": [
"get",
"post"
]
},
{
"type": "http",
"direction": "out",
"name": "Response"
},
{
"name": "starter",
"type": "durableClient",
"direction": "in"
}
]
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
using namespace System.Net

param($Request, $TriggerMetadata)

Write-Host 'FunctionChainingWithRetriesStart started'

$InstanceId = Start-NewOrchestration -FunctionName 'FunctionChainingWithRetriesOrchestrator' -InputObject 'Hello'
Write-Host "Started orchestration with ID = '$InstanceId'"

$Response = New-OrchestrationCheckStatusResponse -Request $Request -InstanceId $InstanceId
Push-OutputBinding -Name Response -Value $Response

Write-Host 'FunctionChainingWithRetriesStart completed'
71 changes: 71 additions & 0 deletions src/Durable/Actions/CallActivityWithRetryAction.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
//
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
//

using System;
using System.Collections.Generic;

namespace Microsoft.Azure.Functions.PowerShellWorker.Durable.Actions
{
/// <summary>
/// An orchestration action that represents calling an activity function with retry.
/// </summary>
internal class CallActivityWithRetryAction : OrchestrationAction
{
/// <summary>
/// The activity function name.
/// </summary>
public readonly string FunctionName;

/// <summary>
/// The input to the activity function.
/// </summary>
public readonly object Input;

/// <summary>
/// Retry options.
/// </summary>
public readonly Dictionary<string, object> RetryOptions;

public CallActivityWithRetryAction(string functionName, object input, RetryOptions retryOptions)
: base(ActionType.CallActivityWithRetry)
{
FunctionName = functionName;
Input = input;
RetryOptions = ToDictionary(retryOptions);
}

private static Dictionary<string, object> ToDictionary(RetryOptions retryOptions)
{
var result = new Dictionary<string, object>()
{
{ "firstRetryIntervalInMilliseconds", ToIntMilliseconds(retryOptions.FirstRetryInterval) },
{ "maxNumberOfAttempts", retryOptions.MaxNumberOfAttempts }
};

AddOptionalValue(result, "backoffCoefficient", retryOptions.BackoffCoefficient, x => x);
AddOptionalValue(result, "maxRetryIntervalInMilliseconds", retryOptions.MaxRetryInterval, ToIntMilliseconds);
AddOptionalValue(result, "retryTimeoutInMilliseconds", retryOptions.RetryTimeout, ToIntMilliseconds);

return result;
}

private static void AddOptionalValue<T>(
Dictionary<string, object> dictionary,
string name,
T? nullable,
Func<T, object> transformValue) where T : struct
{
if (nullable.HasValue)
{
dictionary.Add(name, transformValue(nullable.Value));
}
}

private static object ToIntMilliseconds(TimeSpan timespan)
{
return (int)timespan.TotalMilliseconds;
}
}
}
11 changes: 9 additions & 2 deletions src/Durable/Commands/InvokeActivityFunctionCommand.cs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,10 @@ public class InvokeActivityFunctionCommand : PSCmdlet
[Parameter]
public SwitchParameter NoWait { get; set; }

[Parameter]
[ValidateNotNull]
public RetryOptions RetryOptions { get; set; }

private readonly DurableTaskHandler _durableTaskHandler = new DurableTaskHandler();

protected override void EndProcessing()
Expand All @@ -41,11 +45,14 @@ protected override void EndProcessing()
var context = (OrchestrationContext)privateData[SetFunctionInvocationContextCommand.ContextKey];
var loadedFunctions = FunctionLoader.GetLoadedFunctions();

var task = new ActivityInvocationTask(FunctionName, Input);
var task = new ActivityInvocationTask(FunctionName, Input, RetryOptions);
ActivityInvocationTask.ValidateTask(task, loadedFunctions);

_durableTaskHandler.StopAndInitiateDurableTaskOrReplay(
task, context, NoWait.IsPresent, WriteObject, failureReason => DurableActivityErrorHandler.Handle(this, failureReason));
task, context, NoWait.IsPresent,
output: WriteObject,
onFailure: failureReason => DurableActivityErrorHandler.Handle(this, failureReason),
retryOptions: RetryOptions);
}

protected override void StopProcessing()
Expand Down
36 changes: 34 additions & 2 deletions src/Durable/DurableTaskHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ public void StopAndInitiateDurableTaskOrReplay(
OrchestrationContext context,
bool noWait,
Action<object> output,
Action<string> onFailure)
Action<string> onFailure,
RetryOptions retryOptions = null)
{
context.OrchestrationActionCollector.Add(task.CreateOrchestrationAction());

Expand All @@ -30,6 +31,8 @@ public void StopAndInitiateDurableTaskOrReplay(
}
else
{
context.OrchestrationActionCollector.NextBatch();

var scheduledHistoryEvent = task.GetScheduledHistoryEvent(context);
var completedHistoryEvent = task.GetCompletedHistoryEvent(context, scheduledHistoryEvent);

Expand All @@ -56,7 +59,32 @@ public void StopAndInitiateDurableTaskOrReplay(
break;

case HistoryEventType.TaskFailed:
onFailure(completedHistoryEvent.Reason);
if (retryOptions == null)
{
onFailure(completedHistoryEvent.Reason);
}
else
{
// Reset IsProcessed, let RetryProcessor handle these events instead.
scheduledHistoryEvent.IsProcessed = false;
completedHistoryEvent.IsProcessed = false;

var shouldContinueProcessing =
RetryProcessor.Process(
context.History,
scheduledHistoryEvent,
retryOptions.MaxNumberOfAttempts,
onSuccess:
result => {
output(TypeExtensions.ConvertFromJson(result));
},
onFailure);

if (shouldContinueProcessing)
{
InitiateAndWaitForStop(context);
}
}
break;
}
}
Expand All @@ -73,6 +101,8 @@ public void WaitAll(
OrchestrationContext context,
Action<object> output)
{
context.OrchestrationActionCollector.NextBatch();

var completedEvents = new List<HistoryEvent>();
foreach (var task in tasksToWaitFor)
{
Expand Down Expand Up @@ -118,6 +148,8 @@ public void WaitAny(
OrchestrationContext context,
Action<object> output)
{
context.OrchestrationActionCollector.NextBatch();

var completedTasks = new List<DurableTask>();
DurableTask firstCompletedTask = null;
int firstCompletedHistoryEventIndex = -1;
Expand Down
7 changes: 7 additions & 0 deletions src/Durable/HistoryEvent.cs
Original file line number Diff line number Diff line change
Expand Up @@ -58,5 +58,12 @@ internal class HistoryEvent

// Internal used only
public bool IsProcessed { get; set; }

public override string ToString()
{
var relatedEventId = EventType == HistoryEventType.TimerFired ? TimerId : TaskScheduledId;
var processedMarker = IsProcessed ? "X" : " ";
return $"[{EventId}] {EventType} '{Name}' ({relatedEventId}) [{processedMarker}]";
}
}
}
20 changes: 17 additions & 3 deletions src/Durable/OrchestrationActionCollector.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,22 +7,36 @@ namespace Microsoft.Azure.Functions.PowerShellWorker.Durable
{
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;

using Microsoft.Azure.Functions.PowerShellWorker.Durable.Actions;

internal class OrchestrationActionCollector
{
private readonly List<OrchestrationAction> _actions = new List<OrchestrationAction>();
private readonly List<List<OrchestrationAction>> _actions = new();

private readonly AutoResetEvent _stopEvent = new AutoResetEvent(initialState: false);

private bool _nextBatch = true;

public void Add(OrchestrationAction action)
{
_actions.Add(action);
if (_nextBatch)
{
_actions.Add(new List<OrchestrationAction>());
_nextBatch = false;
}

_actions.Last().Add(action);
}

public void NextBatch()
{
_nextBatch = true;
}

public Tuple<bool, List<OrchestrationAction>> WaitForActions(WaitHandle completionWaitHandle)
public Tuple<bool, List<List<OrchestrationAction>>> WaitForActions(WaitHandle completionWaitHandle)
{
var waitHandles = new[] { _stopEvent, completionWaitHandle };
var signaledHandleIndex = WaitHandle.WaitAny(waitHandles);
Expand Down
6 changes: 3 additions & 3 deletions src/Durable/OrchestrationFailureException.cs
Original file line number Diff line number Diff line change
Expand Up @@ -24,17 +24,17 @@ public OrchestrationFailureException()
{
}

public OrchestrationFailureException(List<OrchestrationAction> actions, Exception innerException)
public OrchestrationFailureException(List<List<OrchestrationAction>> actions, Exception innerException)
: base(FormatOrchestrationFailureMessage(actions, innerException), innerException)
{
}

private static string FormatOrchestrationFailureMessage(List<OrchestrationAction> actions, Exception exception)
private static string FormatOrchestrationFailureMessage(List<List<OrchestrationAction>> actions, Exception exception)
{
// For more details on why this message looks like this, see:
// - https://github.com/Azure/azure-functions-durable-js/pull/145
// - https://github.com/Azure/azure-functions-durable-extension/pull/1171
var orchestrationMessage = new OrchestrationMessage(isDone: false, new List<List<OrchestrationAction>> { actions }, output: null, exception.Message);
var orchestrationMessage = new OrchestrationMessage(isDone: false, actions, output: null, exception.Message);
var message = $"{exception.Message}{OutOfProcDataLabel}{JsonConvert.SerializeObject(orchestrationMessage)}";
return message;
}
Expand Down
4 changes: 2 additions & 2 deletions src/Durable/OrchestrationInvoker.cs
Original file line number Diff line number Diff line change
Expand Up @@ -67,10 +67,10 @@ public Hashtable Invoke(OrchestrationBindingInfo orchestrationBindingInfo, IPowe

private static Hashtable CreateOrchestrationResult(
bool isDone,
List<OrchestrationAction> actions,
List<List<OrchestrationAction>> actions,
object output)
{
var orchestrationMessage = new OrchestrationMessage(isDone, new List<List<OrchestrationAction>> { actions }, output);
var orchestrationMessage = new OrchestrationMessage(isDone, actions, output);
return new Hashtable { { AzFunctionInfo.DollarReturn, orchestrationMessage } };
}
}
Expand Down
38 changes: 38 additions & 0 deletions src/Durable/RetryOptions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
//
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
//

#pragma warning disable 1591 // Missing XML comment for publicly visible type or member 'member'

using System;

namespace Microsoft.Azure.Functions.PowerShellWorker.Durable
{
public class RetryOptions
{
public TimeSpan FirstRetryInterval { get; }

public int MaxNumberOfAttempts { get; }

public double? BackoffCoefficient { get; }

public TimeSpan? MaxRetryInterval { get; }

public TimeSpan? RetryTimeout { get; }

public RetryOptions(
TimeSpan firstRetryInterval,
int maxNumberOfAttempts,
double? backoffCoefficient,
TimeSpan? maxRetryInterval,
TimeSpan? retryTimeout)
{
this.FirstRetryInterval = firstRetryInterval;
this.MaxNumberOfAttempts = maxNumberOfAttempts;
this.BackoffCoefficient = backoffCoefficient;
this.MaxRetryInterval = maxRetryInterval;
this.RetryTimeout = retryTimeout;
}
}
}
Loading