diff --git a/src/FSharp.Control.AsyncSeq/AsyncSeq.fs b/src/FSharp.Control.AsyncSeq/AsyncSeq.fs index f11d424..f749ea7 100644 --- a/src/FSharp.Control.AsyncSeq/AsyncSeq.fs +++ b/src/FSharp.Control.AsyncSeq/AsyncSeq.fs @@ -1537,8 +1537,8 @@ module AsyncSeq = result <- interleave result x result - let bufferByCount (bufferSize:int) (source:AsyncSeq<'T>) : AsyncSeq<'T[]> = - if (bufferSize < 1) then invalidArg "bufferSize" "must be positive" + let chunkBySize (chunkSize:int) (source:AsyncSeq<'T>) : AsyncSeq<'T[]> = + if chunkSize < 1 then invalidArg (nameof chunkSize) "must be positive" asyncSeq { let buffer = new ResizeArray<_>() use ie = source.GetEnumerator() @@ -1546,7 +1546,7 @@ module AsyncSeq = let b = ref move while b.Value.IsSome do buffer.Add b.Value.Value - if buffer.Count = bufferSize then + if buffer.Count = chunkSize then yield buffer.ToArray() buffer.Clear() let! moven = ie.MoveNext() @@ -1554,6 +1554,10 @@ module AsyncSeq = if (buffer.Count > 0) then yield buffer.ToArray() } + [] + let bufferByCount (bufferSize:int) (source:AsyncSeq<'T>) : AsyncSeq<'T[]> = + chunkBySize bufferSize source + #if !FABLE_COMPILER let toSortedSeq fn source = toArrayAsync source |> Async.map fn |> Async.RunSynchronously diff --git a/src/FSharp.Control.AsyncSeq/AsyncSeq.fsi b/src/FSharp.Control.AsyncSeq/AsyncSeq.fsi index 6db8140..15c1bf5 100644 --- a/src/FSharp.Control.AsyncSeq/AsyncSeq.fsi +++ b/src/FSharp.Control.AsyncSeq/AsyncSeq.fsi @@ -477,6 +477,11 @@ module AsyncSeq = /// Buffer items from the async sequence into buffers of a specified size. /// The last buffer returned may be less than the specified buffer size. + val chunkBySize : chunkSize:int -> source:AsyncSeq<'T> -> AsyncSeq<'T []> + + /// Buffer items from the async sequence into buffers of a specified size. + /// The last buffer returned may be less than the specified buffer size. + [] val bufferByCount : bufferSize:int -> source:AsyncSeq<'T> -> AsyncSeq<'T []> #if !FABLE_COMPILER @@ -524,7 +529,7 @@ module AsyncSeq = /// Builds a new asynchronous sequence whose elements are generated by /// applying the specified function to all elements of the input sequence. /// - /// The function is applied to elements in parallel, and results are emitted + /// The function is applied to elements in parallel, and results are emitted /// in the order they complete (unordered), without preserving the original order. /// This can provide better performance than mapAsyncParallel when order doesn't matter. /// Parallelism is bound by the ThreadPool. diff --git a/tests/FSharp.Control.AsyncSeq.Tests/AsyncSeqTests.fs b/tests/FSharp.Control.AsyncSeq.Tests/AsyncSeqTests.fs index 35b1d2a..6a44cf4 100644 --- a/tests/FSharp.Control.AsyncSeq.Tests/AsyncSeqTests.fs +++ b/tests/FSharp.Control.AsyncSeq.Tests/AsyncSeqTests.fs @@ -436,7 +436,7 @@ let ``AsyncSeq.interleaveMany 3``() = [] -let ``AsyncSeq.bufferByCount``() = +let ``AsyncSeq.chunkBySize``() = let s = asyncSeq { yield 1 yield 2 @@ -444,23 +444,23 @@ let ``AsyncSeq.bufferByCount``() = yield 4 yield 5 } - let s' = s |> AsyncSeq.bufferByCount 2 |> AsyncSeq.toListSynchronously + let s' = s |> AsyncSeq.chunkBySize 2 |> AsyncSeq.toListSynchronously Assert.True(([[|1;2|];[|3;4|];[|5|]] = s')) [] -let ``AsyncSeq.bufferByCount various sizes``() = +let ``AsyncSeq.chunkBySize various sizes``() = for sz in 0 .. 10 do let s = asyncSeq { for i in 1 .. sz do yield i } - let s' = s |> AsyncSeq.bufferByCount 1 |> AsyncSeq.toListSynchronously + let s' = s |> AsyncSeq.chunkBySize 1 |> AsyncSeq.toListSynchronously Assert.True(([for i in 1 .. sz -> [|i|]] = s')) [] -let ``AsyncSeq.bufferByCount empty``() = +let ``AsyncSeq.chunkBySize empty``() = let s = AsyncSeq.empty - let s' = s |> AsyncSeq.bufferByCount 2 |> AsyncSeq.toListSynchronously + let s' = s |> AsyncSeq.chunkBySize 2 |> AsyncSeq.toListSynchronously Assert.True(([] = s')) @@ -2429,20 +2429,20 @@ let ``AsyncSeq.toChannel and AsyncSeq.fromChannel capture exns``() = // Additional Coverage Tests targeting uncovered edge cases and branches [] -let ``AsyncSeq.bufferByCount with size 1 should work`` () = +let ``AsyncSeq.chunkBySize with size 1 should work`` () = let source = asyncSeq { yield 1; yield 2; yield 3 } - let result = AsyncSeq.bufferByCount 1 source |> AsyncSeq.toListSynchronously + let result = AsyncSeq.chunkBySize 1 source |> AsyncSeq.toListSynchronously Assert.AreEqual([[|1|]; [|2|]; [|3|]], result) [] -let ``AsyncSeq.bufferByCount with empty sequence should return empty`` () = - let result = AsyncSeq.bufferByCount 2 AsyncSeq.empty |> AsyncSeq.toListSynchronously +let ``AsyncSeq.chunkBySize with empty sequence should return empty`` () = + let result = AsyncSeq.chunkBySize 2 AsyncSeq.empty |> AsyncSeq.toListSynchronously Assert.AreEqual([], result) [] -let ``AsyncSeq.bufferByCount with size larger than sequence should return partial`` () = +let ``AsyncSeq.chunkBySize with size larger than sequence should return partial`` () = let source = asyncSeq { yield 1; yield 2 } - let result = AsyncSeq.bufferByCount 5 source |> AsyncSeq.toListSynchronously + let result = AsyncSeq.chunkBySize 5 source |> AsyncSeq.toListSynchronously Assert.AreEqual([[|1; 2|]], result) []