Skip to content
This repository was archived by the owner on May 22, 2021. It is now read-only.
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
/pkg/
/spec/reports/
/tmp/
/Gemfile.lock

# rspec failure tracking
.rspec_status
4 changes: 4 additions & 0 deletions Gemfile
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@ gemspec

gem "pg"

group :development, :test do
gem 'activerecord'
end

group :development do
gem 'pry'
end
Expand Down
1 change: 1 addition & 0 deletions lib/async/postgres.rb
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
require_relative "postgres/version"
require_relative "postgres/connection"
require_relative "postgres/pool"
require_relative "postgres/replace_connection_handler" if defined?(Rails)

require 'pg'

Expand Down
41 changes: 41 additions & 0 deletions lib/async/postgres/condition.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
module Async
module Postgres
class Condition
def initialize
@waiters = []
end

# @param timeout [Number, NilClass]
# @raise Async::TimeoutError
def wait(timeout = nil)
fiber = Fiber.current
@waiters.push(fiber)

Async::Task.current.with_timeout(timeout) do |timer|
begin
Async::Task.yield
timer.cancel
rescue Async::TimeoutError => e
@waiters.delete(fiber)
raise e
end
end if timeout
end

# @param immediate [Boolean]
def signal(immediate = true)
return if @waiters.empty?
fiber = @waiters.shift
signal unless fiber.alive?

if immediate
fiber.resume
else
Async::Task.current.reactor << fiber
end

nil
end
end
end
end
29 changes: 29 additions & 0 deletions lib/async/postgres/connection_handler.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
require_relative 'connection_pool'

module Async
module Postgres
class ConnectionHandler < ActiveRecord::ConnectionAdapters::ConnectionHandler
def establish_connection(config)
resolver = ConnectionSpecification::Resolver.new(Base.configurations)
spec = resolver.spec(config)

remove_connection(spec.name)

message_bus = ActiveSupport::Notifications.instrumenter
payload = {
connection_id: object_id
}
if spec
payload[:spec_name] = spec.name
payload[:config] = spec.config
end

message_bus.instrument("!connection.active_record", payload) do
owner_to_pool[spec.name] = Async::Postgres::ConnectionPool.new(spec)
end

owner_to_pool[spec.name]
end
end
end
end
12 changes: 12 additions & 0 deletions lib/async/postgres/connection_pool.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
require_relative 'queue'

module Async
module Postgres
class ConnectionPool < ActiveRecord::ConnectionAdapters::ConnectionPool
def initialize(*)
super
@available = Async::Postgres::Queue.new(self)
end
end
end
end
7 changes: 7 additions & 0 deletions lib/async/postgres/patch_reactor.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
require 'async/reactor'

module Async
class Reactor < Node
attr_accessor :postgres_pools
end
end
44 changes: 3 additions & 41 deletions lib/async/postgres/pool.rb
Original file line number Diff line number Diff line change
Expand Up @@ -18,50 +18,12 @@
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.

require 'async/reactor'

module Async
class Reactor < Node
attr_accessor :postgres_pools
end

module Postgres
class Proxy
def initialize(connection_string, task: Task.current)
@connection_string = connection_string

pools = task.reactor.postgres_pools ||= {}

@pool = pools[@connection_string] ||= Pool.new do
Connection.new(@connection_string)
end
end

def close
@pool.close
end

def async_exec(*args)
@pool.acquire do |connection|
connection.async_exec(*args)
end
end

def respond_to?(*args)
@pool.acquire do |connection|
connection.respond_to?(*args)
end
end

def method_missing(*args, &block)
@pool.acquire do |connection|
connection.send(*args, &block)
end
end
end

# This pool doesn't impose a maximum number of open resources, but it WILL block if there are no available resources and trying to allocate another one fails.
class Pool
# This pool doesn't impose a maximum number of open resources, but it WILL block
# if there are no available resources and trying to allocate another one fails.

def initialize(&block)
@available = []
@waiting = []
Expand Down
40 changes: 40 additions & 0 deletions lib/async/postgres/proxy.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
require_relative 'patch_reactor'
require_relative 'pool'

module Async
module Postgres
class Proxy
def initialize(connection_string, task: Task.current)
@connection_string = connection_string

pools = task.reactor.postgres_pools ||= {}

@pool = pools[@connection_string] ||= Pool.new do
Connection.new(@connection_string)
end
end

def close
@pool.close
end

def async_exec(*args)
@pool.acquire do |connection|
connection.async_exec(*args)
end
end

def respond_to?(*args)
@pool.acquire do |connection|
connection.respond_to?(*args)
end
end

def method_missing(*args, &block)
@pool.acquire do |connection|
connection.send(*args, &block)
end
end
end
end
end
29 changes: 29 additions & 0 deletions lib/async/postgres/queue.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
require_relative 'condition'

module Async
module Postgres
class Queue < ActiveRecord::ConnectionAdapters::ConnectionPool::ConnectionLeasingQueue
def initialize(*)
super
@cond = Async::Postgres::Condition.new
end

def wait_poll(timeout)
@num_waiting += 1
t0 = Time.now

ActiveSupport::Dependencies.interlock.permit_concurrent_loads do
@cond.wait(timeout)
end
return remove
rescue Async::TimeoutError => _
elapsed = Time.now - t0
msg = "could not obtain a connection from the pool within %0.3f seconds (waited %0.3f seconds); all pooled connections were in use" %
[timeout, elapsed]
raise ActiveRecord::ConnectionTimeoutError, msg
ensure
@num_waiting -= 1
end
end
end
end
4 changes: 4 additions & 0 deletions lib/async/postgres/replace_connection_handler.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
require 'active_record'
require_relative 'connection_handler'

ActiveRecord::Base.default_connection_handler = Async::Postgres::ConnectionHandler.new