Sha256: 4ee0f8f1452e1e4f89a25c93288816cc62916c08ea51ca8d68c4c0f7d2b9ec37
Contents?: true
Size: 1.92 KB
Versions: 2
Compression:
Stored size: 1.92 KB
Contents
module QueueKit module Worker include Instrumentable def initialize(queue, options = {}) @queue = queue @processor = options.fetch(:processor) { method(:process) } @cooler = options.fetch(:cooler) { method(:cool) } @error_handler = options.fetch(:error_handler) { method(:handle_error) } @stopped = true instrumenter_from(options) end def process(item) raise NotImplementedError, "This worker can't do anything with #{item.inspect}" end def cool end def handle_error(err) raise err end def trap_signals(signal_handler) SignalChecker.trap(self, signal_handler) end def run start interval_debugger = lambda { "worker.interval" } loop do work break unless working? debug(&interval_debugger) end end def procline(string) $0 = "QueueKit-#{QueueKit::VERSION}: #{string}" debug { ["worker.procline", {:message => string}] } end def work wrap_error { work! } end def work! if item = @queue.pop set_working_procline @processor.call(item) set_popping_procline else @cooler.call if working? end end def wrap_error yield rescue Exception => exception @error_handler.call(exception) end def name @name ||= "#{self.class} #{Socket.gethostname}:#{Process.pid}" end def start instrument "worker.start" set_popping_procline @stopped = false end def stop instrument "worker.stop" @stopped = true end def working? !@stopped end def set_working_procline procline("Processing since #{Time.now.to_i}") end def set_popping_procline @last_job_at = Time.now procline("Waiting since #{@last_job_at.to_i}") end def default_instrument_options {:worker => self} end end end
Version data entries
2 entries across 2 versions & 1 rubygems
Version | Path |
---|---|
queue_kit-0.0.9 | lib/queue_kit/worker.rb |
queue_kit-0.0.8 | lib/queue_kit/worker.rb |