Skip to content

Commit 0f98eea

Browse files
authored
[parallelisation] few helpers to ease parallel processing (#749)
<!-- Copyright (C) 2020-2022 Arm Limited or its affiliates and Contributors. All rights reserved. SPDX-License-Identifier: Apache-2.0 --> ### Description - Break on Error and EOF - Transform functions ### Test Coverage <!-- Please put an `x` in the correct box e.g. `[x]` to indicate the testing coverage of this change. --> - [x] This change is covered by existing or additional automated tests. - [ ] Manual testing has been performed (and evidence provided) as automated testing was not feasible. - [ ] Additional tests are not required for this change (e.g. documentation update).
1 parent 87d5e0a commit 0f98eea

File tree

6 files changed

+66
-2
lines changed

6 files changed

+66
-2
lines changed

changes/20251117175436.feature

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
:sparkles: `[parallelisation]` Added Transform helpers to ease the creation `transform` operations

changes/20251117175538.feature

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
:sparkles: `[parallelisation]` Added `BreakOnErrorOrEOF` to help performing actions in parallel but for which `EOF` is considered as termination rather than an error

utils/parallelisation/contextual.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package parallelisation
22

33
import (
44
"context"
5+
"io"
56

67
"github.com/ARM-software/golang-utils/utils/commonerrors"
78
)
@@ -55,3 +56,8 @@ func BreakOnError(ctx context.Context, executionOptions *StoreOptions, contextua
5556
group.RegisterFunction(contextualFunc...)
5657
return group.Execute(ctx)
5758
}
59+
60+
// BreakOnErrorOrEOF is similar to BreakOnError but also stops on EOF. However, in this case, no error is returned
61+
func BreakOnErrorOrEOF(ctx context.Context, executionOptions *StoreOptions, contextualFunc ...ContextualFunc) error {
62+
return commonerrors.Ignore(BreakOnError(ctx, executionOptions, contextualFunc...), commonerrors.ErrEOF, io.EOF)
63+
}

utils/parallelisation/contextual_test.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package parallelisation
33
import (
44
"context"
55
"errors"
6+
"io"
67
"testing"
78
"time"
89

@@ -35,6 +36,7 @@ func TestForEach(t *testing.T) {
3536
closeError := commonerrors.ErrUnexpected
3637
errortest.AssertError(t, ForEach(context.Background(), WithOptions(SequentialInReverse, JoinErrors), WrapCancelToContextualFunc(cancelFunc), WrapCancelToContextualFunc(cancelFunc), WrapCloseToContextualFunc(func() error { return closeError }), WrapCancelToContextualFunc(cancelFunc)), closeError)
3738
errortest.AssertError(t, BreakOnError(context.Background(), WithOptions(SequentialInReverse, JoinErrors), WrapCancelToContextualFunc(cancelFunc), WrapCancelToContextualFunc(cancelFunc), WrapCloseToContextualFunc(func() error { return closeError }), WrapCancelToContextualFunc(cancelFunc)), closeError)
39+
errortest.AssertError(t, BreakOnErrorOrEOF(context.Background(), WithOptions(SequentialInReverse, JoinErrors), WrapCancelToContextualFunc(cancelFunc), WrapCancelToContextualFunc(cancelFunc), WrapCloseToContextualFunc(func() error { return closeError }), WrapCancelToContextualFunc(cancelFunc)), closeError)
3840
})
3941

4042
t.Run("close with cancellation", func(t *testing.T) {
@@ -43,11 +45,23 @@ func TestForEach(t *testing.T) {
4345
cancel()
4446
errortest.AssertError(t, ForEach(cancelCtx, WithOptions(SequentialInReverse, JoinErrors), WrapCancelToContextualFunc(cancelFunc), WrapCancelToContextualFunc(cancelFunc), WrapCloseToContextualFunc(func() error { return closeError }), WrapCancelToContextualFunc(cancelFunc)), commonerrors.ErrCancelled)
4547
errortest.AssertError(t, BreakOnError(cancelCtx, WithOptions(SequentialInReverse, JoinErrors), WrapCancelToContextualFunc(cancelFunc), WrapCancelToContextualFunc(cancelFunc), WrapCancelToContextualFunc(cancelFunc), WrapCancelToContextualFunc(cancelFunc)), commonerrors.ErrCancelled)
48+
errortest.AssertError(t, BreakOnErrorOrEOF(cancelCtx, WithOptions(SequentialInReverse, JoinErrors), WrapCancelToContextualFunc(cancelFunc), WrapCancelToContextualFunc(cancelFunc), WrapCancelToContextualFunc(cancelFunc), WrapCancelToContextualFunc(cancelFunc)), commonerrors.ErrCancelled)
4649
})
4750

4851
t.Run("break on error with no error", func(t *testing.T) {
4952
require.NoError(t, BreakOnError(context.Background(), WithOptions(Workers(5), JoinErrors), WrapCancelToContextualFunc(cancelFunc), WrapCancelToContextualFunc(cancelFunc), WrapCancelToContextualFunc(cancelFunc)))
5053
})
54+
t.Run("break on error or EOF with no error", func(t *testing.T) {
55+
require.NoError(t, BreakOnErrorOrEOF(context.Background(), WithOptions(Workers(5), JoinErrors), WrapCancelToContextualFunc(cancelFunc), WrapCancelToContextualFunc(cancelFunc), WrapCancelToContextualFunc(cancelFunc)))
56+
})
57+
t.Run("break on error or EOF with no error", func(t *testing.T) {
58+
require.NoError(t, BreakOnErrorOrEOF(context.Background(), WithOptions(Workers(5), JoinErrors), WrapCancelToContextualFunc(cancelFunc), WrapCancelToContextualFunc(cancelFunc), WrapCancelToContextualFunc(cancelFunc), func(_ context.Context) error {
59+
return commonerrors.ErrEOF
60+
}))
61+
require.NoError(t, BreakOnErrorOrEOF(context.Background(), WithOptions(Workers(5), JoinErrors), WrapCancelToContextualFunc(cancelFunc), WrapCancelToContextualFunc(cancelFunc), WrapCancelToContextualFunc(cancelFunc), func(_ context.Context) error {
62+
return io.EOF
63+
}))
64+
})
5165
t.Run("for each with no error", func(t *testing.T) {
5266
require.NoError(t, ForEach(context.Background(), WithOptions(Workers(5), JoinErrors), WrapCancelToContextualFunc(cancelFunc), WrapCancelToContextualFunc(cancelFunc), WrapCancelToContextualFunc(cancelFunc)))
5367
})

utils/parallelisation/transform.go

Lines changed: 37 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@ package parallelisation
22

33
import (
44
"context"
5+
"iter"
6+
"slices"
57

68
"go.uber.org/atomic"
79

@@ -101,12 +103,17 @@ func (g *TransformGroup[I, O]) appendResult(o *resultElement[O]) {
101103

102104
// Inputs registers inputs to transform.
103105
func (g *TransformGroup[I, O]) Inputs(ctx context.Context, i ...I) error {
104-
for j := range i {
106+
return g.InputSequence(ctx, slices.Values(i))
107+
}
108+
109+
// InputSequence registers inputs to transform.
110+
func (g *TransformGroup[I, O]) InputSequence(ctx context.Context, i iter.Seq[I]) error {
111+
for e := range i {
105112
err := DetermineContextError(ctx)
106113
if err != nil {
107114
return err
108115
}
109-
g.RegisterFunction(i[j])
116+
g.RegisterFunction(e)
110117
}
111118
return nil
112119
}
@@ -159,3 +166,31 @@ func NewTransformGroup[I any, O any](transform TransformFunc[I, O], options ...S
159166
}, options...)
160167
return g
161168
}
169+
170+
func transformF[I any, O any](ctx context.Context, inputs iter.Seq[I], transform TransformFunc[I, O], ordered bool, options ...StoreOption) (result []O, err error) {
171+
grp := NewTransformGroup[I, O](transform, options...)
172+
err = grp.InputSequence(ctx, inputs)
173+
if err != nil {
174+
return
175+
}
176+
err = grp.Transform(ctx)
177+
if err != nil {
178+
return
179+
}
180+
if ordered {
181+
result, err = grp.OrderedOutputs(ctx)
182+
} else {
183+
result, err = grp.Outputs(ctx)
184+
}
185+
return
186+
}
187+
188+
// Transform transforms inputs into outputs using the transform function.
189+
func Transform[I any, O any](ctx context.Context, inputs iter.Seq[I], transform TransformFunc[I, O], options ...StoreOption) ([]O, error) {
190+
return transformF[I, O](ctx, inputs, transform, false, options...)
191+
}
192+
193+
// TransformInOrder transforms inputs into outputs using the transform function but returns the output in the same order as the input.
194+
func TransformInOrder[I any, O any](ctx context.Context, inputs iter.Seq[I], transform TransformFunc[I, O], options ...StoreOption) ([]O, error) {
195+
return transformF[I, O](ctx, inputs, transform, true, options...)
196+
}

utils/parallelisation/transform_test.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package parallelisation
22

33
import (
44
"context"
5+
"slices"
56
"strconv"
67
"testing"
78

@@ -47,6 +48,9 @@ func TestNewTransformGroup(t *testing.T) {
4748
o, err = g.Outputs(context.Background())
4849
require.NoError(t, err)
4950
assert.ElementsMatch(t, in, o)
51+
o, err = Transform[string, int](context.Background(), slices.Values(in2), tr, RetainAfterExecution, Parallel)
52+
require.NoError(t, err)
53+
assert.ElementsMatch(t, in, o)
5054
o, err = g.OrderedOutputs(context.Background())
5155
require.NoError(t, err)
5256
assert.Empty(t, o)
@@ -55,6 +59,9 @@ func TestNewTransformGroup(t *testing.T) {
5559
o, err = g.OrderedOutputs(context.Background())
5660
require.NoError(t, err)
5761
assert.Equal(t, in, o)
62+
o, err = TransformInOrder[string, int](context.Background(), slices.Values(in2), tr, RetainAfterExecution, Parallel)
63+
require.NoError(t, err)
64+
assert.Equal(t, in, o)
5865
err = g.Inputs(context.Background(), in2...)
5966
require.NoError(t, err)
6067
assert.Equal(t, 2*numberOfInput, g.Len())

0 commit comments

Comments
 (0)