diff --git a/src/FSharpy.TaskSeq.Test/FSharpy.TaskSeq.Test.fsproj b/src/FSharpy.TaskSeq.Test/FSharpy.TaskSeq.Test.fsproj
index 9c102755..19815165 100644
--- a/src/FSharpy.TaskSeq.Test/FSharpy.TaskSeq.Test.fsproj
+++ b/src/FSharpy.TaskSeq.Test/FSharpy.TaskSeq.Test.fsproj
@@ -15,6 +15,7 @@
+
@@ -23,6 +24,7 @@
+
diff --git a/src/FSharpy.TaskSeq.Test/TaskSeq.Concat.Tests.fs b/src/FSharpy.TaskSeq.Test/TaskSeq.Concat.Tests.fs
new file mode 100644
index 00000000..d1bab1c9
--- /dev/null
+++ b/src/FSharpy.TaskSeq.Test/TaskSeq.Concat.Tests.fs
@@ -0,0 +1,51 @@
+module FSharpy.Tests.Concat
+
+open System
+
+open Xunit
+open FsUnit.Xunit
+open FsToolkit.ErrorHandling
+
+open FSharpy
+open System.Collections.Generic
+
+//
+// TaskSeq.concat
+//
+
+let validateSequence ts =
+ ts
+ |> TaskSeq.toSeqCachedAsync
+ |> Task.map (Seq.map string)
+ |> Task.map (String.concat "")
+ |> Task.map (should equal "123456789101234567891012345678910")
+
+module EmptySeq =
+ [)>]
+ let ``TaskSeq-concat with empty sequences`` variant =
+ taskSeq {
+ yield Gen.getEmptyVariant variant // not yield-bang!
+ yield Gen.getEmptyVariant variant
+ yield Gen.getEmptyVariant variant
+ }
+ |> TaskSeq.concat
+ |> verifyEmpty
+
+ [)>]
+ let ``TaskSeq-concat with top sequence empty`` variant =
+ Gen.getEmptyVariant variant
+ |> TaskSeq.box
+ |> TaskSeq.cast> // casting an int to an enumerable, LOL!
+ |> TaskSeq.concat
+ |> verifyEmpty
+
+module Immutable =
+ [)>]
+ let ``TaskSeq-concat with empty sequences`` variant =
+ taskSeq {
+ yield Gen.getSeqImmutable variant // not yield-bang!
+ yield Gen.getSeqImmutable variant
+ yield Gen.getSeqImmutable variant
+ }
+ |> TaskSeq.concat
+ |> validateSequence
diff --git a/src/FSharpy.TaskSeq.Test/TaskSeq.Init.Tests.fs b/src/FSharpy.TaskSeq.Test/TaskSeq.Init.Tests.fs
new file mode 100644
index 00000000..6ec29857
--- /dev/null
+++ b/src/FSharpy.TaskSeq.Test/TaskSeq.Init.Tests.fs
@@ -0,0 +1,142 @@
+module FSharpy.Tests.Init
+
+open System
+
+open Xunit
+open FsUnit.Xunit
+open FsToolkit.ErrorHandling
+
+open FSharpy
+
+//
+// TaskSeq.init
+// TaskSeq.initInfinite
+// TaskSeq.initAsync
+// TaskSeq.initInfiniteAsync
+//
+
+/// Asserts that a sequence contains the char values 'A'..'J'.
+
+module EmptySeq =
+ []
+ let ``TaskSeq-init can generate an empty sequence`` () = TaskSeq.init 0 (fun x -> x) |> verifyEmpty
+
+ []
+ let ``TaskSeq-initAsync can generate an empty sequence`` () =
+ TaskSeq.initAsync 0 (fun x -> Task.fromResult x)
+ |> verifyEmpty
+
+ []
+ let ``TaskSeq-init with a negative count gives an error`` () =
+ fun () ->
+ TaskSeq.init -1 (fun x -> Task.fromResult x)
+ |> TaskSeq.toArrayAsync
+ |> Task.ignore
+
+ |> should throwAsyncExact typeof
+
+ fun () ->
+ TaskSeq.init Int32.MinValue (fun x -> Task.fromResult x)
+ |> TaskSeq.toArrayAsync
+ |> Task.ignore
+
+ |> should throwAsyncExact typeof
+
+ []
+ let ``TaskSeq-initAsync with a negative count gives an error`` () =
+ fun () ->
+ TaskSeq.initAsync Int32.MinValue (fun x -> Task.fromResult x)
+ |> TaskSeq.toArrayAsync
+ |> Task.ignore
+
+ |> should throwAsyncExact typeof
+
+module Immutable =
+ []
+ let ``TaskSeq-init singleton`` () =
+ TaskSeq.init 1 id
+ |> TaskSeq.head
+ |> Task.map (should equal 0)
+
+ []
+ let ``TaskSeq-initAsync singleton`` () =
+ TaskSeq.initAsync 1 (id >> Task.fromResult)
+ |> TaskSeq.head
+ |> Task.map (should equal 0)
+
+ []
+ let ``TaskSeq-init some values`` () =
+ TaskSeq.init 42 (fun x -> x / 2)
+ |> TaskSeq.length
+ |> Task.map (should equal 42)
+
+ []
+ let ``TaskSeq-initAsync some values`` () =
+ TaskSeq.init 42 (fun x -> Task.fromResult (x / 2))
+ |> TaskSeq.length
+ |> Task.map (should equal 42)
+
+ []
+ let ``TaskSeq-initInfinite`` () =
+ TaskSeq.initInfinite (fun x -> x / 2)
+ |> TaskSeq.item 1_000_001
+ |> Task.map (should equal 500_000)
+
+ []
+ let ``TaskSeq-initInfiniteAsync`` () =
+ TaskSeq.initInfiniteAsync (fun x -> Task.fromResult (x / 2))
+ |> TaskSeq.item 1_000_001
+ |> Task.map (should equal 500_000)
+
+module SideEffects =
+ let inc (i: int byref) =
+ i <- i + 1
+ i
+
+ []
+ let ``TaskSeq-init singleton with side effects`` () = task {
+ let mutable x = 0
+
+ let ts = TaskSeq.init 1 (fun _ -> inc &x)
+
+ do! TaskSeq.head ts |> Task.map (should equal 1)
+ do! TaskSeq.head ts |> Task.map (should equal 2)
+ do! TaskSeq.head ts |> Task.map (should equal 3) // state mutates
+ }
+
+ []
+ let ``TaskSeq-init singleton with side effects -- Current`` () = task {
+ let mutable x = 0
+
+ let ts = TaskSeq.init 1 (fun _ -> inc &x)
+
+ let enumerator = ts.GetAsyncEnumerator()
+ let! _ = enumerator.MoveNextAsync()
+ do enumerator.Current |> should equal 1
+ do enumerator.Current |> should equal 1
+ do enumerator.Current |> should equal 1 // current state does not mutate
+ }
+
+ []
+ let ``TaskSeq-initAsync singleton with side effects`` () = task {
+ let mutable x = 0
+
+ let ts = TaskSeq.initAsync 1 (fun _ -> Task.fromResult (inc &x))
+
+ do! TaskSeq.head ts |> Task.map (should equal 1)
+ do! TaskSeq.head ts |> Task.map (should equal 2)
+ do! TaskSeq.head ts |> Task.map (should equal 3) // state mutates
+ }
+
+ []
+ let ``TaskSeq-initAsync singleton with side effects -- Current`` () = task {
+ let mutable x = 0
+
+ let ts = TaskSeq.initAsync 1 (fun _ -> Task.fromResult (inc &x))
+
+ let enumerator = ts.GetAsyncEnumerator()
+ let! _ = enumerator.MoveNextAsync()
+ do enumerator.Current |> should equal 1
+ do enumerator.Current |> should equal 1
+ do enumerator.Current |> should equal 1 // current state does not mutate
+ }
diff --git a/src/FSharpy.TaskSeq/TaskSeq.fs b/src/FSharpy.TaskSeq/TaskSeq.fs
index 3bb1a9c7..64cd33c9 100644
--- a/src/FSharpy.TaskSeq/TaskSeq.fs
+++ b/src/FSharpy.TaskSeq/TaskSeq.fs
@@ -156,8 +156,18 @@ module TaskSeq =
//
let length source = Internal.lengthBy None source
+ let lengthOrMax max source = Internal.lengthBeforeMax max source
let lengthBy predicate source = Internal.lengthBy (Some(Predicate predicate)) source
let lengthByAsync predicate source = Internal.lengthBy (Some(PredicateAsync predicate)) source
+ let init count initializer = Internal.init (Some count) (InitAction initializer)
+ let initInfinite initializer = Internal.init None (InitAction initializer)
+ let initAsync count initializer = Internal.init (Some count) (InitActionAsync initializer)
+ let initInfiniteAsync initializer = Internal.init None (InitActionAsync initializer)
+
+ let concat (sources: taskSeq<#taskSeq<'T>>) = taskSeq {
+ for ts in sources do
+ yield! (ts :> taskSeq<'T>)
+ }
//
// iter/map/collect functions
@@ -262,6 +272,7 @@ module TaskSeq =
| None -> return Internal.raiseNotFound ()
}
+
let findAsync predicate source = task {
match! Internal.tryFind (PredicateAsync predicate) source with
| Some item -> return item
diff --git a/src/FSharpy.TaskSeq/TaskSeq.fsi b/src/FSharpy.TaskSeq/TaskSeq.fsi
index 4841289d..493111fa 100644
--- a/src/FSharpy.TaskSeq/TaskSeq.fsi
+++ b/src/FSharpy.TaskSeq/TaskSeq.fsi
@@ -15,10 +15,17 @@ module TaskSeq =
///
/// Returns the length of the sequence. This operation requires the whole sequence to be evaluated and
- /// should not be used on potentially infinite sequences.
+ /// should not be used on potentially infinite sequences, see for an alternative.
///
val length: source: taskSeq<'T> -> Task
+ ///
+ /// Returns the length of the sequence, or , whichever comes first. This operation requires the task sequence
+ /// to be evaluated in full, or until items have been processed. Use this method instead of
+ /// if you want to prevent too many items to be evaluated, or if the sequence is potentially infinite.
+ ///
+ val lengthOrMax: max: int -> source: taskSeq<'T> -> Task
+
///
/// Returns the length of the sequence of all items for which the returns true.
/// This operation requires the whole sequence to be evaluated and should not be used on potentially infinite sequences.
@@ -32,6 +39,74 @@ module TaskSeq =
///
val lengthByAsync: predicate: ('T -> #Task) -> source: taskSeq<'T> -> Task
+ ///
+ /// Generates a new task sequence which, when iterated, will return successive elements by calling the given function
+ /// with the current index, up to the given count. Each element is saved after its initialization for successive access to
+ /// , which will not re-evaluate the . However,
+ /// re-iterating the returned task sequence will re-evaluate the initialization function. The returned sequence may
+ /// be passed between threads safely. However, individual IEnumerator values generated from the returned sequence should
+ /// not be accessed concurrently.
+ ///
+ ///
+ /// The maximum number of items to generate for the sequence.
+ /// A function that generates an item in the sequence from a given index.
+ /// The resulting task sequence.
+ /// Thrown when count is negative.
+ val init: count: int -> initializer: (int -> 'T) -> taskSeq<'T>
+
+ ///
+ /// Generates a new task sequence which, when iterated, will return successive elements by calling the given function
+ /// with the current index, up to the given count. Each element is saved after its initialization for successive access to
+ /// , which will not re-evaluate the . However,
+ /// re-iterating the returned task sequence will re-evaluate the initialization function. The returned sequence may
+ /// be passed between threads safely. However, individual IEnumerator values generated from the returned sequence should
+ /// not be accessed concurrently.
+ ///
+ ///
+ /// The maximum number of items to generate for the sequence.
+ /// A function that generates an item in the sequence from a given index.
+ /// The resulting task sequence.
+ /// Thrown when count is negative.
+ val initAsync: count: int -> initializer: (int -> #Task<'T>) -> taskSeq<'T>
+
+ ///
+ /// Generates a new task sequence which, when iterated, will return successive elements by calling the given function
+ /// with the current index, ad infinitum, or until is reached.
+ /// Each element is saved after its initialization for successive access to
+ /// , which will not re-evaluate the . However,
+ /// re-iterating the returned task sequence will re-evaluate the initialization function. The returned sequence may
+ /// be passed between threads safely. However, individual IEnumerator values generated from the returned sequence should
+ /// not be accessed concurrently.
+ ///
+ ///
+ /// A function that generates an item in the sequence from a given index.
+ /// The resulting task sequence.
+ val initInfinite: initializer: (int -> 'T) -> taskSeq<'T>
+
+ ///
+ /// Generates a new task sequence which, when iterated, will return successive elements by calling the given function
+ /// with the current index, ad infinitum, or until is reached.
+ /// Each element is saved after its initialization for successive access to
+ /// , which will not re-evaluate the . However,
+ /// re-iterating the returned task sequence will re-evaluate the initialization function. The returned sequence may
+ /// be passed between threads safely. However, individual IEnumerator values generated from the returned sequence should
+ /// not be accessed concurrently.
+ ///
+ ///
+ /// A function that generates an item in the sequence from a given index.
+ /// The resulting task sequence.
+ val initInfiniteAsync: initializer: (int -> #Task<'T>) -> taskSeq<'T>
+
+ ///
+ /// Combines the given task sequence of task sequences and concatenates them end-to-end, to form a
+ /// new flattened, single task sequence. Each task sequence is awaited item by item, before the next is iterated.
+ ///
+ ///
+ /// The input enumeration-of-enumerations.
+ /// The resulting task sequence.
+ /// Thrown when the input sequence is null.
+ val concat: sources: taskSeq<#taskSeq<'T>> -> taskSeq<'T>
+
/// Returns taskSeq as an array. This function is blocking until the sequence is exhausted and will properly dispose of the resources.
val toList: source: taskSeq<'T> -> 'T list
diff --git a/src/FSharpy.TaskSeq/TaskSeqInternal.fs b/src/FSharpy.TaskSeq/TaskSeqInternal.fs
index 5d804b12..7d3870ef 100644
--- a/src/FSharpy.TaskSeq/TaskSeqInternal.fs
+++ b/src/FSharpy.TaskSeq/TaskSeqInternal.fs
@@ -33,11 +33,20 @@ type PredicateAction<'T, 'U, 'TaskBool when 'TaskBool :> Task> =
| Predicate of try_filter: ('T -> bool)
| PredicateAsync of async_try_filter: ('T -> 'TaskBool)
+[]
+type InitAction<'T, 'TaskT when 'TaskT :> Task<'T>> =
+ | InitAction of init_item: (int -> 'T)
+ | InitActionAsync of async_init_item: (int -> 'TaskT)
+
module internal TaskSeqInternal =
let inline raiseEmptySeq () =
ArgumentException("The asynchronous input sequence was empty.", "source")
|> raise
+ let inline raiseCannotBeNegative (name: string) =
+ ArgumentException("The value cannot be negative", name)
+ |> raise
+
let inline raiseInsufficient () =
ArgumentException("The asynchronous input sequence was has an insufficient number of elements.", "source")
|> raise
@@ -64,7 +73,7 @@ module internal TaskSeqInternal =
| None ->
while go do
let! step = e.MoveNextAsync()
- i <- i + 1
+ i <- i + 1 // update before moving: we are counting, not indexing
go <- step
| Some (Predicate predicate) ->
@@ -87,6 +96,22 @@ module internal TaskSeqInternal =
return i
}
+ /// Returns length unconditionally, or based on a predicate
+ let lengthBeforeMax max (source: taskSeq<_>) = task {
+ use e = source.GetAsyncEnumerator(CancellationToken())
+ let mutable go = true
+ let mutable i = 0
+ let! step = e.MoveNextAsync()
+ go <- step
+
+ while go && i < max do
+ i <- i + 1 // update before moving: we are counting, not indexing
+ let! step = e.MoveNextAsync()
+ go <- step
+
+ return i
+ }
+
let tryExactlyOne (source: taskSeq<_>) = task {
use e = source.GetAsyncEnumerator(CancellationToken())
@@ -104,6 +129,45 @@ module internal TaskSeqInternal =
return None
}
+
+ let init count initializer = taskSeq {
+ let mutable i = 0
+ let mutable value: Lazy<'T> = Unchecked.defaultof<_>
+
+ let count =
+ match count with
+ | Some c -> if c >= 0 then c else raiseCannotBeNegative (nameof count)
+ | None -> Int32.MaxValue
+
+ match initializer with
+ | InitAction init ->
+ while i < count do
+ // using Lazy gives us locking and safe multiple access to the cached value, if
+ // multiple threads access the same item through the same enumerator (which is
+ // bad practice, but hey, who're we to judge).
+ if isNull value then
+ value <- Lazy<_>.Create (fun () -> init i)
+
+ yield value.Force()
+ value <- Unchecked.defaultof<_>
+ i <- i + 1
+
+ | InitActionAsync asyncInit ->
+ while i < count do
+ // using Lazy gives us locking and safe multiple access to the cached value, if
+ // multiple threads access the same item through the same enumerator (which is
+ // bad practice, but hey, who're we to judge).
+ if isNull value then
+ // TODO: is there a 'Lazy' we can use with Task?
+ let! value' = asyncInit i
+ value <- Lazy<_>.CreateFromValue value'
+
+ yield value.Force()
+ value <- Unchecked.defaultof<_>
+ i <- i + 1
+
+ }
+
let iter action (source: taskSeq<_>) = task {
use e = source.GetAsyncEnumerator(CancellationToken())
let mutable go = true