lib/qs/daemon.rb in qs-0.3.0 vs lib/qs/daemon.rb in qs-0.4.0
- old
+ new
@@ -7,11 +7,11 @@
require 'qs/client'
require 'qs/daemon_data'
require 'qs/io_pipe'
require 'qs/logger'
require 'qs/payload_handler'
-require 'qs/redis_item'
+require 'qs/queue_item'
module Qs
module Daemon
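# A minimal sketch of the renamed value object, built only from the
# readers this diff relies on; the real Qs::QueueItem in 0.4.0 may carry
# more state or behavior, so treat QueueItemSketch as a stand-in name.
class QueueItemSketch
  attr_reader :queue_redis_key, :encoded_payload
  attr_accessor :started

  def initialize(queue_redis_key, encoded_payload)
    @queue_redis_key = queue_redis_key
    @encoded_payload = encoded_payload
    @started         = false # flipped (presumably by the payload handler) once processing begins
  end
end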
@@ -31,11 +31,11 @@
attr_reader :daemon_data, :logger
attr_reader :signals_redis_key, :queue_redis_keys
# * Set the size of the client to the max workers + 1. This ensures we
# have 1 connection for fetching work from redis and at least 1
- # connection for each worker to requeue its job when hard-shutdown.
+ # connection for each worker to requeue its message on hard-shutdown.
def initialize
self.class.configuration.validate!
Qs.init
@daemon_data = DaemonData.new(self.class.configuration.to_hash)
@logger = @daemon_data.logger
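# Sketch of the connection sizing rule described in the comment above:
# one connection for the dequeue loop plus one per worker so every worker
# can requeue its message on hard-shutdown. The helper name is
# hypothetical, not part of the gem.
def client_connection_size(max_workers)
  max_workers + 1
end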
@@ -94,12 +94,12 @@
wait_for_shutdown if wait
end
private
- def process(redis_item)
- Qs::PayloadHandler.new(self.daemon_data, redis_item).run
+ def process(queue_item)
+ Qs::PayloadHandler.new(self.daemon_data, queue_item).run
end
def work_loop
log "Starting work loop", :debug
setup_redis_and_ios
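# Hypothetical stand-in showing only the interface process relies on: an
# object built from the daemon data and a queue item that responds to
# #run. The real Qs::PayloadHandler does the actual decoding and dispatch
# and, per the comments further down, catches errors raised while
# processing.
class PayloadHandlerSketch
  def initialize(daemon_data, queue_item)
    @daemon_data = daemon_data
    @queue_item  = queue_item
  end

  def run
    # decode @queue_item.encoded_payload and dispatch it here
  end
end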
@@ -126,22 +126,22 @@
def build_worker_pool
wp = DatWorkerPool.new(
self.daemon_data.min_workers,
self.daemon_data.max_workers
- ){ |redis_item| process(redis_item) }
- wp.on_worker_error do |worker, exception, redis_item|
- handle_worker_exception(exception, redis_item)
+ ){ |queue_item| process(queue_item) }
+ wp.on_worker_error do |worker, exception, queue_item|
+ handle_worker_exception(exception, queue_item)
end
wp.on_worker_sleep{ @worker_available_io.write(SIGNAL) }
wp.start
wp
end
# * Shuffle the queue redis keys to avoid queue starvation. Redis will
- # pull jobs off queues in the order they are passed to the command, by
- # shuffling we ensure they are randomly ordered so every queue should
+ # pull messages off queues in the order they are passed to the command;
+ # by shuffling we ensure they are randomly ordered so every queue should
# get a chance.
# * Use 0 for the brpop timeout which means block indefinitely.
# * Rescue runtime errors so the daemon thread doesn't fail if redis is
# temporarily down. Sleep for a second to keep the thread from thrashing
# by repeatedly erroring if redis is down.
@@ -149,13 +149,13 @@
wait_for_available_worker
return unless @worker_pool.worker_available? && @signal.start?
begin
args = [self.signals_redis_key, self.queue_redis_keys.shuffle, 0].flatten
- redis_key, serialized_payload = @client.block_dequeue(*args)
+ redis_key, encoded_payload = @client.block_dequeue(*args)
if redis_key != @signals_redis_key
- @worker_pool.add_work(RedisItem.new(redis_key, serialized_payload))
+ @worker_pool.add_work(QueueItem.new(redis_key, encoded_payload))
end
rescue RuntimeError => exception
log "Error dequeueing #{exception.message.inspect}", :error
log exception.backtrace.join("\n"), :error
sleep 1
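# Sketch of what the block_dequeue call above boils down to, assuming it
# wraps redis-rb's brpop as the earlier comment implies; the key names and
# connection here are hypothetical. BRPOP checks keys in the order given,
# blocks until one of them has an element (a 0 timeout means wait
# forever), and returns a [key, value] pair like the destructuring above
# expects.
require 'redis'
redis = Redis.new
keys  = ["signals:qs-daemon", *["queues:slow", "queues:fast"].shuffle]
redis_key, encoded_payload = redis.brpop(*keys, timeout: 0)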
@@ -176,13 +176,13 @@
log "Shutting down, waiting up to #{timeout} seconds for work to finish"
else
log "Shutting down, waiting for work to finish"
end
@worker_pool.shutdown(timeout)
- log "Requeueing #{@worker_pool.work_items.size} job(s)"
+ log "Requeueing #{@worker_pool.work_items.size} message(s)"
@worker_pool.work_items.each do |ri|
- @client.prepend(ri.queue_redis_key, ri.serialized_payload)
+ @client.prepend(ri.queue_redis_key, ri.encoded_payload)
end
end
def wait_for_shutdown
@work_loop_thread.join if @work_loop_thread
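# Usage sketch of the requeue step above, reusing QueueItemSketch and a
# stand-in client: anything a worker had not finished goes back onto its
# queue via prepend so it is picked up again after a restart.
stub_client = Class.new do
  def prepend(queue_redis_key, encoded_payload)
    # the real client writes the payload back onto the queue in redis
  end
end.new
unfinished = [QueueItemSketch.new("queues:slow", "{}")]
unfinished.each { |qi| stub_client.prepend(qi.queue_redis_key, qi.encoded_payload) }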
@@ -194,21 +194,21 @@
end
# * This only catches errors that happen outside of running the payload
# handler. The only known use-case for this is the dat worker pool's
# hard-shutdown errors.
- # * If there isn't a redis item (this can happen when an idle worker is
+ # * If there isn't a queue item (this can happen when an idle worker is
# being forced to exit) then we don't need to do anything.
- # * If we never started processing the redis item, its safe to requeue it.
+ # * If we never started processing the queue item, it's safe to requeue it.
# Otherwise it happened while processing, so the payload handler caught
# it, or it happened after the payload handler, which we don't care about.
- def handle_worker_exception(exception, redis_item)
- return if redis_item.nil?
- if !redis_item.started
- log "Worker error, requeueing job because it hasn't started", :error
- @client.prepend(redis_item.queue_redis_key, redis_item.serialized_payload)
+ def handle_worker_exception(exception, queue_item)
+ return if queue_item.nil?
+ if !queue_item.started
+ log "Worker error, requeueing message because it hasn't started", :error
+ @client.prepend(queue_item.queue_redis_key, queue_item.encoded_payload)
else
- log "Worker error after job was processed, ignoring", :error
+ log "Worker error after message was processed, ignoring", :error
end
log "#{exception.class}: #{exception.message}", :error
log exception.backtrace.join("\n"), :error
end
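# Sketch of the three cases the comment above walks through, using
# QueueItemSketch from earlier; the daemon's own method does the logging
# and requeueing, this just labels which branch each case hits.
idle_exit = nil                                       # no item: nothing to do
never_ran = QueueItemSketch.new("queues:slow", "{}")  # started false: safe to requeue
processed = QueueItemSketch.new("queues:slow", "{}")
processed.started = true                              # started: the payload handler owned it, ignore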