Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 43 additions & 5 deletions src/TaskManager/Plug-ins/Argo/ArgoClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,16 @@
using System.Text;
using Argo;
using Ardalis.GuardClauses;

using Microsoft.Extensions.Logging;
using Monai.Deploy.WorkflowManager.TaskManager.Argo.Logging;
using System.Net;
using Monai.Deploy.WorkflowManager.TaskManager.Argo.Exceptions;

namespace Monai.Deploy.WorkflowManager.TaskManager.Argo
{
public class ArgoClient : BaseArgoClient, IArgoClient
{
public ArgoClient(HttpClient httpClient) : base(httpClient) { }
/// <summary>
/// Creates an Argo client; both arguments are passed unchanged to the base-class constructor.
/// </summary>
/// <param name="httpClient">HTTP client handed to the base class.</param>
/// <param name="logger">Logger factory (not a logger instance, despite the name) handed to the base class.</param>
public ArgoClient(HttpClient httpClient, ILoggerFactory logger) : base(httpClient, logger) { }

public async Task<Workflow> Argo_CreateWorkflowAsync(string argoNamespace, WorkflowCreateRequest body, CancellationToken cancellationToken)
{
Expand Down Expand Up @@ -77,7 +80,23 @@ public async Task<Workflow> Argo_StopWorkflowAsync(string argoNamespace, string

const string method = "PUT";
var content = new StringContent(Newtonsoft.Json.JsonConvert.SerializeObject(body));
return await SendRequest<Workflow>(content, urlBuilder, method, new CancellationToken()).ConfigureAwait(false);
try
{
return await SendRequest<Workflow>(content, urlBuilder, method, new CancellationToken()).ConfigureAwait(false);
}
catch (ApiException<Error> ex)
{
if (ex.StatusCode == (int)HttpStatusCode.NotFound)
{
throw new ArgoWorkflowNotFoundException(body.Name, ex);
}
throw;
}
catch (Exception)
{
throw;
}


}

Expand All @@ -92,7 +111,22 @@ public async Task<Workflow> Argo_TerminateWorkflowAsync(string argoNamespace, st

const string method = "PUT";
var content = new StringContent(Newtonsoft.Json.JsonConvert.SerializeObject(body));
return await SendRequest<Workflow>(content, urlBuilder, method, new CancellationToken()).ConfigureAwait(false);
try
{
return await SendRequest<Workflow>(content, urlBuilder, method, new CancellationToken()).ConfigureAwait(false);
}
catch (ApiException<Error> ex)
{
if (ex.StatusCode == (int)HttpStatusCode.NotFound)
{
throw new ArgoWorkflowNotFoundException(body.Name, ex);
}
throw;
}
catch (Exception)
{
throw;
}
}

public async Task<WorkflowTemplate?> Argo_GetWorkflowTemplateAsync(string argoNamespace, string name, string? getOptionsResourceVersion)
Expand Down Expand Up @@ -231,9 +265,11 @@ public class BaseArgoClient

protected readonly HttpClient HttpClient;

public BaseArgoClient(HttpClient httpClient)
protected readonly ILogger Logger;
/// <summary>
/// Stores the HTTP client and creates this client's logger from the supplied factory.
/// </summary>
/// <param name="httpClient">HTTP client assigned to the <c>HttpClient</c> field.</param>
/// <param name="loggerFactory">Factory used to create the logger assigned to the <c>Logger</c> field.</param>
/// <exception cref="ArgumentNullException"><paramref name="loggerFactory"/> is <c>null</c>.</exception>
public BaseArgoClient(HttpClient httpClient, ILoggerFactory loggerFactory)
{
    // Fail with a named argument instead of the NullReferenceException that
    // calling CreateLogger on a null factory would otherwise produce.
    if (loggerFactory is null)
    {
        throw new ArgumentNullException(nameof(loggerFactory));
    }

    HttpClient = httpClient;

    // nameof keeps the logger category ("BaseArgoClient") in step with the class name.
    Logger = loggerFactory.CreateLogger(nameof(BaseArgoClient));
}

protected async Task<T> SendRequest<T>(StringContent stringContent, StringBuilder urlBuilder, string method, CancellationToken cancellationToken)
Expand All @@ -250,6 +286,8 @@ protected async Task<T> SendRequest<T>(StringContent stringContent, StringBuilde
request.RequestUri = new Uri(urlBuilder.ToString(), UriKind.RelativeOrAbsolute);

HttpResponseMessage? response = null;
var logStringContent = stringContent == null ? string.Empty : await stringContent.ReadAsStringAsync();
Logger.CallingArgoHttpInfo(request.RequestUri.ToString(), method, logStringContent);
response = await HttpClient.SendAsync(request, HttpCompletionOption.ResponseContentRead, cancellationToken).ConfigureAwait(false);

try
Expand Down
29 changes: 20 additions & 9 deletions src/TaskManager/Plug-ins/Argo/ArgoPlugin.cs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
using Monai.Deploy.WorkflowManager.TaskManager.API.Models;
using Monai.Deploy.WorkflowManager.TaskManager.Argo.Logging;
using Newtonsoft.Json;
using Monai.Deploy.WorkflowManager.TaskManager.Argo.Exceptions;

[assembly: PlugIn()]
namespace Monai.Deploy.WorkflowManager.TaskManager.Argo
Expand Down Expand Up @@ -902,18 +903,28 @@ private async ValueTask DisposeAsyncCore()
/// <summary>
/// Asks Argo to stop and then terminate the workflow named <paramref name="identity"/>.
/// </summary>
/// <param name="identity">Name of the Argo workflow to stop and terminate.</param>
/// <remarks>
/// An <see cref="ArgoWorkflowNotFoundException"/> raised by either call is logged and
/// swallowed; every other exception propagates to the caller unchanged.
/// </remarks>
public override async Task HandleTimeout(string identity)
{
    var client = _argoProvider.CreateClient(_baseUrl, _apiToken, _allowInsecure);
    try
    {
        await client.Argo_StopWorkflowAsync(_namespace, identity, new WorkflowStopRequest
        {
            Namespace = _namespace,
            Name = identity,
        });

        await client.Argo_TerminateWorkflowAsync(_namespace, identity, new WorkflowTerminateRequest
        {
            Name = identity,
            Namespace = _namespace
        });
    }
    catch (ArgoWorkflowNotFoundException ex)
    {
        // The workflow could not be found, so there is nothing to stop; record it and carry on.
        _logger.ExecptionStoppingArgoWorkflow(identity, ex);
    }

    // No catch-all here: a "catch (Exception) { throw; }" clause adds nothing,
    // because unhandled exceptions already propagate with their stack trace intact.
}

public async Task<WorkflowTemplate> CreateArgoTemplate(string template)
Expand Down
7 changes: 4 additions & 3 deletions src/TaskManager/Plug-ins/Argo/ArgoProvider.cs
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,12 @@ public class ArgoProvider : IArgoProvider
{
private readonly ILogger<ArgoProvider> _logger;
private readonly IHttpClientFactory _httpClientFactory;

public ArgoProvider(ILogger<ArgoProvider> logger, IHttpClientFactory httpClientFactory)
private readonly ILoggerFactory _logFactory;
/// <summary>
/// Creates the provider and captures the services it needs to build Argo clients.
/// </summary>
/// <param name="logger">Logger for this provider.</param>
/// <param name="httpClientFactory">Factory for the HTTP clients used by created Argo clients.</param>
/// <param name="logFactory">Logger factory handed to every Argo client this provider creates.</param>
/// <exception cref="ArgumentNullException">Any argument is <c>null</c>.</exception>
public ArgoProvider(ILogger<ArgoProvider> logger, IHttpClientFactory httpClientFactory, ILoggerFactory logFactory)
{
    _logger = logger ?? throw new ArgumentNullException(nameof(logger));
    _httpClientFactory = httpClientFactory ?? throw new ArgumentNullException(nameof(httpClientFactory));

    // Validated like the other dependencies so a missing factory fails here,
    // not later when a client is created and tries to build its logger.
    _logFactory = logFactory ?? throw new ArgumentNullException(nameof(logFactory));
}

public IArgoClient CreateClient(string baseUrl, string? apiToken, bool allowInsecure = true)
Expand All @@ -50,7 +51,7 @@ public IArgoClient CreateClient(string baseUrl, string? apiToken, bool allowInse
{
httpClient.SetBearerToken(apiToken);
}
return new ArgoClient(httpClient) { BaseUrl = baseUrl };
return new ArgoClient(httpClient, _logFactory) { BaseUrl = baseUrl };
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Copyright 2022 MONAI Consortium
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

using System.Runtime.Serialization;

namespace Monai.Deploy.WorkflowManager.TaskManager.Argo.Exceptions
{
/// <summary>
/// Thrown when an Argo workflow addressed by name cannot be found.
/// </summary>
/// <remarks>
/// Both name-taking constructors build the same message,
/// <c>Argo workflow '&lt;name&gt;' not found.</c>, so callers pass only the workflow name.
/// </remarks>
[Serializable]
public class ArgoWorkflowNotFoundException : Exception
{
    /// <summary>Creates the exception with the default <see cref="Exception"/> message.</summary>
    public ArgoWorkflowNotFoundException()
    {
    }

    /// <summary>Creates the exception for the workflow with the given name.</summary>
    /// <param name="argoWorkflowName">Name of the workflow that was not found.</param>
    public ArgoWorkflowNotFoundException(string argoWorkflowName)
        : base(FormatMessage(argoWorkflowName))
    {
    }

    /// <summary>
    /// Creates the exception for the workflow with the given name, keeping the original failure as the inner exception.
    /// </summary>
    /// <param name="argoWorkflowName">
    /// Name of the workflow that was not found. It is formatted into the message exactly as the
    /// single-argument constructor does, rather than being used verbatim as the message.
    /// </param>
    /// <param name="innerException">Exception that revealed the workflow was missing.</param>
    public ArgoWorkflowNotFoundException(string? argoWorkflowName, Exception? innerException)
        : base(FormatMessage(argoWorkflowName), innerException)
    {
    }

    /// <summary>Serialization constructor; forwards to the <see cref="Exception"/> serialization constructor.</summary>
    protected ArgoWorkflowNotFoundException(SerializationInfo info, StreamingContext context) : base(info, context)
    {
    }

    // Single source for the message text so both name-taking constructors stay in agreement.
    private static string FormatMessage(string? argoWorkflowName) => $"Argo workflow '{argoWorkflowName}' not found.";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

using System.Runtime.Serialization;

namespace Monai.Deploy.WorkflowManager.TaskManager.Argo
namespace Monai.Deploy.WorkflowManager.TaskManager.Argo.Exceptions
{
[Serializable]
public class ArtifactMappingNotFoundException : Exception
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

using System.Runtime.Serialization;

namespace Monai.Deploy.WorkflowManager.TaskManager.Argo
namespace Monai.Deploy.WorkflowManager.TaskManager.Argo.Exceptions
{
[Serializable]
public class TemplateNotFoundException : Exception
Expand Down
6 changes: 6 additions & 0 deletions src/TaskManager/Plug-ins/Argo/Logging/Log.cs
Original file line number Diff line number Diff line change
Expand Up @@ -80,5 +80,11 @@ public static partial class Log
[LoggerMessage(EventId = 1019, Level = LogLevel.Error, Message = "Error deleting Template in Argo.")]
public static partial void ErrorDeletingWorkflowTemplate(this ILogger logger, Exception ex);

/// <summary>
/// Logs, at Trace level, the URL, HTTP method and request body of a call to Argo.
/// </summary>
/// <param name="logger">Logger to write to.</param>
/// <param name="url">Request URL.</param>
/// <param name="method">HTTP method of the request.</param>
/// <param name="stringContent">Request body as text.</param>
// NOTE(review): the whole request body ends up in the log - confirm it cannot carry secrets.
[LoggerMessage(EventId = 1020, Level = LogLevel.Trace, Message = "Calling argo at url {url} : {method} : {stringContent}")]
public static partial void CallingArgoHttpInfo(this ILogger logger, string url, string method, string stringContent);

/// <summary>
/// Logs, at Debug level, that stopping an Argo workflow raised an exception.
/// </summary>
/// <param name="logger">Logger to write to.</param>
/// <param name="workflowId">Identifier of the workflow that was being stopped.</param>
/// <param name="ex">Exception that was raised.</param>
// NOTE(review): "Execption" is a misspelling of "Exception"; renaming needs every caller updated at the same time.
[LoggerMessage(EventId = 1021, Level = LogLevel.Debug, Message = "Exception stopping argo workflow {workflowId}, does it exist?")]
public static partial void ExecptionStoppingArgoWorkflow(this ILogger logger, string workflowId, Exception ex);

}
}
5 changes: 4 additions & 1 deletion src/TaskManager/TaskManager/Logging/Log.cs
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ public static partial class Log
[LoggerMessage(EventId = 109, Level = LogLevel.Warning, Message = "Unable to query for job status, no activate executor associated with execution ID={executionId}.")]
public static partial void NoActiveExecutorWithTheId(this ILogger logger, string executionId);

[LoggerMessage(EventId = 110, Level = LogLevel.Error, Message = "Unsupported type of task runner: '{assemblyName}'.")]
[LoggerMessage(EventId = 110, Level = LogLevel.Error, Message = "Exception initialising task runner: '{assemblyName}'.")]
public static partial void UnsupportedRunner(this ILogger logger, string assemblyName, Exception ex);

[LoggerMessage(EventId = 111, Level = LogLevel.Debug, Message = "Sending acknowledgment message for {eventType}.")]
Expand Down Expand Up @@ -122,5 +122,8 @@ public static partial class Log

[LoggerMessage(EventId = 120, Level = LogLevel.Error, Message = "Recovering connection to storage service: {reason}.")]
public static partial void MessagingServiceErrorRecover(this ILogger logger, string reason);

/// <summary>
/// Logs, at Error level, an exception raised while handling a task timeout.
/// </summary>
/// <param name="logger">Logger to write to.</param>
/// <param name="assemblyName">Value substituted into the message as the task identifier.</param>
/// <param name="ex">Exception that was raised.</param>
// NOTE(review): "Exection" looks like a misspelling of "Exception"; renaming needs every caller updated at the same time.
[LoggerMessage(EventId = 121, Level = LogLevel.Error, Message = "Exception handling task : '{assemblyName}' timeout.")]
public static partial void ExectionTimingOutTask(this ILogger logger, string assemblyName, Exception ex);
}
}
21 changes: 17 additions & 4 deletions src/TaskManager/TaskManager/TaskManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,7 @@ private async Task TaskDispatchEventReceivedCallback(MessageReceivedEventArgs ar

/// <summary>
/// Callback for task cancellation events: passes the received message to the generic
/// callback pipeline with <c>HandleCancellationTask</c> as the handler.
/// </summary>
/// <param name="args">Event arguments carrying the received message.</param>
private async Task TaskCancelationEventCallback(MessageReceivedEventArgs args) =>
    // Cancellation only stops running tasks; it does not set any status.
    await TaskCallBackGeneric<TaskCancellationEvent>(args, HandleCancellationTask);

Expand Down Expand Up @@ -240,6 +241,7 @@ private async Task HandleCancellationTask(JsonMessage<TaskCancellationEvent> mes
}

var pluginAssembly = string.Empty;
ITaskPlugin? taskRunner = null;
try
{
var taskExecution = await _taskDispatchEventService.GetByTaskExecutionIdAsync(message.Body.ExecutionId).ConfigureAwait(false);
Expand All @@ -250,17 +252,28 @@ private async Task HandleCancellationTask(JsonMessage<TaskCancellationEvent> mes
throw new InvalidOperationException("Task Event data not found.");
}

var taskRunner = typeof(ITaskPlugin).CreateInstance<ITaskPlugin>(serviceProvider: _scope.ServiceProvider, typeString: pluginAssembly, _serviceScopeFactory, taskExecEvent);
await taskRunner.HandleTimeout(message.Body.Identity);

AcknowledgeMessage(message);
taskRunner = typeof(ITaskPlugin).CreateInstance<ITaskPlugin>(serviceProvider: _scope.ServiceProvider, typeString: pluginAssembly, _serviceScopeFactory, taskExecEvent);
}
catch (Exception ex)
{
_logger.UnsupportedRunner(pluginAssembly, ex);
await HandleMessageException(message, message.Body.WorkflowInstanceId, message.Body.TaskId, message.Body.ExecutionId, false).ConfigureAwait(false);
return;
}

try
{
await taskRunner.HandleTimeout(message.Body.Identity);
}
catch (Exception ex)
{
// Ignoring exception here as we've asked for the task to be stopped.
_logger.ExectionTimingOutTask(pluginAssembly, ex);
}
finally
{
AcknowledgeMessage(message);
}
}

private async Task HandleTaskCallback(JsonMessage<TaskCallbackEvent> message)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ private static async Task EnsureIndex(IMongoCollection<WorkflowInstance> workflo
Name = "TasksIndex"
};
var model = new CreateIndexModel<WorkflowInstance>(
Builders<WorkflowInstance>.IndexKeys.Ascending(s => s.Tasks),
Builders<WorkflowInstance>.IndexKeys.Ascending($"{nameof(WorkflowInstance.Tasks)}.{nameof(Task.Status)}"),
options
);

Expand Down
2 changes: 1 addition & 1 deletion src/WorkflowManager/Logging/Log.200000.Workflow.cs
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ public static partial class Log
[LoggerMessage(EventId = 200012, Level = LogLevel.Error, Message = "The following task: {taskId} in workflow {workflowInstanceId} is currently timed out and not processing anymore updates, timed out at {timedOut}.")]
public static partial void TaskTimedOut(this ILogger logger, string taskId, string workflowInstanceId, DateTime timedOut);

[LoggerMessage(EventId = 200013, Level = LogLevel.Critical, Message = "Workflow `{workflowId}` not found.")]
[LoggerMessage(EventId = 200013, Level = LogLevel.Critical, Message = "Workflow `{workflowId}` not found or is deleted.")]
public static partial void WorkflowNotFound(this ILogger logger, string workflowId);

[LoggerMessage(EventId = 200014, Level = LogLevel.Error, Message = "The task execution status for task {taskId} cannot be updated from {oldStatus} to {newStatus}. Payload: {payloadId}")]
Expand Down
13 changes: 0 additions & 13 deletions src/WorkflowManager/MonaiBackgroundService/Worker.cs
Original file line number Diff line number Diff line change
Expand Up @@ -93,19 +93,6 @@ private async Task PublishCancellationEvent(TaskExecution task, string correlati
{
_logger.TimingOutTaskCancellationEvent(identity, task.WorkflowInstanceId);

var updateEvent = EventMapper.GenerateTaskUpdateEvent(new GenerateTaskUpdateEventParams
{
CorrelationId = correlationId,
ExecutionId = task.ExecutionId,
WorkflowInstanceId = workflowInstanceId,
TaskId = task.TaskId,
TaskExecutionStatus = TaskExecutionStatus.Failed,
FailureReason = FailureReason.TimedOut,
Stats = task.ExecutionStats
});

updateEvent.Validate();

var cancellationEvent = EventMapper.GenerateTaskCancellationEvent(
identity,
task.ExecutionId,
Expand Down
Loading