lib/async/container/threaded.rb in async-container-0.9.0 vs lib/async/container/threaded.rb in async-container-0.10.0
- old
+ new
@@ -18,13 +18,12 @@
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
require 'async/reactor'
require 'thread'
+require_relative 'statistics'
-require 'async/io/notification'
-
module Async
module Container
# Manages a reactor within one or more threads.
class Threaded
class Instance
@@ -35,48 +34,103 @@
def name= value
@thread.name = value
end
end
- def initialize(concurrency: 1, name: nil, &block)
- @reactors = concurrency.times.collect do
- Async::Reactor.new
+ def self.run(*args, &block)
+ self.new.run(*args, &block)
+ end
+
+ def initialize
+ @threads = []
+ @running = true
+ @statistics = Statistics.new
+ end
+
+ attr :statistics
+
+ def run(count: Container.processor_count, **options, &block)
+ count.times do
+ async(**options, &block)
end
- @threads = @reactors.collect do |reactor|
- ::Thread.new do
- thread = ::Thread.current
-
- thread.abort_on_exception = true
- thread.name = name if name
-
+ return self
+ end
+
+ def spawn(name: nil, restart: false, &block)
+ @statistics.spawn!
+
+ thread = ::Thread.new do
+ thread = ::Thread.current
+
+ thread.report_on_exception = false
+ thread.name = name if name
+
+ instance = Instance.new(thread)
+
+ while @running
begin
- reactor.run(Instance.new(thread), &block)
- rescue Interrupt
- # Exit cleanly.
+ yield instance
+ rescue Exception => exception
+ Async.logger.error(self) {exception}
+
+ @statistics.failure!
end
+
+ if restart
+ @statistics.restart!
+ else
+ break
+ end
end
end
- @finished = nil
+ @threads << thread
+
+ return self
end
+ def async(name: nil, restart: false, &block)
+ spawn do |instance|
+ begin
+ Async::Reactor.run(instance, &block)
+ rescue Interrupt
+ # Graceful exit.
+ end
+ end
+
+ return self
+ end
+
def self.multiprocess?
false
end
def wait
- return if @finished
+ yield if block_given?
@threads.each(&:join)
-
- @finished = true
+ @threads.clear
+
+ return nil
+ rescue Interrupt
+ # Graceful exit.
+ Async.logger.debug(self) {$!}
end
- def stop
- @reactors.each(&:stop)
+ # Gracefully shut down all reactors.
+ def stop(graceful = true, &block)
+ @running = false
- wait
+ if graceful
+ @threads.each{|thread| thread.raise(Interrupt)}
+ else
+ @threads.each(&:kill)
+ end
+
+ self.wait(&block)
+ ensure
+ @running = true
end
end
end
end