diff --git a/README.md b/README.md
index cac8cf2..7b31031 100644
--- a/README.md
+++ b/README.md
@@ -211,7 +211,8 @@ The `TaskSeq` project already has a wide array of functions and functionalities,
- [ ] `average` / `averageBy`, `sum` and related
- [x] `forall` / `forallAsync` (see [#240])
- [x] `skip` / `drop` / `truncate` / `take` (see [#209])
- - [ ] `chunkBySize` / `windowed`
+ - [x] `chunkBySize` (see [#265])
+ - [ ] `windowed`
- [ ] `compareWith`
- [ ] `distinct`
- [ ] `exists2` / `map2` / `fold2` / `iter2` and related '2'-functions
@@ -263,7 +264,7 @@ This is what has been implemented so far, is planned or skipped:
| ✅ [#67][] | | | `box` | |
| ✅ [#67][] | | | `unbox` | |
| ✅ [#23][] | `choose` | `choose` | `chooseAsync` | |
-| | `chunkBySize` | `chunkBySize` | | |
+| ✅ [#265][]| `chunkBySize` | `chunkBySize` | | |
| ✅ [#11][] | `collect` | `collect` | `collectAsync` | |
| ✅ [#11][] | | `collectSeq` | `collectSeqAsync` | |
| | `compareWith` | `compareWith` | `compareWithAsync` | |
diff --git a/src/FSharp.Control.TaskSeq.Test/FSharp.Control.TaskSeq.Test.fsproj b/src/FSharp.Control.TaskSeq.Test/FSharp.Control.TaskSeq.Test.fsproj
index dbf1cfc..421459d 100644
--- a/src/FSharp.Control.TaskSeq.Test/FSharp.Control.TaskSeq.Test.fsproj
+++ b/src/FSharp.Control.TaskSeq.Test/FSharp.Control.TaskSeq.Test.fsproj
@@ -57,6 +57,7 @@
+
diff --git a/src/FSharp.Control.TaskSeq.Test/TaskSeq.ChunkBySize.Tests.fs b/src/FSharp.Control.TaskSeq.Test/TaskSeq.ChunkBySize.Tests.fs
new file mode 100644
index 0000000..fcaf2b7
--- /dev/null
+++ b/src/FSharp.Control.TaskSeq.Test/TaskSeq.ChunkBySize.Tests.fs
@@ -0,0 +1,219 @@
+module TaskSeq.Tests.ChunkBySize
+
+open System
+
+open FsUnitTyped
+open Xunit
+open FsUnit.Xunit
+
+open FSharp.Control
+
+//
+// TaskSeq.chunkBySize
+//
+
+exception SideEffectPastEnd of string
+
+module EmptySeq =
+ [)>]
+ let ``TaskSeq-chunkBySize(0) on empty input should throw InvalidOperation`` variant =
+ fun () ->
+ Gen.getEmptyVariant variant
+ |> TaskSeq.chunkBySize 0
+ |> consumeTaskSeq
+
+ |> should throwAsyncExact typeof
+
+ [)>]
+ let ``TaskSeq-chunkBySize(1) has no effect on empty input`` variant =
+ // no `task` block needed
+ Gen.getEmptyVariant variant
+ |> TaskSeq.chunkBySize 1
+ |> verifyEmpty
+
+ [)>]
+ let ``TaskSeq-chunkBySize(99) has no effect on empty input`` variant =
+ // no `task` block needed
+ Gen.getEmptyVariant variant
+ |> TaskSeq.chunkBySize 99
+ |> verifyEmpty
+
+ []
+ let ``TaskSeq-chunkBySize(-1) should throw ArgumentException on any input`` () =
+ fun () ->
+ TaskSeq.empty
+ |> TaskSeq.chunkBySize -1
+ |> consumeTaskSeq
+ |> should throwAsyncExact typeof
+
+ fun () ->
+ TaskSeq.init 10 id
+ |> TaskSeq.chunkBySize -1
+ |> consumeTaskSeq
+ |> should throwAsyncExact typeof
+
+ []
+ let ``TaskSeq-chunkBySize(-1) should throw ArgumentException before awaiting`` () =
+ fun () ->
+ taskSeq {
+ do! longDelay ()
+
+ if false then
+ yield 0 // type inference
+ }
+ |> TaskSeq.chunkBySize -1
+ |> ignore // throws even without running the async. Bad coding, don't ignore a task!
+
+ |> should throw typeof
+
+module Immutable =
+ [)>]
+ let ``TaskSeq-chunkBySize returns all items from source in order`` variant = task {
+ do!
+ Gen.getSeqImmutable variant
+ |> TaskSeq.chunkBySize 3
+ |> TaskSeq.collect TaskSeq.ofArray
+ |> verify1To10
+ }
+
+ [)>]
+ let ``TaskSeq-chunkBySize returns chunks with items in order`` variant = task {
+ do!
+ Gen.getSeqImmutable variant
+ |> TaskSeq.chunkBySize 2
+ |> TaskSeq.toArrayAsync
+ |> Task.map (shouldEqual [| [| 1; 2 |]; [| 3; 4 |]; [| 5; 6 |]; [| 7; 8 |]; [| 9; 10 |] |])
+ }
+
+ [)>]
+ let ``TaskSeq-chunkBySize returns exactly 'chunkSize' items per chunk`` variant = task {
+ do!
+ Gen.getSeqImmutable variant
+ |> TaskSeq.chunkBySize 1
+ |> TaskSeq.iter (shouldHaveLength 1)
+
+ do!
+ Gen.getSeqImmutable variant
+ |> TaskSeq.chunkBySize 2
+ |> TaskSeq.iter (shouldHaveLength 2)
+
+ do!
+ Gen.getSeqImmutable variant
+ |> TaskSeq.chunkBySize 5
+ |> TaskSeq.iter (shouldHaveLength 5)
+ }
+
+ [)>]
+ let ``TaskSeq-chunkBySize returns remaining items in last chunk`` variant = task {
+ let verifyChunk chunkSize lastChunkSize =
+ Gen.getSeqImmutable variant
+ |> TaskSeq.chunkBySize chunkSize
+ |> TaskSeq.toArrayAsync
+ |> Task.map (Array.last >> shouldHaveLength lastChunkSize)
+
+ do! verifyChunk 1 1
+ do! verifyChunk 3 1
+ do! verifyChunk 4 2
+ do! verifyChunk 6 4
+ do! verifyChunk 7 3
+ do! verifyChunk 8 2
+ do! verifyChunk 9 1
+ }
+
+ [)>]
+ let ``TaskSeq-chunkBySize returns all elements when 'chunkSize' > number of items`` variant =
+ Gen.getSeqImmutable variant
+ |> TaskSeq.chunkBySize 11
+ |> TaskSeq.toArrayAsync
+ |> Task.map (Array.exactlyOne >> shouldHaveLength 10)
+
+module SideEffects =
+ [)>]
+ let ``TaskSeq-chunkBySize gets all items`` variant =
+ Gen.getSeqWithSideEffect variant
+ |> TaskSeq.chunkBySize 5
+ |> TaskSeq.toArrayAsync
+ |> Task.map (shouldEqual [| [| 1..5 |]; [| 6..10 |] |])
+
+ []
+ let ``TaskSeq-chunkBySize prove we execute empty-seq side-effects`` () = task {
+ let mutable i = 0
+
+ let ts = taskSeq {
+ i <- i + 1
+ i <- i + 1
+ i <- i + 1 // we should get here
+ }
+
+ do! ts |> TaskSeq.chunkBySize 1 |> consumeTaskSeq
+ do! ts |> TaskSeq.chunkBySize 2 |> consumeTaskSeq
+ do! ts |> TaskSeq.chunkBySize 3 |> consumeTaskSeq
+ i |> should equal 9
+ }
+
+ []
+ let ``TaskSeq-chunkBySize prove we execute after-effects`` () = task {
+ let mutable i = 0
+
+ let ts = taskSeq {
+ i <- i + 1
+ i <- i + 1
+ yield 42
+ i <- i + 1 // we should get here
+ }
+
+ do! ts |> TaskSeq.chunkBySize 1 |> consumeTaskSeq
+ do! ts |> TaskSeq.chunkBySize 2 |> consumeTaskSeq
+ do! ts |> TaskSeq.chunkBySize 3 |> consumeTaskSeq
+ i |> should equal 9
+ }
+
+ [)>]
+ let ``TaskSeq-chunkBySize should go over all items`` variant = task {
+ let ts = Gen.getSeqWithSideEffect variant
+ do! ts |> TaskSeq.chunkBySize 1 |> consumeTaskSeq
+ do! ts |> TaskSeq.chunkBySize 2 |> consumeTaskSeq
+ do! ts |> TaskSeq.chunkBySize 3 |> consumeTaskSeq
+ // incl. the iteration of 'last', we reach 40
+ do! ts |> TaskSeq.last |> Task.map (should equal 40)
+ }
+
+ [)>]
+ let ``TaskSeq-chunkBySize multiple iterations over same sequence`` variant = task {
+ let ts = Gen.getSeqWithSideEffect variant
+ let mutable sum = 0
+
+ do!
+ TaskSeq.chunkBySize 1 ts
+ |> TaskSeq.collect TaskSeq.ofArray
+ |> TaskSeq.iter (fun item -> sum <- sum + item)
+
+ do!
+ TaskSeq.chunkBySize 2 ts
+ |> TaskSeq.collect TaskSeq.ofArray
+ |> TaskSeq.iter (fun item -> sum <- sum + item)
+
+ do!
+ TaskSeq.chunkBySize 3 ts
+ |> TaskSeq.collect TaskSeq.ofArray
+ |> TaskSeq.iter (fun item -> sum <- sum + item)
+
+ do!
+ TaskSeq.chunkBySize 4 ts
+ |> TaskSeq.collect TaskSeq.ofArray
+ |> TaskSeq.iter (fun item -> sum <- sum + item)
+
+ sum |> should equal 820 // side-effected tasks, so 'item' DOES CHANGE, each next iteration starts 10 higher
+ }
+
+ []
+ let ``TaskSeq-chunkBySize prove that an exception from the taskSeq is thrown`` () =
+ let items = taskSeq {
+ yield 42
+ yield! [ 1; 2 ]
+ do SideEffectPastEnd "at the end" |> raise
+ yield 43
+ }
+
+ fun () -> items |> TaskSeq.chunkBySize 2 |> consumeTaskSeq
+ |> should throwAsyncExact typeof
diff --git a/src/FSharp.Control.TaskSeq/TaskSeq.fs b/src/FSharp.Control.TaskSeq/TaskSeq.fs
index 710dadd..a0f59e2 100644
--- a/src/FSharp.Control.TaskSeq/TaskSeq.fs
+++ b/src/FSharp.Control.TaskSeq/TaskSeq.fs
@@ -252,6 +252,8 @@ type TaskSeq private () =
yield! source2
}
+ static member chunkBySize (chunkSize: int) (source: TaskSeq<'T>) = Internal.chunkBySize chunkSize source
+
//
// iter/map/collect functions
//
diff --git a/src/FSharp.Control.TaskSeq/TaskSeq.fsi b/src/FSharp.Control.TaskSeq/TaskSeq.fsi
index ff98586..3be9662 100644
--- a/src/FSharp.Control.TaskSeq/TaskSeq.fsi
+++ b/src/FSharp.Control.TaskSeq/TaskSeq.fsi
@@ -823,6 +823,15 @@ type TaskSeq =
/// Thrown when the input task sequence is null.
static member chooseAsync: chooser: ('T -> #Task<'U option>) -> source: TaskSeq<'T> -> TaskSeq<'U>
+ /// Divides the input sequence into chunks of size at most chunkSize.
+ ///
+ /// The maximum size of each chunk.
+ /// The input task sequence.
+ /// The task sequence divided into chunks.
+ /// Thrown when the input task sequence is null.
+ /// Thrown when chunkSize is not positive.
+ static member chunkBySize: chunkSize: int -> source: TaskSeq<'T> -> TaskSeq<'T[]>
+
///
/// Returns a new task sequence containing only the elements of the collection
/// for which the given function returns .
diff --git a/src/FSharp.Control.TaskSeq/TaskSeqInternal.fs b/src/FSharp.Control.TaskSeq/TaskSeqInternal.fs
index 66a92f2..9a6e441 100644
--- a/src/FSharp.Control.TaskSeq/TaskSeqInternal.fs
+++ b/src/FSharp.Control.TaskSeq/TaskSeqInternal.fs
@@ -1097,3 +1097,32 @@ module internal TaskSeqInternal =
go <- step
}
+
+ let chunkBySize chunkSize (source: TaskSeq<'T>) : TaskSeq<'T[]> =
+ if chunkSize < 1 then
+ invalidArg (nameof chunkSize) $"The value must be positive, but was %i{chunkSize}."
+
+ checkNonNull (nameof source) source
+
+ taskSeq {
+ use e = source.GetAsyncEnumerator CancellationToken.None
+ let mutable go = true
+ let! step = e.MoveNextAsync()
+ go <- step
+
+ if step then
+ let buffer = ResizeArray<_>()
+
+ while go do
+ buffer.Add e.Current
+
+ if buffer.Count = chunkSize then
+ yield buffer.ToArray()
+ buffer.Clear()
+
+ let! step = e.MoveNextAsync()
+ go <- step
+
+ if buffer.Count > 0 then
+ yield buffer.ToArray()
+ }