Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 15 additions & 3 deletions lib/faulty/circuit.rb
Original file line number Diff line number Diff line change
Expand Up @@ -308,9 +308,11 @@ def run(cache: nil, &block)
return cached_value if !cached_value.nil? && !cache_should_refresh?(cache)

current_status = status
return run_skipped(cached_value) unless current_status.can_run?

run_exec(current_status, cached_value, cache, &block)
if current_status.can_run? && reserve(current_status)
run_exec(current_status, cached_value, cache, &block)
else
run_skipped(cached_value) unless current_status.can_run?
end
end

# Force the circuit to stay open until unlocked
Expand Down Expand Up @@ -403,6 +405,16 @@ def run_skipped(cached_value)
cached_value
end

# Reserves execution for this circuit when it is half-open
#
# This prevents concurrent evaluation from allowing multiple simultaneous
# runs for half-open circuits.
def reserve(status)
return true unless status.half_open?

storage.reserve(self, Faulty.current_time, status.reserved_at)
end

# Execute a run
#
# @param cached_value The cached value if one is available
Expand Down
24 changes: 18 additions & 6 deletions lib/faulty/status.rb
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,12 @@ class Faulty
:state,
:lock,
:opened_at,
:reserved_at,
:failure_rate,
:sample_size,
:options,
:stub
:stub,
:current_time
)

class Status
Expand Down Expand Up @@ -66,7 +68,8 @@ class Status
# sample_size
# @return [Status]
def self.from_entries(entries, **hash)
window_start = Faulty.current_time - hash[:options].evaluation_window
current_time = Faulty.current_time
window_start = current_time - hash[:options].evaluation_window
size = entries.size
i = 0
failures = 0
Expand All @@ -84,7 +87,8 @@ def self.from_entries(entries, **hash)

new(hash.merge(
sample_size: sample_size,
failure_rate: sample_size.zero? ? 0.0 : failures.to_f / sample_size
failure_rate: sample_size.zero? ? 0.0 : failures.to_f / sample_size,
current_time: current_time
))
end

Expand All @@ -94,7 +98,7 @@ def self.from_entries(entries, **hash)
#
# @return [Boolean] True if open
def open?
state == :open && opened_at + options.cool_down > Faulty.current_time
state == :open && opened_at + options.cool_down > current_time
end

# Whether the circuit is closed
Expand All @@ -112,7 +116,7 @@ def closed?
#
# @return [Boolean] True if half-open
def half_open?
state == :open && opened_at + options.cool_down <= Faulty.current_time
state == :open && opened_at + options.cool_down <= current_time
end

# Whether the circuit is locked open
Expand All @@ -129,13 +133,20 @@ def locked_closed?
lock == :closed
end

def reserved?
return false unless reserved_at

state == :open && reserved_at + options.cool_down >= current_time
end

# Whether the circuit can be run
#
# Takes the circuit state, locks and cooldown into account
#
# @return [Boolean] True if the circuit can be run
def can_run?
return false if locked_open?
return false if reserved?

closed? || locked_closed? || half_open?
end
Expand Down Expand Up @@ -166,7 +177,8 @@ def defaults
state: :closed,
failure_rate: 0.0,
sample_size: 0,
stub: false
stub: false,
current_time: Faulty.current_time
}
end
end
Expand Down
15 changes: 15 additions & 0 deletions lib/faulty/storage/fault_tolerant_proxy.rb
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,21 @@ def status(circuit)
stub_status(circuit)
end

# Safely reserve execution of a circuit
#
# If the backend is unavailable, this returns `true` to always allow
# execution.
#
# @see Interface#reserve
# @param (see Interface#reserve)
# @return (see Interface#reserve)
def reserve(circuit, reserved_at, previous_reserved_at)
@storage.reserve(circuit, reserved_at, previous_reserved_at)
rescue StandardError => e
options.notifier.notify(:storage_failure, circuit: circuit, action: :reserve, error: e)
true
end

# This cache makes any storage fault tolerant, so this is always `true`
#
# @return [true]
Expand Down
23 changes: 23 additions & 0 deletions lib/faulty/storage/interface.rb
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,9 @@ def reopen(circuit, opened_at, previous_opened_at)
# may be called more than once. If so, this method should return true
# only once, when the circuit transitions from open to closed.
#
# The backend should reset the reserved_at value to empty when closing
# the circuit.
#
# If the backend does not support locking or atomic operations, then
# it may always return true, but that could result in duplicate close
# notifications.
Expand All @@ -99,6 +102,26 @@ def close(circuit)
raise NotImplementedError
end

