diff --git a/.gitignore b/.gitignore index b04a8c8..62a994a 100644 --- a/.gitignore +++ b/.gitignore @@ -6,6 +6,7 @@ /pkg/ /spec/reports/ /tmp/ +/Gemfile.lock # rspec failure tracking .rspec_status diff --git a/Gemfile b/Gemfile index 8ae182e..122455d 100644 --- a/Gemfile +++ b/Gemfile @@ -4,6 +4,10 @@ gemspec gem "pg" +group :development, :test do + gem 'activerecord' +end + group :development do gem 'pry' end diff --git a/lib/async/postgres.rb b/lib/async/postgres.rb index 14b5a19..c704694 100644 --- a/lib/async/postgres.rb +++ b/lib/async/postgres.rb @@ -21,6 +21,7 @@ require_relative "postgres/version" require_relative "postgres/connection" require_relative "postgres/pool" +require_relative "postgres/replace_connection_handler" if defined?(Rails) require 'pg' diff --git a/lib/async/postgres/condition.rb b/lib/async/postgres/condition.rb new file mode 100644 index 0000000..a50a2f0 --- /dev/null +++ b/lib/async/postgres/condition.rb @@ -0,0 +1,41 @@ +module Async + module Postgres + class Condition + def initialize + @waiters = [] + end + + # @param timeout [Number, NilClass] + # @raise Async::TimeoutError + def wait(timeout = nil) + fiber = Fiber.current + @waiters.push(fiber) + + Async::Task.current.with_timeout(timeout) do |timer| + begin + Async::Task.yield + timer.cancel + rescue Async::TimeoutError => e + @waiters.delete(fiber) + raise e + end + end if timeout + end + + # @param immediate [Boolean] + def signal(immediate = true) + return if @waiters.empty? + fiber = @waiters.shift + signal unless fiber.alive? + + if immediate + fiber.resume + else + Async::Task.current.reactor << fiber + end + + nil + end + end + end +end diff --git a/lib/async/postgres/connection_handler.rb b/lib/async/postgres/connection_handler.rb new file mode 100644 index 0000000..5377db3 --- /dev/null +++ b/lib/async/postgres/connection_handler.rb @@ -0,0 +1,29 @@ +require_relative 'connection_pool' + +module Async + module Postgres + class ConnectionHandler < ActiveRecord::ConnectionAdapters::ConnectionHandler + def establish_connection(config) + resolver = ConnectionSpecification::Resolver.new(Base.configurations) + spec = resolver.spec(config) + + remove_connection(spec.name) + + message_bus = ActiveSupport::Notifications.instrumenter + payload = { + connection_id: object_id + } + if spec + payload[:spec_name] = spec.name + payload[:config] = spec.config + end + + message_bus.instrument("!connection.active_record", payload) do + owner_to_pool[spec.name] = Async::Postgres::ConnectionPool.new(spec) + end + + owner_to_pool[spec.name] + end + end + end +end diff --git a/lib/async/postgres/connection_pool.rb b/lib/async/postgres/connection_pool.rb new file mode 100644 index 0000000..d97c053 --- /dev/null +++ b/lib/async/postgres/connection_pool.rb @@ -0,0 +1,12 @@ +require_relative 'queue' + +module Async + module Postgres + class ConnectionPool < ActiveRecord::ConnectionAdapters::ConnectionPool + def initialize(*) + super + @available = Async::Postgres::Queue.new(self) + end + end + end +end diff --git a/lib/async/postgres/patch_reactor.rb b/lib/async/postgres/patch_reactor.rb new file mode 100644 index 0000000..1fb7058 --- /dev/null +++ b/lib/async/postgres/patch_reactor.rb @@ -0,0 +1,7 @@ +require 'async/reactor' + +module Async + class Reactor < Node + attr_accessor :postgres_pools + end +end diff --git a/lib/async/postgres/pool.rb b/lib/async/postgres/pool.rb index b98abe1..e33cbf1 100644 --- a/lib/async/postgres/pool.rb +++ b/lib/async/postgres/pool.rb @@ -18,50 +18,12 @@ # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN # THE SOFTWARE. -require 'async/reactor' - module Async - class Reactor < Node - attr_accessor :postgres_pools - end - module Postgres - class Proxy - def initialize(connection_string, task: Task.current) - @connection_string = connection_string - - pools = task.reactor.postgres_pools ||= {} - - @pool = pools[@connection_string] ||= Pool.new do - Connection.new(@connection_string) - end - end - - def close - @pool.close - end - - def async_exec(*args) - @pool.acquire do |connection| - connection.async_exec(*args) - end - end - - def respond_to?(*args) - @pool.acquire do |connection| - connection.respond_to?(*args) - end - end - - def method_missing(*args, &block) - @pool.acquire do |connection| - connection.send(*args, &block) - end - end - end - - # This pool doesn't impose a maximum number of open resources, but it WILL block if there are no available resources and trying to allocate another one fails. class Pool + # This pool doesn't impose a maximum number of open resources, but it WILL block + # if there are no available resources and trying to allocate another one fails. + def initialize(&block) @available = [] @waiting = [] diff --git a/lib/async/postgres/proxy.rb b/lib/async/postgres/proxy.rb new file mode 100644 index 0000000..eab2ca3 --- /dev/null +++ b/lib/async/postgres/proxy.rb @@ -0,0 +1,40 @@ +require_relative 'patch_reactor' +require_relative 'pool' + +module Async + module Postgres + class Proxy + def initialize(connection_string, task: Task.current) + @connection_string = connection_string + + pools = task.reactor.postgres_pools ||= {} + + @pool = pools[@connection_string] ||= Pool.new do + Connection.new(@connection_string) + end + end + + def close + @pool.close + end + + def async_exec(*args) + @pool.acquire do |connection| + connection.async_exec(*args) + end + end + + def respond_to?(*args) + @pool.acquire do |connection| + connection.respond_to?(*args) + end + end + + def method_missing(*args, &block) + @pool.acquire do |connection| + connection.send(*args, &block) + end + end + end + end +end diff --git a/lib/async/postgres/queue.rb b/lib/async/postgres/queue.rb new file mode 100644 index 0000000..232aa41 --- /dev/null +++ b/lib/async/postgres/queue.rb @@ -0,0 +1,29 @@ +require_relative 'condition' + +module Async + module Postgres + class Queue < ActiveRecord::ConnectionAdapters::ConnectionPool::ConnectionLeasingQueue + def initialize(*) + super + @cond = Async::Postgres::Condition.new + end + + def wait_poll(timeout) + @num_waiting += 1 + t0 = Time.now + + ActiveSupport::Dependencies.interlock.permit_concurrent_loads do + @cond.wait(timeout) + end + return remove + rescue Async::TimeoutError => _ + elapsed = Time.now - t0 + msg = "could not obtain a connection from the pool within %0.3f seconds (waited %0.3f seconds); all pooled connections were in use" % + [timeout, elapsed] + raise ActiveRecord::ConnectionTimeoutError, msg + ensure + @num_waiting -= 1 + end + end + end +end diff --git a/lib/async/postgres/replace_connection_handler.rb b/lib/async/postgres/replace_connection_handler.rb new file mode 100644 index 0000000..670a996 --- /dev/null +++ b/lib/async/postgres/replace_connection_handler.rb @@ -0,0 +1,4 @@ +require 'active_record' +require_relative 'connection_handler' + +ActiveRecord::Base.default_connection_handler = Async::Postgres::ConnectionHandler.new