Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions internal/biz/model/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,15 @@ func (v Version) Increment() Version {
return Version(uint(v) + 1)
}

// Decrement returns a new Version with the value decremented by 1.
// Returns 0 if the current value is already 0 (prevents underflow).
func (v Version) Decrement() Version {
if uint(v) == 0 {
return Version(0)
}
return Version(uint(v) - 1)
}

func (v Version) Serialize() uint {
return SerializeUint(v)
}
Expand Down
10 changes: 5 additions & 5 deletions internal/biz/model/representations.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,19 +10,19 @@ import (
// If a representation is present, its version must also be present (and vice versa).
type Representations struct {
commonData Representation
commonVersion *uint
commonVersion *Version
reporterData Representation
reporterRepresentationVersion *uint
reporterRepresentationVersion *Version
}

// NewRepresentations creates a Representations with optional common and reporter data.
// At least one of common or reporter representation must be provided.
// If a representation is provided, its version must also be provided.
func NewRepresentations(
commonData Representation,
commonVersion *uint,
commonVersion *Version,
reporterData Representation,
reporterRepresentationVersion *uint,
reporterRepresentationVersion *Version,
) (*Representations, error) {
// Validate that at least one representation is present
hasCommon := len(commonData) > 0 && commonVersion != nil
Expand Down Expand Up @@ -56,7 +56,7 @@ func (r *Representations) CommonData() Representation {
}

// CommonVersion returns a pointer to the common version, or nil if not present.
func (r *Representations) CommonVersion() *uint {
func (r *Representations) CommonVersion() *Version {
return r.commonVersion
}

Expand Down
6 changes: 3 additions & 3 deletions internal/biz/model/resource_repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,11 @@ import (
type ResourceRepository interface {
NextResourceId() (ResourceId, error)
NextReporterResourceId() (ReporterResourceId, error)
Save(tx *gorm.DB, resource Resource, operationType EventOperationType, txid string) error
Save(tx *gorm.DB, resource Resource, operationType EventOperationType, txid TransactionId) error
FindResourceByKeys(tx *gorm.DB, key ReporterResourceKey) (*Resource, error)
FindCurrentAndPreviousVersionedRepresentations(tx *gorm.DB, key ReporterResourceKey, currentVersion *uint, operationType EventOperationType) (*Representations, *Representations, error)
FindCurrentAndPreviousVersionedRepresentations(tx *gorm.DB, key ReporterResourceKey, currentVersion *Version, operationType EventOperationType) (*Representations, *Representations, error)
FindLatestRepresentations(tx *gorm.DB, key ReporterResourceKey) (*Representations, error)
GetDB() *gorm.DB
GetTransactionManager() TransactionManager
HasTransactionIdBeenProcessed(tx *gorm.DB, transactionId string) (bool, error)
HasTransactionIdBeenProcessed(tx *gorm.DB, transactionId TransactionId) (bool, error)
}
54 changes: 24 additions & 30 deletions internal/biz/model/schema_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (
func TestCalculateTuples(t *testing.T) {
tests := []struct {
name string
version uint
version model.Version
currentWorkspaceID string
previousWorkspaceID string
expectTuplesToCreate bool
Expand All @@ -25,7 +25,7 @@ func TestCalculateTuples(t *testing.T) {
}{
{
name: "version 0 creates initial tuple",
version: 0,
version: model.NewVersion(0),
currentWorkspaceID: "workspace-initial",
previousWorkspaceID: "",
expectTuplesToCreate: true,
Expand All @@ -35,7 +35,7 @@ func TestCalculateTuples(t *testing.T) {
},
{
name: "workspace change creates and deletes tuples",
version: 2,
version: model.NewVersion(2),
currentWorkspaceID: "workspace-new",
previousWorkspaceID: "workspace-old",
expectTuplesToCreate: true,
Expand All @@ -47,7 +47,7 @@ func TestCalculateTuples(t *testing.T) {
},
{
name: "workspace change creates and deletes tuples version 1",
version: 1,
version: model.NewVersion(1),
currentWorkspaceID: "workspace-new",
previousWorkspaceID: "workspace-old",
expectTuplesToCreate: true,
Expand All @@ -59,7 +59,7 @@ func TestCalculateTuples(t *testing.T) {
},
{
name: "same workspace does not create or delete tuples",
version: 2,
version: model.NewVersion(2),
currentWorkspaceID: "workspace-same",
previousWorkspaceID: "workspace-same",
expectTuplesToCreate: false,
Expand All @@ -85,19 +85,17 @@ func TestCalculateTuples(t *testing.T) {
if tt.currentWorkspaceID != "" {
currentData = map[string]interface{}{"workspace_id": tt.currentWorkspaceID}
}
ver := tt.version
current, err = model.NewRepresentations(
model.Representation(currentData),
&tt.version,
&ver,
nil,
nil,
)
require.NoError(t, err)

if tt.previousWorkspaceID != "" {
prevVer := uint(0)
if tt.version > 0 {
prevVer = tt.version - 1
}
prevVer := tt.version.Decrement()
previous, err = model.NewRepresentations(
model.Representation(map[string]interface{}{"workspace_id": tt.previousWorkspaceID}),
&prevVer,
Expand Down Expand Up @@ -158,18 +156,18 @@ func TestGetWorkspaceVersions(t *testing.T) {
)
require.NoError(t, err)

version := uint(1)
ver := model.NewVersion(1)
current, err := model.NewRepresentations(
model.Representation(map[string]interface{}{"workspace_id": "ws-current"}),
&version,
&ver,
nil,
nil,
)
require.NoError(t, err)
prevVersion := version - 1
prevVer := ver.Decrement()
previous, err := model.NewRepresentations(
model.Representation(map[string]interface{}{"workspace_id": "ws-prev"}),
&prevVersion,
&prevVer,
nil,
nil,
)
Expand Down Expand Up @@ -253,19 +251,19 @@ func TestDetermineTupleOperations(t *testing.T) {
)
require.NoError(t, err)

version2 := uint(2)
ver2 := model.NewVersion(2)
current, err := model.NewRepresentations(
model.Representation(map[string]interface{}{"workspace_id": "workspace-new"}),
&version2,
&ver2,
nil,
nil,
)
require.NoError(t, err)

version1 := uint(1)
ver1 := model.NewVersion(1)
previous, err := model.NewRepresentations(
model.Representation(map[string]interface{}{"workspace_id": "workspace-old"}),
&version1,
&ver1,
nil,
nil,
)
Expand All @@ -281,7 +279,7 @@ func TestCalculateTuples_OperationTypeScenarios(t *testing.T) {
testCases := []struct {
name string
operationType model.EventOperationType // kept for scenario naming; not used by CalculateTuples
version uint
version model.Version
currentWorkspaceID string
previousWorkspaceID string
expectTuplesToCreate bool
Expand All @@ -290,7 +288,7 @@ func TestCalculateTuples_OperationTypeScenarios(t *testing.T) {
{
name: "CREATE operation should only create tuples",
operationType: model.OperationTypeCreated,
version: 0,
version: model.NewVersion(0),
currentWorkspaceID: "workspace-new",
previousWorkspaceID: "",
expectTuplesToCreate: true,
Expand All @@ -299,7 +297,7 @@ func TestCalculateTuples_OperationTypeScenarios(t *testing.T) {
{
name: "UPDATE operation with workspace change should create and delete tuples",
operationType: model.OperationTypeUpdated,
version: 1,
version: model.NewVersion(1),
currentWorkspaceID: "workspace-new",
previousWorkspaceID: "workspace-old",
expectTuplesToCreate: true,
Expand All @@ -308,7 +306,7 @@ func TestCalculateTuples_OperationTypeScenarios(t *testing.T) {
{
name: "UPDATE operation with same workspace should not create or delete tuples",
operationType: model.OperationTypeUpdated,
version: 1,
version: model.NewVersion(1),
currentWorkspaceID: "workspace-same",
previousWorkspaceID: "workspace-same",
expectTuplesToCreate: false,
Expand All @@ -317,7 +315,7 @@ func TestCalculateTuples_OperationTypeScenarios(t *testing.T) {
{
name: "DELETE operation should only delete tuples",
operationType: model.OperationTypeDeleted,
version: 1,
version: model.NewVersion(1),
currentWorkspaceID: "", // synthetic empty current
previousWorkspaceID: "workspace-current", // previous holds latest
expectTuplesToCreate: false,
Expand All @@ -343,25 +341,21 @@ func TestCalculateTuples_OperationTypeScenarios(t *testing.T) {
// Build current representation
if tc.currentWorkspaceID != "" {
currentData := map[string]interface{}{"workspace_id": tc.currentWorkspaceID}
curVer := tc.version
currentRep, err := model.NewRepresentations(
model.Representation(currentData),
&tc.version,
&curVer,
nil,
nil,
)
require.NoError(t, err)
current = currentRep
} else {
// For DELETE: current is nil (no new/current state)
current = nil
}

// Build previous representation
if tc.previousWorkspaceID != "" {
prevVer := uint(0)
if tc.version > 0 {
prevVer = tc.version - 1
}
prevVer := tc.version.Decrement()
previous, err = model.NewRepresentations(
model.Representation(map[string]interface{}{"workspace_id": tc.previousWorkspaceID}),
&prevVer,
Expand Down
4 changes: 2 additions & 2 deletions internal/biz/model_legacy/outboxevents.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ func convertResourceToTupleEvent(reporterResourceKey bizmodel.ReporterResourceKe
return payload, nil
}

func NewOutboxEventsFromResourceEvent(domainResourceEvent bizmodel.ResourceEvent, operationType bizmodel.EventOperationType, txid string) (*OutboxEvent, *OutboxEvent, error) {
func NewOutboxEventsFromResourceEvent(domainResourceEvent bizmodel.ResourceEvent, operationType bizmodel.EventOperationType, txid bizmodel.TransactionId) (*OutboxEvent, *OutboxEvent, error) {
var payload internal.JsonObject
var tuplePayload internal.JsonObject
var err error
Expand Down Expand Up @@ -238,7 +238,7 @@ func NewOutboxEventsFromResourceEvent(domainResourceEvent bizmodel.ResourceEvent
Operation: operationType,
AggregateType: TupleAggregateType,
AggregateID: domainResourceEvent.Id().String(),
TxId: txid,
TxId: txid.String(),
Payload: tuplePayload,
}

Expand Down
4 changes: 2 additions & 2 deletions internal/biz/model_legacy/outboxevents_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
"github.com/stretchr/testify/assert"
)

const txid = "txid"
var txid = bizmodel.NewTransactionId("txid")

func createTestResourceReportEvent() bizmodel.ResourceReportEvent {
resourceIdUUID := uuid.MustParse("550e8400-e29b-41d4-a716-446655440001")
Expand Down Expand Up @@ -101,7 +101,7 @@ func TestNewOutboxEventsFromResourceEventDeleted(t *testing.T) {

func assertTupleEventFromDomainEvent(t *testing.T, resourceEvent bizmodel.ResourceReportEvent, event *OutboxEvent) {
assert.NotNil(t, event)
assert.Equal(t, txid, event.TxId)
assert.Equal(t, txid.String(), event.TxId)
assert.Equal(t, TupleAggregateType, event.AggregateType)
assert.Equal(t, resourceEvent.Id().String(), event.AggregateID)

Expand Down
16 changes: 8 additions & 8 deletions internal/biz/usecase/resources/resource_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ func (uc *Usecase) ReportResource(ctx context.Context, cmd ReportResourceCommand
// provides the actual correctness guarantee, if a duplicate sneaks past this
// advisory check due to a concurrent commit.
if uc.Config.IdempotencyCheckEnabled && cmd.TransactionId != nil {
alreadyProcessed, err := uc.resourceRepository.HasTransactionIdBeenProcessed(uc.resourceRepository.GetDB(), cmd.TransactionId.String())
alreadyProcessed, err := uc.resourceRepository.HasTransactionIdBeenProcessed(uc.resourceRepository.GetDB(), *cmd.TransactionId)
if err != nil {
return fmt.Errorf("failed to check transaction ID: %w", err)
}
Expand All @@ -174,12 +174,12 @@ func (uc *Usecase) ReportResource(ctx context.Context, cmd ReportResourceCommand
if err == nil && res != nil {
log.Info("Resource already exists, updating: ")
operationType = model.OperationTypeUpdated
return uc.updateResource(tx, cmd, res, txid.String())
return uc.updateResource(tx, cmd, res, txid)
}

log.Info("Creating new resource")
operationType = model.OperationTypeCreated
return uc.createResource(tx, cmd, txid.String())
return uc.createResource(tx, cmd, txid)
Comment on lines +177 to +182
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
set -euo pipefail

echo "Inspecting the resource write path and outbox txid usage..."
fd 'resource_service.go$' internal -x sed -n '120,310p' {}
fd 'resource_repository.go$' internal -x sed -n '120,220p;360,430p' {}
fd 'outboxevents.go$' internal -x sed -n '200,250p' {}

echo
echo "Searching for the two transaction-id flows..."
rg -n 'HasTransactionIdBeenProcessed\(|model\.NewResource\(|existingResource\.Update\(|resourceRepository\.Save\(' internal -g '!**/*_test.go'

Repository: project-kessel/inventory-api

Length of output: 19958


Resolve transaction ID inconsistency across the write path.

The resource model stores *cmd.TransactionId (lines 251, 283) while Save() receives and forwards a separate request-scoped txid to the outbox (lines 269, 296). When a caller supplies an idempotency key, these diverge from the start. The retry logic (lines 188–191) compounds this by rewriting cmd.TransactionId without updating the captured txid, causing the same operation to persist with different transaction IDs in the resource vs. outbox event stream. This breaks correlation between stored state and published events, compromising idempotency tracking and event replay.

Either thread a single effective transaction ID through both the resource model and outbox calls, or explicitly split these into distinct idempotency key and outbox correlation ID concepts with clear naming to prevent future regressions.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@internal/biz/usecase/resources/resource_service.go` around lines 177 - 182,
Save currently diverges between cmd.TransactionId and the local request-scoped
txid (used in createResource/updateResource and outbox), and the retry path
rewrites cmd.TransactionId without updating the captured txid; fix by computing
a single effective transaction ID up front (e.g. effectiveTxID := first non-nil
of cmd.TransactionId and request txid or generate one) and use that same
effectiveTxID everywhere: set cmd.TransactionId = effectiveTxID before any
retries, pass effectiveTxID into uc.createResource and uc.updateResource calls
and into the outbox invocation, or alternatively rename and separate concepts
explicitly (e.g. IdempotencyKey vs OutboxCorrelationId) and wire them
consistently so resource model (cmd.TransactionId), the local txid variable, and
outbox events always carry the same identifier.

},
)
}
Expand Down Expand Up @@ -237,7 +237,7 @@ func (uc *Usecase) ReportResource(ctx context.Context, cmd ReportResourceCommand
return nil
}

func (uc *Usecase) createResource(tx *gorm.DB, cmd ReportResourceCommand, txidStr string) error {
func (uc *Usecase) createResource(tx *gorm.DB, cmd ReportResourceCommand, txid model.TransactionId) error {
resourceId, err := uc.resourceRepository.NextResourceId()
if err != nil {
return err
Expand Down Expand Up @@ -266,10 +266,10 @@ func (uc *Usecase) createResource(tx *gorm.DB, cmd ReportResourceCommand, txidSt
return err
}

return uc.resourceRepository.Save(tx, resource, model.OperationTypeCreated, txidStr)
return uc.resourceRepository.Save(tx, resource, model.OperationTypeCreated, txid)
}

func (uc *Usecase) updateResource(tx *gorm.DB, cmd ReportResourceCommand, existingResource *model.Resource, txidStr string) error {
func (uc *Usecase) updateResource(tx *gorm.DB, cmd ReportResourceCommand, existingResource *model.Resource, txid model.TransactionId) error {
reporterResourceKey, err := model.NewReporterResourceKey(
cmd.LocalResourceId,
cmd.ResourceType,
Expand All @@ -293,7 +293,7 @@ func (uc *Usecase) updateResource(tx *gorm.DB, cmd ReportResourceCommand, existi
return fmt.Errorf("failed to update resource: %w", err)
}

return uc.resourceRepository.Save(tx, *existingResource, model.OperationTypeUpdated, txidStr)
return uc.resourceRepository.Save(tx, *existingResource, model.OperationTypeUpdated, txid)
}

func (uc *Usecase) Delete(ctx context.Context, reporterResourceKey model.ReporterResourceKey) error {
Expand Down Expand Up @@ -322,7 +322,7 @@ func (uc *Usecase) Delete(ctx context.Context, reporterResourceKey model.Reporte
if err != nil {
return fmt.Errorf("failed to delete resource: %w", err)
}
return uc.resourceRepository.Save(tx, *res, model.OperationTypeDeleted, txid.String())
return uc.resourceRepository.Save(tx, *res, model.OperationTypeDeleted, txid)
} else {
if errors.Is(err, gorm.ErrRecordNotFound) {
return ErrResourceNotFound
Expand Down
3 changes: 2 additions & 1 deletion internal/biz/usecase/resources/resource_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1121,9 +1121,10 @@ func TestGetCurrentAndPreviousWorkspaceID(t *testing.T) {

// Helper function to create a Representations for testing
func createTestRep(t *testing.T, version uint, data map[string]interface{}) *model.Representations {
v := model.NewVersion(version)
rep, err := model.NewRepresentations(
model.Representation(data),
&version,
&v,
nil,
nil,
)
Expand Down
Loading
Loading