Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
85 changes: 66 additions & 19 deletions src/Microsoft.Android.Build.BaseTasks/AsyncTaskExtensions.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// https://github.com/xamarin/xamarin-android/blob/83854738b8e01747f9536f426fe17ad784cc2081/src/Xamarin.Android.Build.Tasks/Utilities/AsyncTaskExtensions.cs
// https://github.com/xamarin/xamarin-android/blob/83854738b8e01747f9536f426fe17ad784cc2081/src/Xamarin.Android.Build.Tasks/Utilities/AsyncTaskExtensions.cs

using System;
using System.Collections.Generic;
Expand All @@ -12,17 +12,24 @@ public static class AsyncTaskExtensions
/// <summary>
/// Creates a collection of Task with proper CancellationToken and error handling and waits via Task.WhenAll
/// </summary>
public static Task WhenAll<TSource>(this AsyncTask asyncTask, IEnumerable<TSource> source, Action<TSource> body)
public static Task WhenAll<TSource> (this AsyncTask asyncTask, IEnumerable<TSource> source, Action<TSource> body) =>
asyncTask.WhenAll (source, body, maxConcurrencyLevel: DefaultMaxConcurrencyLevel);

/// <summary>
/// Creates a collection of Task with proper CancellationToken and error handling and waits via Task.WhenAll
/// </summary>
public static Task WhenAll<TSource>(this AsyncTask asyncTask, IEnumerable<TSource> source, Action<TSource> body, int maxConcurrencyLevel, TaskCreationOptions creationOptions = TaskCreationOptions.LongRunning)
{
var scheduler = GetTaskScheduler (maxConcurrencyLevel);
var tasks = new List<Task> ();
foreach (var s in source) {
tasks.Add (Task.Run (() => {
tasks.Add (Task.Factory.StartNew (() => {
try {
body (s);
} catch (Exception exc) {
LogErrorAndCancel (asyncTask, exc);
}
}, asyncTask.CancellationToken));
}, asyncTask.CancellationToken, creationOptions, scheduler));
}
return Task.WhenAll (tasks);
}
Expand All @@ -31,28 +38,42 @@ public static Task WhenAll<TSource>(this AsyncTask asyncTask, IEnumerable<TSourc
/// Creates a collection of Task with proper CancellationToken and error handling and waits via Task.WhenAll
/// Passes an object the inner method can use for locking. The callback is of the form: (T item, object lockObject)
/// </summary>
public static Task WhenAllWithLock<TSource> (this AsyncTask asyncTask, IEnumerable<TSource> source, Action<TSource, object> body)
public static Task WhenAllWithLock<TSource> (this AsyncTask asyncTask, IEnumerable<TSource> source, Action<TSource, object> body) =>
asyncTask.WhenAllWithLock (source, body, maxConcurrencyLevel: DefaultMaxConcurrencyLevel);

/// <summary>
/// Creates a collection of Task with proper CancellationToken and error handling and waits via Task.WhenAll
/// Passes an object the inner method can use for locking. The callback is of the form: (T item, object lockObject)
/// </summary>
public static Task WhenAllWithLock<TSource> (this AsyncTask asyncTask, IEnumerable<TSource> source, Action<TSource, object> body, int maxConcurrencyLevel, TaskCreationOptions creationOptions = TaskCreationOptions.LongRunning)
{
var scheduler = GetTaskScheduler (maxConcurrencyLevel);
var lockObject = new object ();
var tasks = new List<Task> ();
foreach (var s in source) {
tasks.Add (Task.Run (() => {
tasks.Add (Task.Factory.StartNew (() => {
try {
body (s, lockObject);
} catch (Exception exc) {
LogErrorAndCancel (asyncTask, exc);
}
}, asyncTask.CancellationToken));
}, asyncTask.CancellationToken, creationOptions, scheduler));
}
return Task.WhenAll (tasks);
}

/// <summary>
/// Calls Parallel.ForEach() with appropriate ParallelOptions and exception handling.
/// </summary>
public static ParallelLoopResult ParallelForEach<TSource> (this AsyncTask asyncTask, IEnumerable<TSource> source, Action<TSource> body)
public static ParallelLoopResult ParallelForEach<TSource> (this AsyncTask asyncTask, IEnumerable<TSource> source, Action<TSource> body) =>
asyncTask.ParallelForEach (source, body, maxConcurrencyLevel: DefaultMaxConcurrencyLevel);

/// <summary>
/// Calls Parallel.ForEach() with appropriate ParallelOptions and exception handling.
/// </summary>
public static ParallelLoopResult ParallelForEach<TSource> (this AsyncTask asyncTask, IEnumerable<TSource> source, Action<TSource> body, int maxConcurrencyLevel)
{
var options = ParallelOptions (asyncTask);
var options = ParallelOptions (asyncTask, maxConcurrencyLevel);
return Parallel.ForEach (source, options, s => {
try {
body (s);
Expand All @@ -66,9 +87,16 @@ public static ParallelLoopResult ParallelForEach<TSource> (this AsyncTask asyncT
/// Calls Parallel.ForEach() with appropriate ParallelOptions and exception handling.
/// Passes an object the inner method can use for locking. The callback is of the form: (T item, object lockObject)
/// </summary>
public static ParallelLoopResult ParallelForEachWithLock<TSource> (this AsyncTask asyncTask, IEnumerable<TSource> source, Action<TSource, object> body)
public static ParallelLoopResult ParallelForEachWithLock<TSource> (this AsyncTask asyncTask, IEnumerable<TSource> source, Action<TSource, object> body) =>
asyncTask.ParallelForEachWithLock (source, body, maxConcurrencyLevel: DefaultMaxConcurrencyLevel);

/// <summary>
/// Calls Parallel.ForEach() with appropriate ParallelOptions and exception handling.
/// Passes an object the inner method can use for locking. The callback is of the form: (T item, object lockObject)
/// </summary>
public static ParallelLoopResult ParallelForEachWithLock<TSource> (this AsyncTask asyncTask, IEnumerable<TSource> source, Action<TSource, object> body, int maxConcurrencyLevel)
{
var options = ParallelOptions (asyncTask);
var options = ParallelOptions (asyncTask, maxConcurrencyLevel);
var lockObject = new object ();
return Parallel.ForEach (source, options, s => {
try {
Expand All @@ -79,28 +107,47 @@ public static ParallelLoopResult ParallelForEachWithLock<TSource> (this AsyncTas
});
}

static ParallelOptions ParallelOptions (AsyncTask asyncTask) => new ParallelOptions {
static ParallelOptions ParallelOptions (AsyncTask asyncTask, int maxConcurrencyLevel) => new ParallelOptions {
CancellationToken = asyncTask.CancellationToken,
TaskScheduler = TaskScheduler.Default,
TaskScheduler = GetTaskScheduler (maxConcurrencyLevel),
};

static TaskScheduler GetTaskScheduler (int maxConcurrencyLevel)
{
var pair = new ConcurrentExclusiveSchedulerPair (TaskScheduler.Default, maxConcurrencyLevel);
return pair.ConcurrentScheduler;
}

static int DefaultMaxConcurrencyLevel => Math.Max (1, Environment.ProcessorCount - 1);

static void LogErrorAndCancel (AsyncTask asyncTask, Exception exc)
{
asyncTask.LogCodedError ("XA0000", Properties.Resources.XA0000_Exception, exc);
asyncTask.Cancel ();
}

/// <summary>
/// Calls Task.Run() with a proper CancellationToken.
/// Calls Task.Factory.StartNew() with a proper CancellationToken, TaskScheduler, and TaskCreationOptions.LongRunning.
/// </summary>
public static Task RunTask (this AsyncTask asyncTask, Action body) =>
asyncTask.RunTask (body, maxConcurrencyLevel: DefaultMaxConcurrencyLevel);

/// <summary>
/// Calls Task.Factory.StartNew() with a proper CancellationToken
/// </summary>
public static Task RunTask (this AsyncTask asyncTask, Action body) =>
Task.Run (body, asyncTask.CancellationToken);
public static Task RunTask (this AsyncTask asyncTask, Action body, int maxConcurrencyLevel, TaskCreationOptions creationOptions = TaskCreationOptions.LongRunning) =>
Task.Factory.StartNew (body, asyncTask.CancellationToken, creationOptions, GetTaskScheduler (maxConcurrencyLevel));

/// <summary>
/// Calls Task.Factory.StartNew<T>() with a proper CancellationToken, TaskScheduler, and TaskCreationOptions.LongRunning.
/// </summary>
public static Task<TSource> RunTask<TSource> (this AsyncTask asyncTask, Func<TSource> body) =>
asyncTask.RunTask (body, maxConcurrencyLevel: DefaultMaxConcurrencyLevel);

/// <summary>
/// Calls Task.Run<T>() with a proper CancellationToken.
/// Calls Task.Factory.StartNew<T>() with a proper CancellationToken.
/// </summary>
public static Task<TSource> RunTask<TSource> (this AsyncTask asyncTask, Func<TSource> body) =>
Task.Run (body, asyncTask.CancellationToken);
public static Task<TSource> RunTask<TSource> (this AsyncTask asyncTask, Func<TSource> body, int maxConcurrencyLevel, TaskCreationOptions creationOptions = TaskCreationOptions.LongRunning) =>
Task.Factory.StartNew (body, asyncTask.CancellationToken, creationOptions, GetTaskScheduler (maxConcurrencyLevel));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
using System.Collections.Generic;
using System.Threading.Tasks;
using Microsoft.Android.Build.Tasks;
using NUnit.Framework;
using Xamarin.Build;

namespace Microsoft.Android.Build.BaseTasks.Tests
{
[TestFixture]
public class AsyncTaskExtensionsTests
{
const int Iterations = 32;

[Test]
public async Task RunTask ()
{
bool set = false;
await new AsyncTask ().RunTask (delegate { set = true; }); // delegate { } has void return type
Assert.IsTrue (set);
}

[Test]
public async Task RunTaskOfT ()
{
bool set = false;
Assert.IsTrue (await new AsyncTask ().RunTask (() => set = true), "RunTask should return true");
Assert.IsTrue (set);
}

[Test]
public async Task WhenAll ()
{
bool set = false;
await new AsyncTask ().WhenAll (new [] { 0 }, _ => set = true);
Assert.IsTrue (set);
}

[Test]
public async Task WhenAllWithLock ()
{
var input = new int [Iterations];
var output = new List<int> ();
await new AsyncTask ().WhenAllWithLock (input, (i, l) => {
lock (l) output.Add (i);
});
Assert.AreEqual (Iterations, output.Count);
}

[Test]
public void ParallelForEach ()
{
bool set = false;
new AsyncTask ().ParallelForEach (new [] { 0 }, _ => set = true);
Assert.IsTrue (set);
}

[Test]
public void ParallelForEachWithLock ()
{
var input = new int [Iterations];
var output = new List<int> ();
new AsyncTask ().ParallelForEachWithLock (input, (i, l) => {
lock (l) output.Add (i);
});
Assert.AreEqual (Iterations, output.Count);
}
}
}