Sha256: c1bb665d29b081640c165c0ed300082e9ebe8739a8cded458f67fb0d01a556fb
Contents?: true
Size: 1.64 KB
Versions: 1
Compression:
Stored size: 1.64 KB
Contents
require 'adrian/failure_handler' module Adrian class Dispatcher attr_reader :running def initialize(options = {}) @failure_handler = FailureHandler.new @stop_when_done = !!options[:stop_when_done] @stop_when_signalled = options.fetch(:stop_when_signalled, true) @sleep = options[:sleep] || 0.5 @options = options end def on_failure(*exceptions, &blk) @failure_handler.add_rule(*exceptions, &blk) end def on_done(&blk) @failure_handler.add_rule(nil, &blk) end def start(queue, worker_class) trap_stop_signals if @stop_when_signalled @running = true while @running do begin item = queue.pop rescue Adrian::Queue::ItemTooOldError => e if handler = @failure_handler.handle(e) handler.call(e.item, nil, e) end item = nil next end if item delegate_work(item, worker_class) else if @stop_when_done stop else sleep(@sleep) if @sleep end end end end def stop @running = false end def trap_stop_signals Signal.trap('TERM') { stop } Signal.trap('INT') { stop } end def delegate_work(item, worker_class) worker = worker_class.new(item) worker.report_to(self) worker.perform end def work_done(item, worker, exception = nil) if handler = @failure_handler.handle(exception) handler.call(item, worker, exception) else raise exception if exception end end end end
Version data entries
1 entries across 1 versions & 1 rubygems
Version | Path |
---|---|
adrian-2.0.2 | lib/adrian/dispatcher.rb |