Sha256: 3809290db33efa06eb5619f6fdbf9b31b394cd61625c7bf8203ca40a62395ed1

Contents?: true

Size: 1.64 KB

Versions: 8

Compression:

Stored size: 1.64 KB

Contents

require 'adrian/failure_handler'

module Adrian
  # Repeatedly pops items from a queue and hands each one to a new worker
  # instance. Workers report back through #work_done, which looks up a
  # callback in the FailureHandler and invokes it.
  class Dispatcher
    # @return [Boolean, nil] true while the #start loop is active, false once
    #   #stop has been called, nil before #start has ever run.
    attr_reader :running

    # @param options [Hash]
    # @option options [Boolean] :stop_when_done (false) leave the #start loop
    #   as soon as the queue yields no item.
    # @option options [Boolean] :stop_when_signalled (true) install TERM/INT
    #   traps that call #stop when #start runs.
    # @option options [Numeric] :sleep (0.5) seconds to sleep when the queue
    #   yields no item and :stop_when_done is off.
    def initialize(options = {})
      @failure_handler     = FailureHandler.new
      @stop_when_done      = !!options[:stop_when_done]
      @stop_when_signalled = options.fetch(:stop_when_signalled, true)
      @sleep               = options[:sleep] || 0.5
      @options             = options
    end

    # Registers a callback for the given exception classes. The callback is
    # later invoked as +block.call(item, worker, exception)+.
    #
    # The block is captured with an explicit +&block+ parameter: a bare
    # +Proc.new+ inside a method no longer picks up the caller's block and
    # raises ArgumentError on Ruby 3.0+.
    #
    # @param exceptions [Array<Class>] exception classes forwarded to
    #   FailureHandler#add_rule.
    # @raise [ArgumentError] if no block is given.
    def on_failure(*exceptions, &block)
      raise ArgumentError, 'on_failure requires a block' unless block

      @failure_handler.add_rule(*exceptions, &block)
    end

    # Registers a callback under the +nil+ rule, i.e. the rule consulted when
    # #work_done is called without an exception.
    #
    # @raise [ArgumentError] if no block is given.
    def on_done(&block)
      raise ArgumentError, 'on_done requires a block' unless block

      @failure_handler.add_rule(nil, &block)
    end

    # Runs the dispatch loop until #stop is called.
    #
    # @param queue [#pop] source of work items; a falsy return value means
    #   "nothing to do right now".
    # @param worker_class [Class] instantiated with each item; instances must
    #   respond to #report_to and #perform.
    # @return [nil]
    def start(queue, worker_class)
      trap_stop_signals if @stop_when_signalled
      @running = true

      while @running
        begin
          item = queue.pop
        rescue Adrian::Queue::ItemTooOldError => e
          # An over-age item is reported to a matching handler (if any) with
          # no worker, then the loop moves on. Without a matching handler the
          # error is dropped.
          handler = @failure_handler.handle(e)
          handler.call(e.item, nil, e) if handler
          next
        end

        if item
          delegate_work(item, worker_class)
        elsif @stop_when_done
          stop
        elsif @sleep
          sleep(@sleep)
        end
      end
    end

    # Asks the #start loop to exit after the current iteration.
    def stop
      @running = false
    end

    # Installs TERM and INT handlers that call #stop.
    def trap_stop_signals
      Signal.trap('TERM') { stop }
      Signal.trap('INT')  { stop }
    end

    # Builds a worker for +item+, points it back at this dispatcher and runs it.
    def delegate_work(item, worker_class)
      worker = worker_class.new(item)
      worker.report_to(self)
      worker.perform
    end

    # Callback for workers to report the outcome of an item.
    #
    # @param item [Object] the item that was worked on.
    # @param worker [Object] the worker reporting back.
    # @param exception [Exception, nil] the failure, or nil on success.
    # @return [Object, nil] the handler's return value, or nil when no handler
    #   matched and there was no exception.
    # @raise [Exception] re-raises +exception+ when no handler matches it.
    def work_done(item, worker, exception = nil)
      handler = @failure_handler.handle(exception)

      if handler
        handler.call(item, worker, exception)
      elsif exception
        raise exception
      end
    end

  end
end

Version data entries

8 entries across 8 versions & 1 rubygems

Version Path
adrian-2.0.1 lib/adrian/dispatcher.rb
adrian-2.0.0 lib/adrian/dispatcher.rb
adrian-1.5.0 lib/adrian/dispatcher.rb
adrian-1.4.0 lib/adrian/dispatcher.rb
adrian-1.3.3 lib/adrian/dispatcher.rb
adrian-1.3.2 lib/adrian/dispatcher.rb
adrian-1.3.1 lib/adrian/dispatcher.rb
adrian-1.3.0 lib/adrian/dispatcher.rb