Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 7 additions & 3 deletions src/FSharp.Control.AsyncSeq/AsyncSeq.fs
Original file line number Diff line number Diff line change
Expand Up @@ -1537,23 +1537,27 @@ module AsyncSeq =
result <- interleave result x
result

let bufferByCount (bufferSize:int) (source:AsyncSeq<'T>) : AsyncSeq<'T[]> =
if (bufferSize < 1) then invalidArg "bufferSize" "must be positive"
let chunkBySize (chunkSize:int) (source:AsyncSeq<'T>) : AsyncSeq<'T[]> =
if chunkSize < 1 then invalidArg (nameof chunkSize) "must be positive"
asyncSeq {
let buffer = new ResizeArray<_>()
use ie = source.GetEnumerator()
let! move = ie.MoveNext()
let b = ref move
while b.Value.IsSome do
buffer.Add b.Value.Value
if buffer.Count = bufferSize then
if buffer.Count = chunkSize then
yield buffer.ToArray()
buffer.Clear()
let! moven = ie.MoveNext()
b := moven
if (buffer.Count > 0) then
yield buffer.ToArray() }

[<Obsolete("Use AsyncSeq.chunkBySize instead")>]
let bufferByCount (bufferSize:int) (source:AsyncSeq<'T>) : AsyncSeq<'T[]> =
chunkBySize bufferSize source

#if !FABLE_COMPILER
let toSortedSeq fn source =
toArrayAsync source |> Async.map fn |> Async.RunSynchronously
Expand Down
7 changes: 6 additions & 1 deletion src/FSharp.Control.AsyncSeq/AsyncSeq.fsi
Original file line number Diff line number Diff line change
Expand Up @@ -477,6 +477,11 @@ module AsyncSeq =

/// Buffer items from the async sequence into buffers of a specified size.
/// The last buffer returned may be less than the specified buffer size.
val chunkBySize : chunkSize:int -> source:AsyncSeq<'T> -> AsyncSeq<'T []>

/// Buffer items from the async sequence into buffers of a specified size.
/// The last buffer returned may be less than the specified buffer size.
[<Obsolete("Use AsyncSeq.chunkBySize instead")>]
val bufferByCount : bufferSize:int -> source:AsyncSeq<'T> -> AsyncSeq<'T []>

#if !FABLE_COMPILER
Expand Down Expand Up @@ -524,7 +529,7 @@ module AsyncSeq =
/// Builds a new asynchronous sequence whose elements are generated by
/// applying the specified function to all elements of the input sequence.
///
/// The function is applied to elements in parallel, and results are emitted
/// The function is applied to elements in parallel, and results are emitted
/// in the order they complete (unordered), without preserving the original order.
/// This can provide better performance than mapAsyncParallel when order doesn't matter.
/// Parallelism is bound by the ThreadPool.
Expand Down
24 changes: 12 additions & 12 deletions tests/FSharp.Control.AsyncSeq.Tests/AsyncSeqTests.fs
Original file line number Diff line number Diff line change
Expand Up @@ -436,31 +436,31 @@ let ``AsyncSeq.interleaveMany 3``() =


[<Test>]
let ``AsyncSeq.bufferByCount``() =
let ``AsyncSeq.chunkBySize``() =
let s = asyncSeq {
yield 1
yield 2
yield 3
yield 4
yield 5
}
let s' = s |> AsyncSeq.bufferByCount 2 |> AsyncSeq.toListSynchronously
let s' = s |> AsyncSeq.chunkBySize 2 |> AsyncSeq.toListSynchronously
Assert.True(([[|1;2|];[|3;4|];[|5|]] = s'))

[<Test>]
let ``AsyncSeq.bufferByCount various sizes``() =
let ``AsyncSeq.chunkBySize various sizes``() =
for sz in 0 .. 10 do
let s = asyncSeq {
for i in 1 .. sz do
yield i
}
let s' = s |> AsyncSeq.bufferByCount 1 |> AsyncSeq.toListSynchronously
let s' = s |> AsyncSeq.chunkBySize 1 |> AsyncSeq.toListSynchronously
Assert.True(([for i in 1 .. sz -> [|i|]] = s'))

[<Test>]
let ``AsyncSeq.bufferByCount empty``() =
let ``AsyncSeq.chunkBySize empty``() =
let s = AsyncSeq.empty<int>
let s' = s |> AsyncSeq.bufferByCount 2 |> AsyncSeq.toListSynchronously
let s' = s |> AsyncSeq.chunkBySize 2 |> AsyncSeq.toListSynchronously
Assert.True(([] = s'))


Expand Down Expand Up @@ -2429,20 +2429,20 @@ let ``AsyncSeq.toChannel and AsyncSeq.fromChannel capture exns``() =
// Additional Coverage Tests targeting uncovered edge cases and branches

[<Test>]
let ``AsyncSeq.bufferByCount with size 1 should work`` () =
let ``AsyncSeq.chunkBySize with size 1 should work`` () =
let source = asyncSeq { yield 1; yield 2; yield 3 }
let result = AsyncSeq.bufferByCount 1 source |> AsyncSeq.toListSynchronously
let result = AsyncSeq.chunkBySize 1 source |> AsyncSeq.toListSynchronously
Assert.AreEqual([[|1|]; [|2|]; [|3|]], result)

[<Test>]
let ``AsyncSeq.bufferByCount with empty sequence should return empty`` () =
let result = AsyncSeq.bufferByCount 2 AsyncSeq.empty |> AsyncSeq.toListSynchronously
let ``AsyncSeq.chunkBySize with empty sequence should return empty`` () =
let result = AsyncSeq.chunkBySize 2 AsyncSeq.empty |> AsyncSeq.toListSynchronously
Assert.AreEqual([], result)

[<Test>]
let ``AsyncSeq.bufferByCount with size larger than sequence should return partial`` () =
let ``AsyncSeq.chunkBySize with size larger than sequence should return partial`` () =
let source = asyncSeq { yield 1; yield 2 }
let result = AsyncSeq.bufferByCount 5 source |> AsyncSeq.toListSynchronously
let result = AsyncSeq.chunkBySize 5 source |> AsyncSeq.toListSynchronously
Assert.AreEqual([[|1; 2|]], result)

[<Test>]
Expand Down