lib/qs/daemon.rb in qs-0.3.0 vs lib/qs/daemon.rb in qs-0.4.0

- old
+ new

@@ -7,11 +7,11 @@ require 'qs/client' require 'qs/daemon_data' require 'qs/io_pipe' require 'qs/logger' require 'qs/payload_handler' -require 'qs/redis_item' +require 'qs/queue_item' module Qs module Daemon @@ -31,11 +31,11 @@ attr_reader :daemon_data, :logger attr_reader :signals_redis_key, :queue_redis_keys # * Set the size of the client to the max workers + 1. This ensures we # have 1 connection for fetching work from redis and at least 1 - # connection for each worker to requeue its job when hard-shutdown. + # connection for each worker to requeue its message when hard-shutdown. def initialize self.class.configuration.validate! Qs.init @daemon_data = DaemonData.new(self.class.configuration.to_hash) @logger = @daemon_data.logger @@ -94,12 +94,12 @@ wait_for_shutdown if wait end private - def process(redis_item) - Qs::PayloadHandler.new(self.daemon_data, redis_item).run + def process(queue_item) + Qs::PayloadHandler.new(self.daemon_data, queue_item).run end def work_loop log "Starting work loop", :debug setup_redis_and_ios @@ -126,22 +126,22 @@ def build_worker_pool wp = DatWorkerPool.new( self.daemon_data.min_workers, self.daemon_data.max_workers - ){ |redis_item| process(redis_item) } - wp.on_worker_error do |worker, exception, redis_item| - handle_worker_exception(exception, redis_item) + ){ |queue_item| process(queue_item) } + wp.on_worker_error do |worker, exception, queue_item| + handle_worker_exception(exception, queue_item) end wp.on_worker_sleep{ @worker_available_io.write(SIGNAL) } wp.start wp end # * Shuffle the queue redis keys to avoid queue starvation. Redis will - # pull jobs off queues in the order they are passed to the command, by - # shuffling we ensure they are randomly ordered so every queue should + # pull messages off queues in the order they are passed to the command, + # by shuffling we ensure they are randomly ordered so every queue should # get a chance. # * Use 0 for the brpop timeout which means block indefinitely. # * Rescue runtime errors so the daemon thread doesn't fail if redis is # temporarily down. Sleep for a second to keep the thread from thrashing # by repeatedly erroring if redis is down. @@ -149,13 +149,13 @@ wait_for_available_worker return unless @worker_pool.worker_available? && @signal.start? begin args = [self.signals_redis_key, self.queue_redis_keys.shuffle, 0].flatten - redis_key, serialized_payload = @client.block_dequeue(*args) + redis_key, encoded_payload = @client.block_dequeue(*args) if redis_key != @signals_redis_key - @worker_pool.add_work(RedisItem.new(redis_key, serialized_payload)) + @worker_pool.add_work(QueueItem.new(redis_key, encoded_payload)) end rescue RuntimeError => exception log "Error dequeueing #{exception.message.inspect}", :error log exception.backtrace.join("\n"), :error sleep 1 @@ -176,13 +176,13 @@ log "Shutting down, waiting up to #{timeout} seconds for work to finish" else log "Shutting down, waiting for work to finish" end @worker_pool.shutdown(timeout) - log "Requeueing #{@worker_pool.work_items.size} job(s)" + log "Requeueing #{@worker_pool.work_items.size} message(s)" @worker_pool.work_items.each do |ri| - @client.prepend(ri.queue_redis_key, ri.serialized_payload) + @client.prepend(ri.queue_redis_key, ri.encoded_payload) end end def wait_for_shutdown @work_loop_thread.join if @work_loop_thread @@ -194,21 +194,21 @@ end # * This only catches errors that happen outside of running the payload # handler. The only known use-case for this is dat worker pools # hard-shutdown errors. - # * If there isn't a redis item (this can happen when an idle worker is + # * If there isn't a queue item (this can happen when an idle worker is # being forced to exit) then we don't need to do anything. - # * If we never started processing the redis item, its safe to requeue it. + # * If we never started processing the queue item, its safe to requeue it. # Otherwise it happened while processing so the payload handler caught # it or it happened after the payload handler which we don't care about. - def handle_worker_exception(exception, redis_item) - return if redis_item.nil? - if !redis_item.started - log "Worker error, requeueing job because it hasn't started", :error - @client.prepend(redis_item.queue_redis_key, redis_item.serialized_payload) + def handle_worker_exception(exception, queue_item) + return if queue_item.nil? + if !queue_item.started + log "Worker error, requeueing message because it hasn't started", :error + @client.prepend(queue_item.queue_redis_key, queue_item.encoded_payload) else - log "Worker error after job was processed, ignoring", :error + log "Worker error after message was processed, ignoring", :error end log "#{exception.class}: #{exception.message}", :error log exception.backtrace.join("\n"), :error end