Sha256: 4ee0f8f1452e1e4f89a25c93288816cc62916c08ea51ca8d68c4c0f7d2b9ec37

Contents?: true

Size: 1.92 KB

Versions: 2

Compression:

Stored size: 1.92 KB

Contents

module QueueKit
  module Worker
    include Instrumentable

    def initialize(queue, options = {})
      @queue = queue
      @processor = options.fetch(:processor) { method(:process) }
      @cooler = options.fetch(:cooler) { method(:cool) }
      @error_handler = options.fetch(:error_handler) { method(:handle_error) }
      @stopped = true

      instrumenter_from(options)
    end

    def process(item)
      raise NotImplementedError, "This worker can't do anything with #{item.inspect}"
    end

    def cool
    end

    def handle_error(err)
      raise err
    end

    def trap_signals(signal_handler)
      SignalChecker.trap(self, signal_handler)
    end

    def run
      start
      interval_debugger = lambda { "worker.interval" }

      loop do
        work
        break unless working?
        debug(&interval_debugger)
      end
    end

    def procline(string)
      $0 = "QueueKit-#{QueueKit::VERSION}: #{string}"
      debug { ["worker.procline", {:message => string}] }
    end

    def work
      wrap_error { work! }
    end

    def work!
      if item = @queue.pop
        set_working_procline
        @processor.call(item)
        set_popping_procline
      else
        @cooler.call if working?
      end
    end

    def wrap_error
      yield
    rescue Exception => exception
      @error_handler.call(exception)
    end

    def name
      @name ||= "#{self.class} #{Socket.gethostname}:#{Process.pid}"
    end

    def start
      instrument "worker.start"
      set_popping_procline
      @stopped = false
    end

    def stop
      instrument "worker.stop"
      @stopped = true
    end

    def working?
      !@stopped
    end

    def set_working_procline
      procline("Processing since #{Time.now.to_i}")
    end

    def set_popping_procline
      @last_job_at = Time.now
      procline("Waiting since #{@last_job_at.to_i}")
    end

    def default_instrument_options
      {:worker => self}
    end
  end
end

Version data entries

2 entries across 2 versions & 1 rubygems

Version Path
queue_kit-0.0.9 lib/queue_kit/worker.rb
queue_kit-0.0.8 lib/queue_kit/worker.rb