Skip to content
12 changes: 12 additions & 0 deletions internal/interceptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -297,6 +297,9 @@ type WorkflowOutboundInterceptor interface {
// SideEffect intercepts workflow.SideEffect.
SideEffect(ctx Context, f func(ctx Context) interface{}) converter.EncodedValue

// SideEffectWithOptions intercepts workflow.SideEffectWithOptions.
SideEffectWithOptions(ctx Context, options SideEffectOptions, f func(ctx Context) interface{}) converter.EncodedValue

// MutableSideEffect intercepts workflow.MutableSideEffect.
MutableSideEffect(
ctx Context,
Expand All @@ -305,6 +308,15 @@ type WorkflowOutboundInterceptor interface {
equals func(a, b interface{}) bool,
) converter.EncodedValue

// MutableSideEffectWithOptions intercepts workflow.MutableSideEffectWithOptions.
MutableSideEffectWithOptions(
ctx Context,
id string,
options MutableSideEffectOptions,
f func(ctx Context) interface{},
equals func(a, b interface{}) bool,
) converter.EncodedValue

// GetVersion intercepts workflow.GetVersion.
GetVersion(ctx Context, changeID string, minSupported, maxSupported Version) Version

Expand Down
20 changes: 20 additions & 0 deletions internal/interceptor_base.go
Original file line number Diff line number Diff line change
Expand Up @@ -363,6 +363,15 @@ func (w *WorkflowOutboundInterceptorBase) SideEffect(
return w.Next.SideEffect(ctx, f)
}

// SideEffectWithOptions implements WorkflowOutboundInterceptor.SideEffectWithOptions.
func (w *WorkflowOutboundInterceptorBase) SideEffectWithOptions(
ctx Context,
options SideEffectOptions,
f func(ctx Context) interface{},
) converter.EncodedValue {
return w.Next.SideEffectWithOptions(ctx, options, f)
}

// MutableSideEffect implements WorkflowOutboundInterceptor.MutableSideEffect.
func (w *WorkflowOutboundInterceptorBase) MutableSideEffect(
ctx Context,
Expand All @@ -373,6 +382,17 @@ func (w *WorkflowOutboundInterceptorBase) MutableSideEffect(
return w.Next.MutableSideEffect(ctx, id, f, equals)
}

// MutableSideEffectWithOptions implements WorkflowOutboundInterceptor.MutableSideEffectWithOptions.
func (w *WorkflowOutboundInterceptorBase) MutableSideEffectWithOptions(
ctx Context,
id string,
options MutableSideEffectOptions,
f func(ctx Context) interface{},
equals func(a, b interface{}) bool,
) converter.EncodedValue {
return w.Next.MutableSideEffectWithOptions(ctx, id, options, f, equals)
}

// GetVersion implements WorkflowOutboundInterceptor.GetVersion.
func (w *WorkflowOutboundInterceptorBase) GetVersion(
ctx Context,
Expand Down
20 changes: 20 additions & 0 deletions internal/interceptortest/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -400,6 +400,15 @@ func (p *proxyWorkflowOutbound) SideEffect(
return
}

func (p *proxyWorkflowOutbound) SideEffectWithOptions(
ctx workflow.Context,
options workflow.SideEffectOptions,
f func(ctx workflow.Context) interface{},
) (ret converter.EncodedValue) {
ret, _ = p.invoke(ctx, options, f)[0].Interface().(converter.EncodedValue)
return
}

func (p *proxyWorkflowOutbound) MutableSideEffect(
ctx workflow.Context,
id string,
Expand All @@ -410,6 +419,17 @@ func (p *proxyWorkflowOutbound) MutableSideEffect(
return
}

func (p *proxyWorkflowOutbound) MutableSideEffectWithOptions(
ctx workflow.Context,
id string,
options workflow.MutableSideEffectOptions,
f func(ctx workflow.Context) interface{},
equals func(a, b interface{}) bool,
) (ret converter.EncodedValue) {
ret, _ = p.invoke(ctx, id, options, f, equals)[0].Interface().(converter.EncodedValue)
return
}

func (p *proxyWorkflowOutbound) GetVersion(
ctx workflow.Context,
changeID string,
Expand Down
8 changes: 4 additions & 4 deletions internal/internal_command_state_machine.go
Original file line number Diff line number Diff line change
Expand Up @@ -1322,7 +1322,7 @@ func (h *commandsHelper) handleVersionMarker(eventID int64, changeID string, sea
}
}

func (h *commandsHelper) recordSideEffectMarker(sideEffectID int64, data *commonpb.Payloads, dc converter.DataConverter) commandStateMachine {
func (h *commandsHelper) recordSideEffectMarker(sideEffectID int64, data *commonpb.Payloads, dc converter.DataConverter, userMetadata *sdk.UserMetadata) commandStateMachine {
markerID := fmt.Sprintf("%v_%v", sideEffectMarkerName, sideEffectID)
sideEffectIDPayload, err := dc.ToPayloads(sideEffectID)
if err != nil {
Expand All @@ -1336,7 +1336,7 @@ func (h *commandsHelper) recordSideEffectMarker(sideEffectID int64, data *common
sideEffectMarkerDataName: data,
},
}
command := h.newMarkerCommandStateMachine(markerID, attributes, nil)
command := h.newMarkerCommandStateMachine(markerID, attributes, userMetadata)
h.addCommand(command)
return command
}
Expand All @@ -1359,7 +1359,7 @@ func (h *commandsHelper) recordLocalActivityMarker(activityID string, details ma
return command
}

func (h *commandsHelper) recordMutableSideEffectMarker(mutableSideEffectID string, callCountHint int, data *commonpb.Payloads, dc converter.DataConverter) commandStateMachine {
func (h *commandsHelper) recordMutableSideEffectMarker(mutableSideEffectID string, callCountHint int, data *commonpb.Payloads, dc converter.DataConverter, userMetadata *sdk.UserMetadata) commandStateMachine {
// In order to avoid duplicate marker IDs, we must append the counter to the
// user-provided ID
mutableSideEffectID = fmt.Sprintf("%v_%v", mutableSideEffectID, h.getNextID())
Expand All @@ -1383,7 +1383,7 @@ func (h *commandsHelper) recordMutableSideEffectMarker(mutableSideEffectID strin
mutableSideEffectCallCounterName: mutableSideEffectCounterPayload,
},
}
command := h.newMarkerCommandStateMachine(markerID, attributes, nil)
command := h.newMarkerCommandStateMachine(markerID, attributes, userMetadata)
h.addCommand(command)
return command
}
Expand Down
2 changes: 1 addition & 1 deletion internal/internal_command_state_machine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -553,7 +553,7 @@ func Test_MarkerStateMachine(t *testing.T) {
h := newCommandsHelper()

// record marker for side effect
d := h.recordSideEffectMarker(1, nil, converter.GetDefaultDataConverter())
d := h.recordSideEffectMarker(1, nil, converter.GetDefaultDataConverter(), nil)
require.Equal(t, commandStateCreated, d.getState())

// send commands
Expand Down
24 changes: 16 additions & 8 deletions internal/internal_event_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -932,7 +932,7 @@ func getChangeVersion(changeID string, version Version) string {
return fmt.Sprintf("%s-%v", changeID, version)
}

func (wc *workflowEnvironmentImpl) SideEffect(f func() (*commonpb.Payloads, error), callback ResultHandler) {
func (wc *workflowEnvironmentImpl) SideEffect(f func() (*commonpb.Payloads, error), callback ResultHandler, summary string) {
sideEffectID := wc.getNextSideEffectID()
var result *commonpb.Payloads
if wc.isReplay {
Expand Down Expand Up @@ -961,7 +961,11 @@ func (wc *workflowEnvironmentImpl) SideEffect(f func() (*commonpb.Payloads, erro
}
}

wc.commandsHelper.recordSideEffectMarker(sideEffectID, result, wc.dataConverter)
userMetadata, err := buildUserMetadata(summary, "", wc.dataConverter)
if err != nil {
panic(fmt.Sprintf("failed to build user metadata for side effect: %v", err))
}
wc.commandsHelper.recordSideEffectMarker(sideEffectID, result, wc.dataConverter, userMetadata)

callback(result, nil)
wc.logger.Debug("SideEffect Marker added", tagSideEffectID, sideEffectID)
Expand Down Expand Up @@ -1033,7 +1037,7 @@ func (wc *workflowEnvironmentImpl) lookupMutableSideEffect(id string) *commonpb.
return payloads
}

func (wc *workflowEnvironmentImpl) MutableSideEffect(id string, f func() interface{}, equals func(a, b interface{}) bool) converter.EncodedValue {
func (wc *workflowEnvironmentImpl) MutableSideEffect(id string, f func() interface{}, equals func(a, b interface{}) bool, summary string) converter.EncodedValue {
wc.mutableSideEffectCallCounter[id]++
callCount := wc.mutableSideEffectCallCounter[id]

Expand All @@ -1044,7 +1048,7 @@ func (wc *workflowEnvironmentImpl) MutableSideEffect(id string, f func() interfa
// recorded on the next task. We have to append the current command
// counter to the user-provided ID to avoid duplicates.
if wc.mutableSideEffectsRecorded[fmt.Sprintf("%v_%v", id, wc.commandsHelper.getNextID())] {
return wc.recordMutableSideEffect(id, callCount, result)
return wc.recordMutableSideEffect(id, callCount, result, summary)
}
return encodedResult
}
Expand All @@ -1054,15 +1058,15 @@ func (wc *workflowEnvironmentImpl) MutableSideEffect(id string, f func() interfa
return encodedResult
}

return wc.recordMutableSideEffect(id, callCount, wc.encodeValue(newValue))
return wc.recordMutableSideEffect(id, callCount, wc.encodeValue(newValue), summary)
}

if wc.isReplay {
// This should not happen
panicIllegalState(fmt.Sprintf("[TMPRL1100] Non deterministic workflow code change detected. MutableSideEffect API call doesn't have a correspondent event in the workflow history. MutableSideEffect ID: %s", id))
}

return wc.recordMutableSideEffect(id, callCount, wc.encodeValue(f()))
return wc.recordMutableSideEffect(id, callCount, wc.encodeValue(f()), summary)
}

func (wc *workflowEnvironmentImpl) isEqualValue(newValue interface{}, encodedOldValue *commonpb.Payloads, equals func(a, b interface{}) bool) bool {
Expand Down Expand Up @@ -1098,12 +1102,16 @@ func (wc *workflowEnvironmentImpl) encodeArg(arg interface{}) (*commonpb.Payload
return wc.GetDataConverter().ToPayloads(arg)
}

func (wc *workflowEnvironmentImpl) recordMutableSideEffect(id string, callCountHint int, data *commonpb.Payloads) converter.EncodedValue {
func (wc *workflowEnvironmentImpl) recordMutableSideEffect(id string, callCountHint int, data *commonpb.Payloads, summary string) converter.EncodedValue {
details, err := encodeArgs(wc.GetDataConverter(), []interface{}{id, data})
if err != nil {
panic(err)
}
wc.commandsHelper.recordMutableSideEffectMarker(id, callCountHint, details, wc.dataConverter)
userMetadata, err := buildUserMetadata(summary, "", wc.dataConverter)
if err != nil {
panic(fmt.Sprintf("failed to build user metadata for mutable side effect: %v", err))
}
wc.commandsHelper.recordMutableSideEffectMarker(id, callCountHint, details, wc.dataConverter, userMetadata)
if wc.mutableSideEffect[id] == nil {
wc.mutableSideEffect[id] = make(map[int]*commonpb.Payloads)
}
Expand Down
4 changes: 2 additions & 2 deletions internal/internal_worker_base.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ type (
AsyncActivityClient
LocalActivityClient
WorkflowTimerClient
SideEffect(f func() (*commonpb.Payloads, error), callback ResultHandler)
SideEffect(f func() (*commonpb.Payloads, error), callback ResultHandler, summary string)
GetVersion(changeID string, minSupported, maxSupported Version) Version
WorkflowInfo() *WorkflowInfo
TypedSearchAttributes() SearchAttributes
Expand Down Expand Up @@ -114,7 +114,7 @@ type (
handler func(string, string, *commonpb.Payloads, *commonpb.Header, UpdateCallbacks),
)
IsReplaying() bool
MutableSideEffect(id string, f func() interface{}, equals func(a, b interface{}) bool) converter.EncodedValue
MutableSideEffect(id string, f func() interface{}, equals func(a, b interface{}) bool, summary string) converter.EncodedValue
GetDataConverter() converter.DataConverter
GetFailureConverter() converter.FailureConverter
AddSession(sessionInfo *SessionInfo)
Expand Down
4 changes: 2 additions & 2 deletions internal/internal_workflow_testsuite.go
Original file line number Diff line number Diff line change
Expand Up @@ -2740,7 +2740,7 @@ func (env *testWorkflowEnvironmentImpl) makeUniqueNexusOperationToken(
return fmt.Sprintf("%s_%s_%s", service, operation, token)
}

func (env *testWorkflowEnvironmentImpl) SideEffect(f func() (*commonpb.Payloads, error), callback ResultHandler) {
func (env *testWorkflowEnvironmentImpl) SideEffect(f func() (*commonpb.Payloads, error), callback ResultHandler, _ string) {
callback(f())
}

Expand Down Expand Up @@ -2872,7 +2872,7 @@ func (env *testWorkflowEnvironmentImpl) UpsertMemo(memoMap map[string]interface{
return err
}

func (env *testWorkflowEnvironmentImpl) MutableSideEffect(_ string, f func() interface{}, _ func(a, b interface{}) bool) converter.EncodedValue {
func (env *testWorkflowEnvironmentImpl) MutableSideEffect(_ string, f func() interface{}, _ func(a, b interface{}) bool, _ string) converter.EncodedValue {
return newEncodedValue(env.encodeValue(f()), env.GetDataConverter())
}

Expand Down
69 changes: 67 additions & 2 deletions internal/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -604,6 +604,32 @@ type (
// NOTE: Experimental
TimerOptions TimerOptions
}

// SideEffectOptions are options for executing a side effect.
//
// Exposed as: [go.temporal.io/sdk/workflow.SideEffectOptions]
SideEffectOptions struct {
// Summary is a single-line summary of this side effect that will appear in UI/CLI.
// This can be in single-line Temporal Markdown format.
//
// Optional: defaults to none/empty.
//
// NOTE: Experimental
Summary string
}

// MutableSideEffectOptions are options for executing a mutable side effect.
//
// Exposed as: [go.temporal.io/sdk/workflow.MutableSideEffectOptions]
MutableSideEffectOptions struct {
// Summary is a single-line summary of this side effect that will appear in UI/CLI.
// This can be in single-line Temporal Markdown format.
//
// Optional: defaults to none/empty.
//
// NOTE: Experimental
Summary string
}
)

// Await blocks the calling thread until condition() returns true
Expand Down Expand Up @@ -2062,7 +2088,27 @@ func SideEffect(ctx Context, f func(ctx Context) interface{}) converter.EncodedV
return i.SideEffect(ctx, f)
}

// SideEffectWithOptions executes the provided function once, records its result into the workflow history.
// The recorded result on history will be returned without executing the provided function during replay.
// This guarantees the deterministic requirement for workflow as the exact same result will be returned in replay.
// Common use case is to run some short non-deterministic code in workflow, like getting random number or new UUID.
// The only way to fail SideEffect is to panic which causes workflow task failure. The workflow task after timeout is
// rescheduled and re-executed giving SideEffect another chance to succeed.
//
// The options parameter allows specifying additional options like a summary that will be displayed in UI/CLI.
//
// Exposed as: [go.temporal.io/sdk/workflow.SideEffectWithOptions]
func SideEffectWithOptions(ctx Context, options SideEffectOptions, f func(ctx Context) interface{}) converter.EncodedValue {
assertNotInReadOnlyState(ctx)
i := getWorkflowOutboundInterceptor(ctx)
return i.SideEffectWithOptions(ctx, options, f)
}

func (wc *workflowEnvironmentInterceptor) SideEffect(ctx Context, f func(ctx Context) interface{}) converter.EncodedValue {
return wc.SideEffectWithOptions(ctx, SideEffectOptions{}, f)
}

func (wc *workflowEnvironmentInterceptor) SideEffectWithOptions(ctx Context, options SideEffectOptions, f func(ctx Context) interface{}) converter.EncodedValue {
dc := getDataConverterFromWorkflowContext(ctx)
future, settable := NewFuture(ctx)
wrapperFunc := func() (*commonpb.Payloads, error) {
Expand All @@ -2075,7 +2121,7 @@ func (wc *workflowEnvironmentInterceptor) SideEffect(ctx Context, f func(ctx Con
resultCallback := func(result *commonpb.Payloads, err error) {
settable.Set(EncodedValue{result, dc}, err)
}
wc.env.SideEffect(wrapperFunc, resultCallback)
wc.env.SideEffect(wrapperFunc, resultCallback, options.Summary)
var encoded EncodedValue
if err := future.Get(ctx, &encoded); err != nil {
panic(err)
Expand Down Expand Up @@ -2107,14 +2153,33 @@ func MutableSideEffect(ctx Context, id string, f func(ctx Context) interface{},
return i.MutableSideEffect(ctx, id, f, equals)
}

// MutableSideEffectWithOptions executes the provided function once, then it looks up the history for the value with the given id.
// If there is no existing value, then it records the function result as a value with the given id on history;
// otherwise, it compares whether the existing value from history has changed from the new function result by calling
// the provided equals function. If they are equal, it returns the value without recording a new one in history;
// otherwise, it records the new value with the same id on history.
//
// The options parameter allows specifying additional options like a summary that will be displayed in UI/CLI.
//
// Exposed as: [go.temporal.io/sdk/workflow.MutableSideEffectWithOptions]
func MutableSideEffectWithOptions(ctx Context, id string, options MutableSideEffectOptions, f func(ctx Context) interface{}, equals func(a, b interface{}) bool) converter.EncodedValue {
assertNotInReadOnlyState(ctx)
i := getWorkflowOutboundInterceptor(ctx)
return i.MutableSideEffectWithOptions(ctx, id, options, f, equals)
}

func (wc *workflowEnvironmentInterceptor) MutableSideEffect(ctx Context, id string, f func(ctx Context) interface{}, equals func(a, b interface{}) bool) converter.EncodedValue {
return wc.MutableSideEffectWithOptions(ctx, id, MutableSideEffectOptions{}, f, equals)
}

func (wc *workflowEnvironmentInterceptor) MutableSideEffectWithOptions(ctx Context, id string, options MutableSideEffectOptions, f func(ctx Context) interface{}, equals func(a, b interface{}) bool) converter.EncodedValue {
wrapperFunc := func() interface{} {
coroutineState := getState(ctx)
defer coroutineState.dispatcher.setIsReadOnly(false)
coroutineState.dispatcher.setIsReadOnly(true)
return f(ctx)
}
return wc.env.MutableSideEffect(id, wrapperFunc, equals)
return wc.env.MutableSideEffect(id, wrapperFunc, equals, options.Summary)
}

// DefaultVersion is a version returned by GetVersion for code that wasn't versioned before
Expand Down
Loading
Loading