# Reserve an exclusive run for this circuit
#
# This is used when the circuit is half-open and the test run is being
# attempted. We need to make sure only a single run is allowed.
#
# The backend should store reserved_at and use it to serve future status
# requests. When setting reserved_at, the backend should atomically
# compare any existing value using previous_reserved_at. This ensures
# that mutltiple parallel processes can't reserve the circuit.
#
# The backend should return true if the reservation was successful, and
# false if it was not.
#
# If the backend does not support locking or atomic operations, then
# it may always return true, but will result in duplicate half-open test
# runs.
def reserve(circuit, reserved_at, previous_reserved_at)
raise NotImplementedError
end

# Lock the circuit in a given state
#
# No concurrency gurantees are provided for locking
Expand Down
18 changes: 16 additions & 2 deletions lib/faulty/storage/memory.rb
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,12 @@ def defaults
# The internal object for storing a circuit
#
# @private
MemoryCircuit = Struct.new(:state, :runs, :opened_at, :lock, :options) do
MemoryCircuit = Struct.new(:state, :runs, :opened_at, :reserved_at, :lock, :options) do
def initialize
self.state = Concurrent::Atom.new(:closed)
self.runs = Concurrent::MVar.new([], dup_on_deref: true)
self.opened_at = Concurrent::Atom.new(nil)
self.reserved_at = Concurrent::Atom.new(nil)
self.lock = nil
end

Expand All @@ -61,6 +62,7 @@ def status(circuit_options)
state: state.value,
lock: lock,
opened_at: opened_at.value,
reserved_at: reserved_at.value,
options: circuit_options
)
end
Expand Down Expand Up @@ -139,7 +141,19 @@ def reopen(circuit, opened_at, previous_opened_at)
def close(circuit)
memory = fetch(circuit)
memory.runs.modify { |_old| [] }
memory.state.compare_and_set(:open, :closed)
closed = memory.state.compare_and_set(:open, :closed)
memory.reserved_at.reset(nil) if closed
closed
end

# Reserve an exclusive run for this circuit
#
# @see Interface#reserve
# @param (see Interface#reserve)
# @return (see Interface#reserve)
def reserve(circuit, reserved_at, previous_reserved_at)
memory = fetch(circuit)
memory.reserved_at.compare_and_set(previous_reserved_at, reserved_at)
end

# Lock a circuit open or closed
Expand Down
6 changes: 6 additions & 0 deletions lib/faulty/storage/null.rb
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,12 @@ def close(_circuit)
true
end

# @param (see Interface#reserve)
# @return (see Interface#reserve)
def reserve(_circuit, _reserved_at, _previous_reserved_at)
true
end

# @param (see Interface#lock)
# @return (see Interface#lock)
def lock(_circuit, _state)
Expand Down
22 changes: 22 additions & 0 deletions lib/faulty/storage/redis.rb
Original file line number Diff line number Diff line change
Expand Up @@ -174,11 +174,25 @@ def close(circuit)
result = watch_exec(key, ['open']) do |m|
m.set(key, 'closed', ex: ex)
m.del(entries_key(circuit.name))
m.del(reserved_at_key(circuit.name))
end

result && result[0] == 'OK'
end

# Reserve an exclusive run for this circuit
#
# @see Interface#reserve
# @param (see Interface#reserve)
# @return (see Interface#reserve)
def reserve(circuit, reserved_at, previous_reserved_at)
key = reserved_at_key(circuit.name)
result = watch_exec(key, [previous_reserved_at.to_s]) do |m|
m.set(key, reserved_at, ex: options.circuit_ttl)
end
result && result[0] == 'OK'
end

# Lock a circuit open or closed
#
# The circuit_ttl does not apply to locks
Expand Down Expand Up @@ -228,18 +242,21 @@ def status(circuit)
futures[:state] = r.get(state_key(circuit.name))
futures[:lock] = r.get(lock_key(circuit.name))
futures[:opened_at] = r.get(opened_at_key(circuit.name))
futures[:reserved_at] = r.get(reserved_at_key(circuit.name))
futures[:entries] = r.lrange(entries_key(circuit.name), 0, -1)
end

state = futures[:state].value&.to_sym || :closed
opened_at = futures[:opened_at].value ? Float(futures[:opened_at].value) : nil
opened_at = Faulty.current_time - options.circuit_ttl if state == :open && opened_at.nil?
reserved_at = futures[:reserved_at].value ? Float(futures[:reserved_at].value) : nil

Faulty::Status.from_entries(
map_entries(futures[:entries].value),
state: state,
lock: futures[:lock].value&.to_sym,
opened_at: opened_at,
reserved_at: reserved_at,
options: circuit.options
)
end
Expand Down Expand Up @@ -321,6 +338,11 @@ def opened_at_key(circuit_name)
ckey(circuit_name, 'opened_at')
end

# @return [String] The key for circuit opened_at
def reserved_at_key(circuit_name)
ckey(circuit_name, 'reserved_at')
end

# Get the current key to add circuit names to
def list_key
key('list', current_list_block)
Expand Down
Loading