Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 34 additions & 12 deletions lib/async/actor/proxy.rb
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,15 @@ def call(id)
def initialize(target)
@target = target

@queue = ::Thread::Queue.new
@thread = __start__

# Define a finalizer to ensure the thread is closed:
::ObjectSpace.define_finalizer(self, Finalizer.new(@queue, @thread))
@queue = nil
@thread = nil
@guard = ::Thread::Mutex.new
end

# @parameter return_value [Symbol] One of :ignore, :promise or :wait.
def method_missing(*arguments, return_value: :wait, **options, &block)
__start__

unless return_value == :ignore
result = Variable.new
end
Expand All @@ -49,18 +49,40 @@ def method_missing(*arguments, return_value: :wait, **options, &block)

protected

def __kill__
@guard.synchronize do
@queue&.close
@queue = nil

@thread&.kill
@thread = nil
end
end

def __start__
::Thread.new do
::Kernel.Sync do |task|
while operation = @queue.pop
task.async do
arguments, options, block, result = operation
Variable.fulfill(result) do
@target.public_send(*arguments, **options, &block)
return if @thread&.alive?

@guard.synchronize do
return if @thread&.alive?

@queue&.close
@queue = ::Thread::Queue.new

@thread = ::Thread.new do
::Kernel.Sync do |task|
while operation = @queue.pop
task.async do
arguments, options, block, result = operation
Variable.fulfill(result) do
@target.public_send(*arguments, **options, &block)
end
end
end
end
end

# Define a finalizer to ensure the thread is closed:
::ObjectSpace.define_finalizer(self, Finalizer.new(@queue, @thread))
end
end
end
Expand Down
10 changes: 10 additions & 0 deletions test/async/actor.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,14 @@

expect(actor).to be_a(Array)
end

it "can restart actor if thread is killed" do
actor = Async::Actor.new(Array.new)

expect(actor).to be_a(Array)

actor.__send__(:__kill__)

expect(actor).to be_a(Array)
end
end