From 4391d678ba3d1bca2e4550c9b6b8b5f6d7d17006 Mon Sep 17 00:00:00 2001 From: Geir Fiksdal Date: Tue, 17 Jun 2025 10:21:30 +0200 Subject: [PATCH 1/3] TaskSeq.chunkBySize --- README.md | 5 +- .../FSharp.Control.TaskSeq.Test.fsproj | 1 + .../TaskSeq.ChunkBySize.Tests.fs | 193 ++++++++++++++++++ src/FSharp.Control.TaskSeq/TaskSeq.fs | 2 + src/FSharp.Control.TaskSeq/TaskSeq.fsi | 9 + src/FSharp.Control.TaskSeq/TaskSeqInternal.fs | 24 +++ 6 files changed, 232 insertions(+), 2 deletions(-) create mode 100644 src/FSharp.Control.TaskSeq.Test/TaskSeq.ChunkBySize.Tests.fs diff --git a/README.md b/README.md index cac8cf2..a033f8e 100644 --- a/README.md +++ b/README.md @@ -211,7 +211,8 @@ The `TaskSeq` project already has a wide array of functions and functionalities, - [ ] `average` / `averageBy`, `sum` and related - [x] `forall` / `forallAsync` (see [#240]) - [x] `skip` / `drop` / `truncate` / `take` (see [#209]) - - [ ] `chunkBySize` / `windowed` + - [x] `chunkBySize` (see [TODO]) + - [ ] `windowed` - [ ] `compareWith` - [ ] `distinct` - [ ] `exists2` / `map2` / `fold2` / `iter2` and related '2'-functions @@ -263,7 +264,7 @@ This is what has been implemented so far, is planned or skipped: | ✅ [#67][] | | | `box` | | | ✅ [#67][] | | | `unbox` | | | ✅ [#23][] | `choose` | `choose` | `chooseAsync` | | -| | `chunkBySize` | `chunkBySize` | | | +| ✅ [TODO][]| `chunkBySize` | `chunkBySize` | | | | ✅ [#11][] | `collect` | `collect` | `collectAsync` | | | ✅ [#11][] | | `collectSeq` | `collectSeqAsync` | | | | `compareWith` | `compareWith` | `compareWithAsync` | | diff --git a/src/FSharp.Control.TaskSeq.Test/FSharp.Control.TaskSeq.Test.fsproj b/src/FSharp.Control.TaskSeq.Test/FSharp.Control.TaskSeq.Test.fsproj index dbf1cfc..421459d 100644 --- a/src/FSharp.Control.TaskSeq.Test/FSharp.Control.TaskSeq.Test.fsproj +++ b/src/FSharp.Control.TaskSeq.Test/FSharp.Control.TaskSeq.Test.fsproj @@ -57,6 +57,7 @@ + diff --git a/src/FSharp.Control.TaskSeq.Test/TaskSeq.ChunkBySize.Tests.fs b/src/FSharp.Control.TaskSeq.Test/TaskSeq.ChunkBySize.Tests.fs new file mode 100644 index 0000000..c1ba00d --- /dev/null +++ b/src/FSharp.Control.TaskSeq.Test/TaskSeq.ChunkBySize.Tests.fs @@ -0,0 +1,193 @@ +module TaskSeq.Tests.ChunkBySize + +open System + +open FsUnitTyped +open Xunit +open FsUnit.Xunit + +open FSharp.Control + +// +// TaskSeq.chunkBySize +// + +exception SideEffectPastEnd of string + +module EmptySeq = + [)>] + let ``TaskSeq-chunkBySize(0) on empty input should throw InvalidOperation`` variant = + fun () -> + Gen.getEmptyVariant variant + |> TaskSeq.chunkBySize 0 + |> consumeTaskSeq + + |> should throwAsyncExact typeof + + [)>] + let ``TaskSeq-chunkBySize(1) has no effect on empty input`` variant = + // no `task` block needed + Gen.getEmptyVariant variant |> TaskSeq.chunkBySize 1 |> verifyEmpty + + [)>] + let ``TaskSeq-chunkBySize(99) has no effect on empty input`` variant = + // no `task` block needed + Gen.getEmptyVariant variant |> TaskSeq.chunkBySize 99 |> verifyEmpty + + [] + let ``TaskSeq-chunkBySize(-1) should throw ArgumentException on any input`` () = + fun () -> TaskSeq.empty |> TaskSeq.chunkBySize -1 |> consumeTaskSeq + |> should throwAsyncExact typeof + + fun () -> TaskSeq.init 10 id |> TaskSeq.chunkBySize -1 |> consumeTaskSeq + |> should throwAsyncExact typeof + + [] + let ``TaskSeq-chunkBySize(-1) should throw ArgumentException before awaiting`` () = + fun () -> + taskSeq { + do! longDelay () + + if false then + yield 0 // type inference + } + |> TaskSeq.chunkBySize -1 + |> ignore // throws even without running the async. Bad coding, don't ignore a task! + + |> should throw typeof + +module Immutable = + [)>] + let ``TaskSeq-chunkBySize returns all items from source in order`` variant = task { + do! + Gen.getSeqImmutable variant + |> TaskSeq.chunkBySize 3 + |> TaskSeq.collect TaskSeq.ofArray + |> verify1To10 + } + + [)>] + let ``TaskSeq-chunkBySize returns chunks with items in order`` variant = task { + do! + Gen.getSeqImmutable variant + |> TaskSeq.chunkBySize 2 + |> TaskSeq.toArrayAsync + |> Task.map (shouldEqual [| [| 1; 2 |]; [| 3; 4 |]; [| 5; 6 |]; [| 7; 8 |]; [| 9; 10 |] |]) + } + + [)>] + let ``TaskSeq-chunkBySize returns exactly 'chunkSize' items per chunk`` variant = task { + do! + Gen.getSeqImmutable variant + |> TaskSeq.chunkBySize 1 + |> TaskSeq.iter (shouldHaveLength 1) + + do! + Gen.getSeqImmutable variant + |> TaskSeq.chunkBySize 2 + |> TaskSeq.iter (shouldHaveLength 2) + + do! + Gen.getSeqImmutable variant + |> TaskSeq.chunkBySize 5 + |> TaskSeq.iter (shouldHaveLength 5) + } + + [)>] + let ``TaskSeq-chunkBySize returns remaining items in last chunk`` variant = task { + let verifyChunk chunkSize lastChunkSize = + Gen.getSeqImmutable variant + |> TaskSeq.chunkBySize chunkSize + |> TaskSeq.toArrayAsync + |> Task.map (Array.last >> shouldHaveLength lastChunkSize) + + do! verifyChunk 1 1 + do! verifyChunk 3 1 + do! verifyChunk 4 2 + do! verifyChunk 6 4 + do! verifyChunk 7 3 + do! verifyChunk 8 2 + do! verifyChunk 9 1 + } + + [)>] + let ``TaskSeq-chunkBySize returns all elements when 'chunkSize' > number of items`` variant = + Gen.getSeqImmutable variant + |> TaskSeq.chunkBySize 11 + |> TaskSeq.toArrayAsync + |> Task.map (Array.exactlyOne >> shouldHaveLength 10) + +module SideEffects = + [)>] + let ``TaskSeq-chunkBySize gets all items`` variant = + Gen.getSeqWithSideEffect variant + |> TaskSeq.chunkBySize 5 + |> TaskSeq.toArrayAsync + |> Task.map (shouldEqual [| [| 1 .. 5 |]; [| 6 .. 10 |] |]) + + [] + let ``TaskSeq-chunkBySize prove we execute empty-seq side-effects`` () = task { + let mutable i = 0 + + let ts = taskSeq { + i <- i + 1 + i <- i + 1 + i <- i + 1 // we should get here + } + + do! ts |> TaskSeq.chunkBySize 1 |> consumeTaskSeq + do! ts |> TaskSeq.chunkBySize 2 |> consumeTaskSeq + do! ts |> TaskSeq.chunkBySize 3 |> consumeTaskSeq + i |> should equal 9 + } + + [] + let ``TaskSeq-chunkBySize prove we execute after-effects`` () = task { + let mutable i = 0 + + let ts = taskSeq { + i <- i + 1 + i <- i + 1 + yield 42 + i <- i + 1 // we should get here + } + + do! ts |> TaskSeq.chunkBySize 1 |> consumeTaskSeq + do! ts |> TaskSeq.chunkBySize 2 |> consumeTaskSeq + do! ts |> TaskSeq.chunkBySize 3 |> consumeTaskSeq + i |> should equal 9 + } + + [)>] + let ``TaskSeq-chunkBySize should go over all items`` variant = task { + let ts = Gen.getSeqWithSideEffect variant + do! ts |> TaskSeq.chunkBySize 1 |> consumeTaskSeq + do! ts |> TaskSeq.chunkBySize 2 |> consumeTaskSeq + do! ts |> TaskSeq.chunkBySize 3 |> consumeTaskSeq + // incl. the iteration of 'last', we reach 40 + do! ts |> TaskSeq.last |> Task.map (should equal 40) + } + + [)>] + let ``TaskSeq-chunkBySize multiple iterations over same sequence`` variant = task { + let ts = Gen.getSeqWithSideEffect variant + let mutable sum = 0 + + do! TaskSeq.chunkBySize 1 ts |> TaskSeq.collect TaskSeq.ofArray |> TaskSeq.iter (fun item -> sum <- sum + item) + do! TaskSeq.chunkBySize 2 ts |> TaskSeq.collect TaskSeq.ofArray |> TaskSeq.iter (fun item -> sum <- sum + item) + do! TaskSeq.chunkBySize 3 ts |> TaskSeq.collect TaskSeq.ofArray |> TaskSeq.iter (fun item -> sum <- sum + item) + do! TaskSeq.chunkBySize 4 ts |> TaskSeq.collect TaskSeq.ofArray |> TaskSeq.iter (fun item -> sum <- sum + item) + + sum |> should equal 820 // side-effected tasks, so 'item' DOES CHANGE, each next iteration starts 10 higher + } + [] + let ``TaskSeq-chunkBySize prove that an exception from the taskSeq is thrown`` () = + let items = taskSeq { + yield 42 + yield! [ 1; 2 ] + do SideEffectPastEnd "at the end" |> raise + yield 43 + } + + fun () -> items |> TaskSeq.chunkBySize 2 |> consumeTaskSeq + |> should throwAsyncExact typeof diff --git a/src/FSharp.Control.TaskSeq/TaskSeq.fs b/src/FSharp.Control.TaskSeq/TaskSeq.fs index 710dadd..a0f59e2 100644 --- a/src/FSharp.Control.TaskSeq/TaskSeq.fs +++ b/src/FSharp.Control.TaskSeq/TaskSeq.fs @@ -252,6 +252,8 @@ type TaskSeq private () = yield! source2 } + static member chunkBySize (chunkSize: int) (source: TaskSeq<'T>) = Internal.chunkBySize chunkSize source + // // iter/map/collect functions // diff --git a/src/FSharp.Control.TaskSeq/TaskSeq.fsi b/src/FSharp.Control.TaskSeq/TaskSeq.fsi index ff98586..3be9662 100644 --- a/src/FSharp.Control.TaskSeq/TaskSeq.fsi +++ b/src/FSharp.Control.TaskSeq/TaskSeq.fsi @@ -823,6 +823,15 @@ type TaskSeq = /// Thrown when the input task sequence is null. static member chooseAsync: chooser: ('T -> #Task<'U option>) -> source: TaskSeq<'T> -> TaskSeq<'U> + /// Divides the input sequence into chunks of size at most chunkSize. + /// + /// The maximum size of each chunk. + /// The input task sequence. + /// The task sequence divided into chunks. + /// Thrown when the input task sequence is null. + /// Thrown when chunkSize is not positive. + static member chunkBySize: chunkSize: int -> source: TaskSeq<'T> -> TaskSeq<'T[]> + /// /// Returns a new task sequence containing only the elements of the collection /// for which the given function returns . diff --git a/src/FSharp.Control.TaskSeq/TaskSeqInternal.fs b/src/FSharp.Control.TaskSeq/TaskSeqInternal.fs index 66a92f2..4f5b97b 100644 --- a/src/FSharp.Control.TaskSeq/TaskSeqInternal.fs +++ b/src/FSharp.Control.TaskSeq/TaskSeqInternal.fs @@ -1097,3 +1097,27 @@ module internal TaskSeqInternal = go <- step } + + let chunkBySize chunkSize (source: TaskSeq<'T>): TaskSeq<'T[]> = + if chunkSize < 1 then invalidArg (nameof chunkSize) $"The value must be positive, but was %i{chunkSize}." + checkNonNull (nameof source) source + + taskSeq { + use e = source.GetAsyncEnumerator CancellationToken.None + let mutable go = true + let! step = e.MoveNextAsync() + go <- step + + if step then + let buffer = ResizeArray<_>() + while go do + buffer.Add e.Current + if buffer.Count = chunkSize then + yield buffer.ToArray() + buffer.Clear() + + let! step = e.MoveNextAsync() + go <- step + if buffer.Count > 0 then + yield buffer.ToArray() + } From d846f467a3deb36060eca881a9a9ecb56b3604f2 Mon Sep 17 00:00:00 2001 From: Geir Fiksdal Date: Tue, 17 Jun 2025 10:30:28 +0200 Subject: [PATCH 2/3] Reference to pull request in README --- README.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index a033f8e..7b31031 100644 --- a/README.md +++ b/README.md @@ -211,7 +211,7 @@ The `TaskSeq` project already has a wide array of functions and functionalities, - [ ] `average` / `averageBy`, `sum` and related - [x] `forall` / `forallAsync` (see [#240]) - [x] `skip` / `drop` / `truncate` / `take` (see [#209]) - - [x] `chunkBySize` (see [TODO]) + - [x] `chunkBySize` (see [#265]) - [ ] `windowed` - [ ] `compareWith` - [ ] `distinct` @@ -264,7 +264,7 @@ This is what has been implemented so far, is planned or skipped: | ✅ [#67][] | | | `box` | | | ✅ [#67][] | | | `unbox` | | | ✅ [#23][] | `choose` | `choose` | `chooseAsync` | | -| ✅ [TODO][]| `chunkBySize` | `chunkBySize` | | | +| ✅ [#265][]| `chunkBySize` | `chunkBySize` | | | | ✅ [#11][] | `collect` | `collect` | `collectAsync` | | | ✅ [#11][] | | `collectSeq` | `collectSeqAsync` | | | | `compareWith` | `compareWith` | `compareWithAsync` | | From e577e05057c06db42f2bd07c3a653e5bbeb623df Mon Sep 17 00:00:00 2001 From: Geir Fiksdal Date: Tue, 17 Jun 2025 12:29:23 +0200 Subject: [PATCH 3/3] Fantomas format --- .../TaskSeq.ChunkBySize.Tests.fs | 44 +++++++++++++++---- src/FSharp.Control.TaskSeq/TaskSeqInternal.fs | 9 +++- 2 files changed, 42 insertions(+), 11 deletions(-) diff --git a/src/FSharp.Control.TaskSeq.Test/TaskSeq.ChunkBySize.Tests.fs b/src/FSharp.Control.TaskSeq.Test/TaskSeq.ChunkBySize.Tests.fs index c1ba00d..fcaf2b7 100644 --- a/src/FSharp.Control.TaskSeq.Test/TaskSeq.ChunkBySize.Tests.fs +++ b/src/FSharp.Control.TaskSeq.Test/TaskSeq.ChunkBySize.Tests.fs @@ -27,19 +27,29 @@ module EmptySeq = [)>] let ``TaskSeq-chunkBySize(1) has no effect on empty input`` variant = // no `task` block needed - Gen.getEmptyVariant variant |> TaskSeq.chunkBySize 1 |> verifyEmpty + Gen.getEmptyVariant variant + |> TaskSeq.chunkBySize 1 + |> verifyEmpty [)>] let ``TaskSeq-chunkBySize(99) has no effect on empty input`` variant = // no `task` block needed - Gen.getEmptyVariant variant |> TaskSeq.chunkBySize 99 |> verifyEmpty + Gen.getEmptyVariant variant + |> TaskSeq.chunkBySize 99 + |> verifyEmpty [] let ``TaskSeq-chunkBySize(-1) should throw ArgumentException on any input`` () = - fun () -> TaskSeq.empty |> TaskSeq.chunkBySize -1 |> consumeTaskSeq + fun () -> + TaskSeq.empty + |> TaskSeq.chunkBySize -1 + |> consumeTaskSeq |> should throwAsyncExact typeof - fun () -> TaskSeq.init 10 id |> TaskSeq.chunkBySize -1 |> consumeTaskSeq + fun () -> + TaskSeq.init 10 id + |> TaskSeq.chunkBySize -1 + |> consumeTaskSeq |> should throwAsyncExact typeof [] @@ -123,7 +133,7 @@ module SideEffects = Gen.getSeqWithSideEffect variant |> TaskSeq.chunkBySize 5 |> TaskSeq.toArrayAsync - |> Task.map (shouldEqual [| [| 1 .. 5 |]; [| 6 .. 10 |] |]) + |> Task.map (shouldEqual [| [| 1..5 |]; [| 6..10 |] |]) [] let ``TaskSeq-chunkBySize prove we execute empty-seq side-effects`` () = task { @@ -173,13 +183,29 @@ module SideEffects = let ts = Gen.getSeqWithSideEffect variant let mutable sum = 0 - do! TaskSeq.chunkBySize 1 ts |> TaskSeq.collect TaskSeq.ofArray |> TaskSeq.iter (fun item -> sum <- sum + item) - do! TaskSeq.chunkBySize 2 ts |> TaskSeq.collect TaskSeq.ofArray |> TaskSeq.iter (fun item -> sum <- sum + item) - do! TaskSeq.chunkBySize 3 ts |> TaskSeq.collect TaskSeq.ofArray |> TaskSeq.iter (fun item -> sum <- sum + item) - do! TaskSeq.chunkBySize 4 ts |> TaskSeq.collect TaskSeq.ofArray |> TaskSeq.iter (fun item -> sum <- sum + item) + do! + TaskSeq.chunkBySize 1 ts + |> TaskSeq.collect TaskSeq.ofArray + |> TaskSeq.iter (fun item -> sum <- sum + item) + + do! + TaskSeq.chunkBySize 2 ts + |> TaskSeq.collect TaskSeq.ofArray + |> TaskSeq.iter (fun item -> sum <- sum + item) + + do! + TaskSeq.chunkBySize 3 ts + |> TaskSeq.collect TaskSeq.ofArray + |> TaskSeq.iter (fun item -> sum <- sum + item) + + do! + TaskSeq.chunkBySize 4 ts + |> TaskSeq.collect TaskSeq.ofArray + |> TaskSeq.iter (fun item -> sum <- sum + item) sum |> should equal 820 // side-effected tasks, so 'item' DOES CHANGE, each next iteration starts 10 higher } + [] let ``TaskSeq-chunkBySize prove that an exception from the taskSeq is thrown`` () = let items = taskSeq { diff --git a/src/FSharp.Control.TaskSeq/TaskSeqInternal.fs b/src/FSharp.Control.TaskSeq/TaskSeqInternal.fs index 4f5b97b..9a6e441 100644 --- a/src/FSharp.Control.TaskSeq/TaskSeqInternal.fs +++ b/src/FSharp.Control.TaskSeq/TaskSeqInternal.fs @@ -1098,8 +1098,10 @@ module internal TaskSeqInternal = } - let chunkBySize chunkSize (source: TaskSeq<'T>): TaskSeq<'T[]> = - if chunkSize < 1 then invalidArg (nameof chunkSize) $"The value must be positive, but was %i{chunkSize}." + let chunkBySize chunkSize (source: TaskSeq<'T>) : TaskSeq<'T[]> = + if chunkSize < 1 then + invalidArg (nameof chunkSize) $"The value must be positive, but was %i{chunkSize}." + checkNonNull (nameof source) source taskSeq { @@ -1110,14 +1112,17 @@ module internal TaskSeqInternal = if step then let buffer = ResizeArray<_>() + while go do buffer.Add e.Current + if buffer.Count = chunkSize then yield buffer.ToArray() buffer.Clear() let! step = e.MoveNextAsync() go <- step + if buffer.Count > 0 then yield buffer.ToArray() }