diff --git a/lib/faulty/circuit.rb b/lib/faulty/circuit.rb index 286b63d..58b8328 100644 --- a/lib/faulty/circuit.rb +++ b/lib/faulty/circuit.rb @@ -308,9 +308,11 @@ def run(cache: nil, &block) return cached_value if !cached_value.nil? && !cache_should_refresh?(cache) current_status = status - return run_skipped(cached_value) unless current_status.can_run? - - run_exec(current_status, cached_value, cache, &block) + if current_status.can_run? && reserve(current_status) + run_exec(current_status, cached_value, cache, &block) + else + run_skipped(cached_value) unless current_status.can_run? + end end # Force the circuit to stay open until unlocked @@ -403,6 +405,16 @@ def run_skipped(cached_value) cached_value end + # Reserves execution for this circuit when it is half-open + # + # This prevents concurrent evaluation from allowing multiple simultaneous + # runs for half-open circuits. + def reserve(status) + return true unless status.half_open? + + storage.reserve(self, Faulty.current_time, status.reserved_at) + end + # Execute a run # # @param cached_value The cached value if one is available diff --git a/lib/faulty/status.rb b/lib/faulty/status.rb index cdfb392..c3f63f8 100644 --- a/lib/faulty/status.rb +++ b/lib/faulty/status.rb @@ -34,10 +34,12 @@ class Faulty :state, :lock, :opened_at, + :reserved_at, :failure_rate, :sample_size, :options, - :stub + :stub, + :current_time ) class Status @@ -66,7 +68,8 @@ class Status # sample_size # @return [Status] def self.from_entries(entries, **hash) - window_start = Faulty.current_time - hash[:options].evaluation_window + current_time = Faulty.current_time + window_start = current_time - hash[:options].evaluation_window size = entries.size i = 0 failures = 0 @@ -84,7 +87,8 @@ def self.from_entries(entries, **hash) new(hash.merge( sample_size: sample_size, - failure_rate: sample_size.zero? ? 0.0 : failures.to_f / sample_size + failure_rate: sample_size.zero? ? 0.0 : failures.to_f / sample_size, + current_time: current_time )) end @@ -94,7 +98,7 @@ def self.from_entries(entries, **hash) # # @return [Boolean] True if open def open? - state == :open && opened_at + options.cool_down > Faulty.current_time + state == :open && opened_at + options.cool_down > current_time end # Whether the circuit is closed @@ -112,7 +116,7 @@ def closed? # # @return [Boolean] True if half-open def half_open? - state == :open && opened_at + options.cool_down <= Faulty.current_time + state == :open && opened_at + options.cool_down <= current_time end # Whether the circuit is locked open @@ -129,6 +133,12 @@ def locked_closed? lock == :closed end + def reserved? + return false unless reserved_at + + state == :open && reserved_at + options.cool_down >= current_time + end + # Whether the circuit can be run # # Takes the circuit state, locks and cooldown into account @@ -136,6 +146,7 @@ def locked_closed? # @return [Boolean] True if the circuit can be run def can_run? return false if locked_open? + return false if reserved? closed? || locked_closed? || half_open? end @@ -166,7 +177,8 @@ def defaults state: :closed, failure_rate: 0.0, sample_size: 0, - stub: false + stub: false, + current_time: Faulty.current_time } end end diff --git a/lib/faulty/storage/fault_tolerant_proxy.rb b/lib/faulty/storage/fault_tolerant_proxy.rb index 89a51e0..8fd83e7 100644 --- a/lib/faulty/storage/fault_tolerant_proxy.rb +++ b/lib/faulty/storage/fault_tolerant_proxy.rb @@ -177,6 +177,21 @@ def status(circuit) stub_status(circuit) end + # Safely reserve execution of a circuit + # + # If the backend is unavailable, this returns `true` to always allow + # execution. + # + # @see Interface#reserve + # @param (see Interface#reserve) + # @return (see Interface#reserve) + def reserve(circuit, reserved_at, previous_reserved_at) + @storage.reserve(circuit, reserved_at, previous_reserved_at) + rescue StandardError => e + options.notifier.notify(:storage_failure, circuit: circuit, action: :reserve, error: e) + true + end + # This cache makes any storage fault tolerant, so this is always `true` # # @return [true] diff --git a/lib/faulty/storage/interface.rb b/lib/faulty/storage/interface.rb index c8d5bf2..9f36bf2 100644 --- a/lib/faulty/storage/interface.rb +++ b/lib/faulty/storage/interface.rb @@ -90,6 +90,9 @@ def reopen(circuit, opened_at, previous_opened_at) # may be called more than once. If so, this method should return true # only once, when the circuit transitions from open to closed. # + # The backend should reset the reserved_at value to empty when closing + # the circuit. + # # If the backend does not support locking or atomic operations, then # it may always return true, but that could result in duplicate close # notifications. @@ -99,6 +102,26 @@ def close(circuit) raise NotImplementedError end + # Reserve an exclusive run for this circuit + # + # This is used when the circuit is half-open and the test run is being + # attempted. We need to make sure only a single run is allowed. + # + # The backend should store reserved_at and use it to serve future status + # requests. When setting reserved_at, the backend should atomically + # compare any existing value using previous_reserved_at. This ensures + # that mutltiple parallel processes can't reserve the circuit. + # + # The backend should return true if the reservation was successful, and + # false if it was not. + # + # If the backend does not support locking or atomic operations, then + # it may always return true, but will result in duplicate half-open test + # runs. + def reserve(circuit, reserved_at, previous_reserved_at) + raise NotImplementedError + end + # Lock the circuit in a given state # # No concurrency gurantees are provided for locking diff --git a/lib/faulty/storage/memory.rb b/lib/faulty/storage/memory.rb index 38d2d70..41530bc 100644 --- a/lib/faulty/storage/memory.rb +++ b/lib/faulty/storage/memory.rb @@ -41,11 +41,12 @@ def defaults # The internal object for storing a circuit # # @private - MemoryCircuit = Struct.new(:state, :runs, :opened_at, :lock, :options) do + MemoryCircuit = Struct.new(:state, :runs, :opened_at, :reserved_at, :lock, :options) do def initialize self.state = Concurrent::Atom.new(:closed) self.runs = Concurrent::MVar.new([], dup_on_deref: true) self.opened_at = Concurrent::Atom.new(nil) + self.reserved_at = Concurrent::Atom.new(nil) self.lock = nil end @@ -61,6 +62,7 @@ def status(circuit_options) state: state.value, lock: lock, opened_at: opened_at.value, + reserved_at: reserved_at.value, options: circuit_options ) end @@ -139,7 +141,19 @@ def reopen(circuit, opened_at, previous_opened_at) def close(circuit) memory = fetch(circuit) memory.runs.modify { |_old| [] } - memory.state.compare_and_set(:open, :closed) + closed = memory.state.compare_and_set(:open, :closed) + memory.reserved_at.reset(nil) if closed + closed + end + + # Reserve an exclusive run for this circuit + # + # @see Interface#reserve + # @param (see Interface#reserve) + # @return (see Interface#reserve) + def reserve(circuit, reserved_at, previous_reserved_at) + memory = fetch(circuit) + memory.reserved_at.compare_and_set(previous_reserved_at, reserved_at) end # Lock a circuit open or closed diff --git a/lib/faulty/storage/null.rb b/lib/faulty/storage/null.rb index 2833208..1d84256 100644 --- a/lib/faulty/storage/null.rb +++ b/lib/faulty/storage/null.rb @@ -46,6 +46,12 @@ def close(_circuit) true end + # @param (see Interface#reserve) + # @return (see Interface#reserve) + def reserve(_circuit, _reserved_at, _previous_reserved_at) + true + end + # @param (see Interface#lock) # @return (see Interface#lock) def lock(_circuit, _state) diff --git a/lib/faulty/storage/redis.rb b/lib/faulty/storage/redis.rb index 3b96c91..7a7e5f8 100644 --- a/lib/faulty/storage/redis.rb +++ b/lib/faulty/storage/redis.rb @@ -174,11 +174,25 @@ def close(circuit) result = watch_exec(key, ['open']) do |m| m.set(key, 'closed', ex: ex) m.del(entries_key(circuit.name)) + m.del(reserved_at_key(circuit.name)) end result && result[0] == 'OK' end + # Reserve an exclusive run for this circuit + # + # @see Interface#reserve + # @param (see Interface#reserve) + # @return (see Interface#reserve) + def reserve(circuit, reserved_at, previous_reserved_at) + key = reserved_at_key(circuit.name) + result = watch_exec(key, [previous_reserved_at.to_s]) do |m| + m.set(key, reserved_at, ex: options.circuit_ttl) + end + result && result[0] == 'OK' + end + # Lock a circuit open or closed # # The circuit_ttl does not apply to locks @@ -228,18 +242,21 @@ def status(circuit) futures[:state] = r.get(state_key(circuit.name)) futures[:lock] = r.get(lock_key(circuit.name)) futures[:opened_at] = r.get(opened_at_key(circuit.name)) + futures[:reserved_at] = r.get(reserved_at_key(circuit.name)) futures[:entries] = r.lrange(entries_key(circuit.name), 0, -1) end state = futures[:state].value&.to_sym || :closed opened_at = futures[:opened_at].value ? Float(futures[:opened_at].value) : nil opened_at = Faulty.current_time - options.circuit_ttl if state == :open && opened_at.nil? + reserved_at = futures[:reserved_at].value ? Float(futures[:reserved_at].value) : nil Faulty::Status.from_entries( map_entries(futures[:entries].value), state: state, lock: futures[:lock].value&.to_sym, opened_at: opened_at, + reserved_at: reserved_at, options: circuit.options ) end @@ -321,6 +338,11 @@ def opened_at_key(circuit_name) ckey(circuit_name, 'opened_at') end + # @return [String] The key for circuit opened_at + def reserved_at_key(circuit_name) + ckey(circuit_name, 'reserved_at') + end + # Get the current key to add circuit names to def list_key key('list', current_list_block)