Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG_PENDING.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,5 +23,6 @@
* [#3777](https://github.com/livepeer/go-livepeer/pull/3777) docker: Forcefully SIGKILL runners after timeout (@pwilczynskiclearcode)
* [#3779](https://github.com/livepeer/go-livepeer/pull/3779) worker: Fix orphaned containers on node shutdown (@victorges)
* [#3781](https://github.com/livepeer/go-livepeer/pull/3781) worker/docker: Destroy containers from watch routines (@victorges)
* [#3727](https://github.com/livepeer/go-livepeer/pull/3727) BYOC: add streaming for BYOC pipelines using trickle (@ad-astra-video)

#### CLI
48 changes: 45 additions & 3 deletions core/accounting.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,9 +66,10 @@ func (b *Balance) Balance() *big.Rat {

// AddressBalances holds credit balances for ETH addresses
type AddressBalances struct {
balances map[ethcommon.Address]*Balances
mtx sync.Mutex
ttl time.Duration
balances map[ethcommon.Address]*Balances
mtx sync.Mutex
sharedBalMtx sync.Mutex
ttl time.Duration
}

// NewAddressBalances creates a new AddressBalances instance
Expand Down Expand Up @@ -99,6 +100,47 @@ func (a *AddressBalances) Balance(addr ethcommon.Address, id ManifestID) *big.Ra
return a.balancesForAddr(addr).Balance(id)
}

// compares expected balance with current balance and updates accordingly with the expected balance being the target
// returns the difference and if minimum balance was covered
// also returns if balance was reset to zero because expected was zero
func (a *AddressBalances) CompareAndUpdateBalance(addr ethcommon.Address, id ManifestID, expected *big.Rat, minimumBal *big.Rat) (*big.Rat, *big.Rat, bool, bool) {
a.sharedBalMtx.Lock()
defer a.sharedBalMtx.Unlock()
current := a.balancesForAddr(addr).Balance(id)
if current == nil {
//create a balance of 1 to start tracking
a.Debit(addr, id, big.NewRat(0, 1))
current = a.balancesForAddr(addr).Balance(id)
}
if expected == nil {
expected = big.NewRat(0, 1)
}
diff := new(big.Rat).Sub(expected, current)

if diff.Sign() > 0 {
a.Credit(addr, id, diff)
} else {
a.Debit(addr, id, new(big.Rat).Abs(diff))
}

var resetToZero bool
if expected.Sign() == 0 {
a.Debit(addr, id, current)

resetToZero = true
}

//get updated balance after changes
current = a.balancesForAddr(addr).Balance(id)

var minimumBalCovered bool
if current.Cmp(minimumBal) >= 0 {
minimumBalCovered = true
}

return current, diff, minimumBalCovered, resetToZero
}

// StopCleanup stops the cleanup loop for all balances
func (a *AddressBalances) StopCleanup() {
a.mtx.Lock()
Expand Down
94 changes: 94 additions & 0 deletions core/accounting_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -265,3 +265,97 @@ func TestBalancesCleanup(t *testing.T) {
// Now balance for mid1 should be cleaned as well
assert.Nil(b.Balance(mid1))
}

func TestAddressBalances_CompareAndUpdateBalance(t *testing.T) {
addr := ethcommon.BytesToAddress([]byte("foo"))
mid := ManifestID("some manifestID")
balances := NewAddressBalances(1 * time.Minute)
defer balances.StopCleanup()

assert := assert.New(t)

// Test 1: Balance doesn't exist - should initialize to 1 and then update to expected
expected := big.NewRat(10, 1)
minimumBal := big.NewRat(5, 1)
current, diff, minimumBalCovered, resetToZero := balances.CompareAndUpdateBalance(addr, mid, expected, minimumBal)

assert.Zero(expected.Cmp(current), "Balance should be updated to expected value")
assert.Zero(big.NewRat(10, 1).Cmp(diff), "Diff should be expected - initial (10 - 1)")
assert.True(minimumBalCovered, "Minimum balance should be covered when going from 1 to 10")
assert.False(resetToZero, "Should not be reset to zero")

// Test 2: Expected > Current (Credit scenario)
expected = big.NewRat(20, 1)
minimumBal = big.NewRat(15, 1)
current, diff, minimumBalCovered, resetToZero = balances.CompareAndUpdateBalance(addr, mid, expected, minimumBal)

assert.Zero(expected.Cmp(current), "Balance should be updated to expected value")
assert.Zero(big.NewRat(10, 1).Cmp(diff), "Diff should be 20 - 10 = 10")
assert.True(minimumBalCovered, "Minimum balance should be covered when crossing threshold")
assert.False(resetToZero, "Should not be reset to zero")

// Test 3: Expected < Current (Debit scenario)
expected = big.NewRat(5, 1)
minimumBal = big.NewRat(3, 1)
current, diff, minimumBalCovered, resetToZero = balances.CompareAndUpdateBalance(addr, mid, expected, minimumBal)

assert.Zero(expected.Cmp(current), "Balance should be updated to expected value")
assert.Zero(big.NewRat(-15, 1).Cmp(diff), "Diff should be 5 - 20 = -15")
assert.True(minimumBalCovered, "Minimum balance should still be covered")
assert.False(resetToZero, "Should not be reset to zero")

// Test 4: Expected == Current (No change)
expected = big.NewRat(5, 1)
minimumBal = big.NewRat(3, 1)
current, diff, minimumBalCovered, resetToZero = balances.CompareAndUpdateBalance(addr, mid, expected, minimumBal)

assert.Zero(expected.Cmp(current), "Balance should remain the same")
assert.Zero(big.NewRat(0, 1).Cmp(diff), "Diff should be 0")
assert.True(minimumBalCovered, "Minimum balance should still be covered")
assert.False(resetToZero, "Should not be reset to zero")

// Test 5: Reset to zero (current > 0, expected = 0)
balances.Credit(addr, mid, big.NewRat(5, 1)) // Set current to 10
expected = big.NewRat(0, 1)
minimumBal = big.NewRat(3, 1)
current, diff, minimumBalCovered, resetToZero = balances.CompareAndUpdateBalance(addr, mid, expected, minimumBal)

assert.Zero(expected.Cmp(current), "Balance should be reset to zero")
assert.Zero(big.NewRat(-10, 1).Cmp(diff), "Diff should be 0 - 10 = -10")
assert.False(minimumBalCovered, "Minimum balance should not be covered when resetting to zero")
assert.True(resetToZero, "Should be marked as reset to zero")

// Test 6: Minimum balance covered threshold - just below to just above
expected = big.NewRat(2, 1)
minimumBal = big.NewRat(5, 1)
balances.CompareAndUpdateBalance(addr, mid, expected, minimumBal) // Set to 2

expected = big.NewRat(5, 1)
current, diff, minimumBalCovered, resetToZero = balances.CompareAndUpdateBalance(addr, mid, expected, minimumBal)

assert.Zero(expected.Cmp(current), "Balance should be updated to 5")
assert.Zero(big.NewRat(3, 1).Cmp(diff), "Diff should be 5 - 2 = 3")
assert.True(minimumBalCovered, "Minimum balance should be covered when crossing from below to at threshold")
assert.False(resetToZero, "Should not be reset to zero")

// Test 7: Minimum balance not covered - already above threshold
expected = big.NewRat(10, 1)
minimumBal = big.NewRat(5, 1)
current, diff, minimumBalCovered, resetToZero = balances.CompareAndUpdateBalance(addr, mid, expected, minimumBal)

assert.Zero(expected.Cmp(current), "Balance should be updated to 10")
assert.Zero(big.NewRat(5, 1).Cmp(diff), "Diff should be 10 - 5 = 5")
assert.True(minimumBalCovered, "Minimum balance should still be covered")
assert.False(resetToZero, "Should not be reset to zero")

// Test 8: Negative balance handling
balances.Debit(addr, mid, big.NewRat(20, 1)) // Force negative: 10 - 20 = -10
expected = big.NewRat(5, 1)
minimumBal = big.NewRat(3, 1)
current, diff, minimumBalCovered, resetToZero = balances.CompareAndUpdateBalance(addr, mid, expected, minimumBal)

assert.Zero(expected.Cmp(current), "Balance should be updated to expected value")
assert.Zero(big.NewRat(15, 1).Cmp(diff), "Diff should be 5 - (-10) = 15")
assert.True(minimumBalCovered, "Minimum balance should be covered when going from negative to positive above minimum")
assert.False(resetToZero, "Should not be reset to zero")
}
14 changes: 10 additions & 4 deletions core/ai_orchestrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -1163,8 +1163,8 @@ func (orch *orchestrator) CheckExternalCapabilityCapacity(extCapability string)
func (orch *orchestrator) ReserveExternalCapabilityCapacity(extCapability string) error {
cap, ok := orch.node.ExternalCapabilities.Capabilities[extCapability]
if ok {
cap.mu.Lock()
defer cap.mu.Unlock()
cap.Mu.Lock()
defer cap.Mu.Unlock()

cap.Load++
return nil
Expand All @@ -1176,8 +1176,8 @@ func (orch *orchestrator) ReserveExternalCapabilityCapacity(extCapability string
func (orch *orchestrator) FreeExternalCapabilityCapacity(extCapability string) error {
cap, ok := orch.node.ExternalCapabilities.Capabilities[extCapability]
if ok {
cap.mu.Lock()
defer cap.mu.Unlock()
cap.Mu.Lock()
defer cap.Mu.Unlock()

cap.Load--
return nil
Expand All @@ -1200,6 +1200,12 @@ func (orch *orchestrator) JobPriceInfo(sender ethcommon.Address, jobCapability s
return nil, err
}

//ensure price numerator and denominator can be int64
jobPrice, err = common.PriceToInt64(jobPrice)
if err != nil {
return nil, fmt.Errorf("invalid job price: %w", err)
}

return &net.PriceInfo{
PricePerUnit: jobPrice.Num().Int64(),
PixelsPerUnit: jobPrice.Denom().Int64(),
Expand Down
Loading
Loading