From 1bd6a763d0fd39dd007806f4aef0cdbc22d4927b Mon Sep 17 00:00:00 2001 From: Abel Braaksma Date: Fri, 4 Nov 2022 00:03:52 +0100 Subject: [PATCH] Implement TaskSeq.init, initAsync, initInfinite, initInfiniteAsync and TaskSeq.concat, plus tests and docs --- .../FSharpy.TaskSeq.Test.fsproj | 2 + .../TaskSeq.Concat.Tests.fs | 51 +++++++ .../TaskSeq.Init.Tests.fs | 142 ++++++++++++++++++ src/FSharpy.TaskSeq/TaskSeq.fs | 11 ++ src/FSharpy.TaskSeq/TaskSeq.fsi | 77 +++++++++- src/FSharpy.TaskSeq/TaskSeqInternal.fs | 66 +++++++- 6 files changed, 347 insertions(+), 2 deletions(-) create mode 100644 src/FSharpy.TaskSeq.Test/TaskSeq.Concat.Tests.fs create mode 100644 src/FSharpy.TaskSeq.Test/TaskSeq.Init.Tests.fs diff --git a/src/FSharpy.TaskSeq.Test/FSharpy.TaskSeq.Test.fsproj b/src/FSharpy.TaskSeq.Test/FSharpy.TaskSeq.Test.fsproj index 9c102755..19815165 100644 --- a/src/FSharpy.TaskSeq.Test/FSharpy.TaskSeq.Test.fsproj +++ b/src/FSharpy.TaskSeq.Test/FSharpy.TaskSeq.Test.fsproj @@ -15,6 +15,7 @@ + @@ -23,6 +24,7 @@ + diff --git a/src/FSharpy.TaskSeq.Test/TaskSeq.Concat.Tests.fs b/src/FSharpy.TaskSeq.Test/TaskSeq.Concat.Tests.fs new file mode 100644 index 00000000..d1bab1c9 --- /dev/null +++ b/src/FSharpy.TaskSeq.Test/TaskSeq.Concat.Tests.fs @@ -0,0 +1,51 @@ +module FSharpy.Tests.Concat + +open System + +open Xunit +open FsUnit.Xunit +open FsToolkit.ErrorHandling + +open FSharpy +open System.Collections.Generic + +// +// TaskSeq.concat +// + +let validateSequence ts = + ts + |> TaskSeq.toSeqCachedAsync + |> Task.map (Seq.map string) + |> Task.map (String.concat "") + |> Task.map (should equal "123456789101234567891012345678910") + +module EmptySeq = + [)>] + let ``TaskSeq-concat with empty sequences`` variant = + taskSeq { + yield Gen.getEmptyVariant variant // not yield-bang! 
+ yield Gen.getEmptyVariant variant + yield Gen.getEmptyVariant variant + } + |> TaskSeq.concat + |> verifyEmpty + + [)>] + let ``TaskSeq-concat with top sequence empty`` variant = + Gen.getEmptyVariant variant + |> TaskSeq.box + |> TaskSeq.cast> // casting an int to an enumerable, LOL! + |> TaskSeq.concat + |> verifyEmpty + +module Immutable = + [)>] + let ``TaskSeq-concat with three non-empty sequences`` variant = + taskSeq { + yield Gen.getSeqImmutable variant // not yield-bang! + yield Gen.getSeqImmutable variant + yield Gen.getSeqImmutable variant + } + |> TaskSeq.concat + |> validateSequence diff --git a/src/FSharpy.TaskSeq.Test/TaskSeq.Init.Tests.fs b/src/FSharpy.TaskSeq.Test/TaskSeq.Init.Tests.fs new file mode 100644 index 00000000..6ec29857 --- /dev/null +++ b/src/FSharpy.TaskSeq.Test/TaskSeq.Init.Tests.fs @@ -0,0 +1,142 @@ +module FSharpy.Tests.Init + +open System + +open Xunit +open FsUnit.Xunit +open FsToolkit.ErrorHandling + +open FSharpy + +// +// TaskSeq.init +// TaskSeq.initInfinite +// TaskSeq.initAsync +// TaskSeq.initInfiniteAsync +// + +/// Asserts that a sequence contains the char values 'A'..'J'. 
+ +module EmptySeq = + [] + let ``TaskSeq-init can generate an empty sequence`` () = TaskSeq.init 0 (fun x -> x) |> verifyEmpty + + [] + let ``TaskSeq-initAsync can generate an empty sequence`` () = + TaskSeq.initAsync 0 (fun x -> Task.fromResult x) + |> verifyEmpty + + [] + let ``TaskSeq-init with a negative count gives an error`` () = + fun () -> + TaskSeq.init -1 (fun x -> x) + |> TaskSeq.toArrayAsync + |> Task.ignore + + |> should throwAsyncExact typeof + + fun () -> + TaskSeq.init Int32.MinValue (fun x -> x) + |> TaskSeq.toArrayAsync + |> Task.ignore + + |> should throwAsyncExact typeof + + [] + let ``TaskSeq-initAsync with a negative count gives an error`` () = + fun () -> + TaskSeq.initAsync Int32.MinValue (fun x -> Task.fromResult x) + |> TaskSeq.toArrayAsync + |> Task.ignore + + |> should throwAsyncExact typeof + +module Immutable = + [] + let ``TaskSeq-init singleton`` () = + TaskSeq.init 1 id + |> TaskSeq.head + |> Task.map (should equal 0) + + [] + let ``TaskSeq-initAsync singleton`` () = + TaskSeq.initAsync 1 (id >> Task.fromResult) + |> TaskSeq.head + |> Task.map (should equal 0) + + [] + let ``TaskSeq-init some values`` () = + TaskSeq.init 42 (fun x -> x / 2) + |> TaskSeq.length + |> Task.map (should equal 42) + + [] + let ``TaskSeq-initAsync some values`` () = + TaskSeq.initAsync 42 (fun x -> Task.fromResult (x / 2)) + |> TaskSeq.length + |> Task.map (should equal 42) + + [] + let ``TaskSeq-initInfinite`` () = + TaskSeq.initInfinite (fun x -> x / 2) + |> TaskSeq.item 1_000_001 + |> Task.map (should equal 500_000) + + [] + let ``TaskSeq-initInfiniteAsync`` () = + TaskSeq.initInfiniteAsync (fun x -> Task.fromResult (x / 2)) + |> TaskSeq.item 1_000_001 + |> Task.map (should equal 500_000) + +module SideEffects = + let inc (i: int byref) = + i <- i + 1 + i + + [] + let ``TaskSeq-init singleton with side effects`` () = task { + let mutable x = 0 + + let ts = TaskSeq.init 1 (fun _ -> inc &x) + + do! 
TaskSeq.head ts |> Task.map (should equal 1) + do! TaskSeq.head ts |> Task.map (should equal 2) + do! TaskSeq.head ts |> Task.map (should equal 3) // state mutates + } + + [] + let ``TaskSeq-init singleton with side effects -- Current`` () = task { + let mutable x = 0 + + let ts = TaskSeq.init 1 (fun _ -> inc &x) + + let enumerator = ts.GetAsyncEnumerator() + let! _ = enumerator.MoveNextAsync() + do enumerator.Current |> should equal 1 + do enumerator.Current |> should equal 1 + do enumerator.Current |> should equal 1 // current state does not mutate + } + + [] + let ``TaskSeq-initAsync singleton with side effects`` () = task { + let mutable x = 0 + + let ts = TaskSeq.initAsync 1 (fun _ -> Task.fromResult (inc &x)) + + do! TaskSeq.head ts |> Task.map (should equal 1) + do! TaskSeq.head ts |> Task.map (should equal 2) + do! TaskSeq.head ts |> Task.map (should equal 3) // state mutates + } + + [] + let ``TaskSeq-initAsync singleton with side effects -- Current`` () = task { + let mutable x = 0 + + let ts = TaskSeq.initAsync 1 (fun _ -> Task.fromResult (inc &x)) + + let enumerator = ts.GetAsyncEnumerator() + let! 
_ = enumerator.MoveNextAsync() + do enumerator.Current |> should equal 1 + do enumerator.Current |> should equal 1 + do enumerator.Current |> should equal 1 // current state does not mutate + } diff --git a/src/FSharpy.TaskSeq/TaskSeq.fs b/src/FSharpy.TaskSeq/TaskSeq.fs index 3bb1a9c7..64cd33c9 100644 --- a/src/FSharpy.TaskSeq/TaskSeq.fs +++ b/src/FSharpy.TaskSeq/TaskSeq.fs @@ -156,8 +156,18 @@ module TaskSeq = // let length source = Internal.lengthBy None source + let lengthOrMax max source = Internal.lengthBeforeMax max source let lengthBy predicate source = Internal.lengthBy (Some(Predicate predicate)) source let lengthByAsync predicate source = Internal.lengthBy (Some(PredicateAsync predicate)) source + let init count initializer = Internal.init (Some count) (InitAction initializer) + let initInfinite initializer = Internal.init None (InitAction initializer) + let initAsync count initializer = Internal.init (Some count) (InitActionAsync initializer) + let initInfiniteAsync initializer = Internal.init None (InitActionAsync initializer) + + let concat (sources: taskSeq<#taskSeq<'T>>) = taskSeq { + for ts in sources do + yield! (ts :> taskSeq<'T>) + } // // iter/map/collect functions @@ -262,6 +272,7 @@ module TaskSeq = | None -> return Internal.raiseNotFound () } + let findAsync predicate source = task { match! Internal.tryFind (PredicateAsync predicate) source with | Some item -> return item diff --git a/src/FSharpy.TaskSeq/TaskSeq.fsi b/src/FSharpy.TaskSeq/TaskSeq.fsi index 4841289d..493111fa 100644 --- a/src/FSharpy.TaskSeq/TaskSeq.fsi +++ b/src/FSharpy.TaskSeq/TaskSeq.fsi @@ -15,10 +15,17 @@ module TaskSeq = /// /// Returns the length of the sequence. This operation requires the whole sequence to be evaluated and - /// should not be used on potentially infinite sequences. + /// should not be used on potentially infinite sequences, see <see cref="lengthOrMax" /> for an alternative. 
/// val length: source: taskSeq<'T> -> Task + /// + /// Returns the length of the sequence, or <paramref name="max" />, whichever comes first. This operation requires the task sequence + /// to be evaluated in full, or until <paramref name="max" /> items have been processed. Use this method instead of + /// <see cref="length" /> if you want to prevent too many items from being evaluated, or if the sequence is potentially infinite. + /// + val lengthOrMax: max: int -> source: taskSeq<'T> -> Task + /// /// Returns the length of the sequence of all items for which the returns true. /// This operation requires the whole sequence to be evaluated and should not be used on potentially infinite sequences. @@ -32,6 +39,74 @@ module TaskSeq = /// val lengthByAsync: predicate: ('T -> #Task) -> source: taskSeq<'T> -> Task + /// + /// Generates a new task sequence which, when iterated, will return successive elements by calling the given function + /// with the current index, up to the given count. Each element is saved after its initialization for successive access to + /// <see cref="IAsyncEnumerator.Current" />, which will not re-evaluate the <paramref name="initializer" />. However, + /// re-iterating the returned task sequence will re-evaluate the initialization function. The returned sequence may + /// be passed between threads safely. However, individual IAsyncEnumerator values generated from the returned sequence should + /// not be accessed concurrently. + /// + /// + /// The maximum number of items to generate for the sequence. + /// A function that generates an item in the sequence from a given index. + /// The resulting task sequence. + /// Thrown when count is negative. + val init: count: int -> initializer: (int -> 'T) -> taskSeq<'T> + + /// + /// Generates a new task sequence which, when iterated, will return successive elements by calling the given function + /// with the current index, up to the given count. Each element is saved after its initialization for successive access to + /// <see cref="IAsyncEnumerator.Current" />, which will not re-evaluate the <paramref name="initializer" />. 
However, + /// re-iterating the returned task sequence will re-evaluate the initialization function. The returned sequence may + /// be passed between threads safely. However, individual IAsyncEnumerator values generated from the returned sequence should + /// not be accessed concurrently. + /// + /// + /// The maximum number of items to generate for the sequence. + /// A function that generates an item in the sequence from a given index. + /// The resulting task sequence. + /// Thrown when count is negative. + val initAsync: count: int -> initializer: (int -> #Task<'T>) -> taskSeq<'T> + + /// + /// Generates a new task sequence which, when iterated, will return successive elements by calling the given function + /// with the current index, ad infinitum, or until <see cref="Int32.MaxValue" /> is reached. + /// Each element is saved after its initialization for successive access to + /// <see cref="IAsyncEnumerator.Current" />, which will not re-evaluate the <paramref name="initializer" />. However, + /// re-iterating the returned task sequence will re-evaluate the initialization function. The returned sequence may + /// be passed between threads safely. However, individual IAsyncEnumerator values generated from the returned sequence should + /// not be accessed concurrently. + /// + /// + /// A function that generates an item in the sequence from a given index. + /// The resulting task sequence. + val initInfinite: initializer: (int -> 'T) -> taskSeq<'T> + + /// + /// Generates a new task sequence which, when iterated, will return successive elements by calling the given function + /// with the current index, ad infinitum, or until <see cref="Int32.MaxValue" /> is reached. + /// Each element is saved after its initialization for successive access to + /// <see cref="IAsyncEnumerator.Current" />, which will not re-evaluate the <paramref name="initializer" />. However, + /// re-iterating the returned task sequence will re-evaluate the initialization function. The returned sequence may + /// be passed between threads safely. However, individual IAsyncEnumerator values generated from the returned sequence should + /// not be accessed concurrently. 
+ /// + /// + /// A function that generates an item in the sequence from a given index. + /// The resulting task sequence. + val initInfiniteAsync: initializer: (int -> #Task<'T>) -> taskSeq<'T> + + /// + /// Combines the given task sequence of task sequences and concatenates them end-to-end, to form a + /// new flattened, single task sequence. Each task sequence is awaited item by item, before the next is iterated. + /// + /// + /// The input enumeration-of-enumerations. + /// The resulting task sequence. + /// Thrown when the input sequence is null. + val concat: sources: taskSeq<#taskSeq<'T>> -> taskSeq<'T> + /// Returns taskSeq as an array. This function is blocking until the sequence is exhausted and will properly dispose of the resources. val toList: source: taskSeq<'T> -> 'T list diff --git a/src/FSharpy.TaskSeq/TaskSeqInternal.fs b/src/FSharpy.TaskSeq/TaskSeqInternal.fs index 5d804b12..7d3870ef 100644 --- a/src/FSharpy.TaskSeq/TaskSeqInternal.fs +++ b/src/FSharpy.TaskSeq/TaskSeqInternal.fs @@ -33,11 +33,20 @@ type PredicateAction<'T, 'U, 'TaskBool when 'TaskBool :> Task> = | Predicate of try_filter: ('T -> bool) | PredicateAsync of async_try_filter: ('T -> 'TaskBool) +[] +type InitAction<'T, 'TaskT when 'TaskT :> Task<'T>> = + | InitAction of init_item: (int -> 'T) + | InitActionAsync of async_init_item: (int -> 'TaskT) + module internal TaskSeqInternal = let inline raiseEmptySeq () = ArgumentException("The asynchronous input sequence was empty.", "source") |> raise + let inline raiseCannotBeNegative (name: string) = + ArgumentException("The value cannot be negative", name) + |> raise + let inline raiseInsufficient () = ArgumentException("The asynchronous input sequence was has an insufficient number of elements.", "source") |> raise @@ -64,7 +73,7 @@ module internal TaskSeqInternal = | None -> while go do let! 
step = e.MoveNextAsync() - i <- i + 1 + i <- i + 1 // update before moving: we are counting, not indexing go <- step | Some (Predicate predicate) -> @@ -87,6 +96,22 @@ module internal TaskSeqInternal = return i } + /// Returns the length of the sequence, or max if the sequence has more than max items + let lengthBeforeMax max (source: taskSeq<_>) = task { + use e = source.GetAsyncEnumerator(CancellationToken()) + let mutable go = true + let mutable i = 0 + let! step = e.MoveNextAsync() + go <- step + + while go && i < max do + i <- i + 1 // update before moving: we are counting, not indexing + let! step = e.MoveNextAsync() + go <- step + + return i + } + let tryExactlyOne (source: taskSeq<_>) = task { use e = source.GetAsyncEnumerator(CancellationToken()) @@ -104,6 +129,45 @@ module internal TaskSeqInternal = return None } + + let init count initializer = taskSeq { + let mutable i = 0 + let mutable value: Lazy<'T> = Unchecked.defaultof<_> + + let count = + match count with + | Some c -> if c >= 0 then c else raiseCannotBeNegative (nameof count) + | None -> Int32.MaxValue + + match initializer with + | InitAction init -> + while i < count do + // using Lazy gives us locking and safe multiple access to the cached value, if + // multiple threads access the same item through the same enumerator (which is + // bad practice, but hey, who're we to judge). + if isNull value then + value <- Lazy<_>.Create (fun () -> init i) + + yield value.Force() + value <- Unchecked.defaultof<_> + i <- i + 1 + + | InitActionAsync asyncInit -> + while i < count do + // using Lazy gives us locking and safe multiple access to the cached value, if + // multiple threads access the same item through the same enumerator (which is + // bad practice, but hey, who're we to judge). + if isNull value then + // TODO: is there a 'Lazy' we can use with Task? + let! 
value' = asyncInit i + value <- Lazy<_>.CreateFromValue value' + + yield value.Force() + value <- Unchecked.defaultof<_> + i <- i + 1 + + } + let iter action (source: taskSeq<_>) = task { use e = source.GetAsyncEnumerator(CancellationToken()) let mutable go = true