lib/sqewer/worker.rb in sqewer-1.0.0 vs lib/sqewer/worker.rb in sqewer-2.0.0
- old
+ new
@@ -7,10 +7,34 @@
class Sqewer::Worker
DEFAULT_NUM_THREADS = 4
SLEEP_SECONDS_ON_EMPTY_QUEUE = 1
THROTTLE_FACTOR = 2
+ # @return [Logger] The logger used for job execution
+ attr_reader :logger
+
+ # @return [Sqewer::Connection] The connection for sending and receiving messages
+ attr_reader :connection
+
+ # @return [Sqewer::Serializer] The serializer for unmarshalling and marshalling
+ attr_reader :serializer
+
+ # @return [Sqewer::MiddlewareStack] The stack used when executing the job
+ attr_reader :middleware_stack
+
+ # @return [Class] The class to use when instantiating the execution context
+ attr_reader :execution_context_class
+
+ # @return [Class] The class used to create the Submitter used by jobs to spawn other jobs
+ attr_reader :submitter_class
+
+ # @return [#perform] The isolator to use when executing each job
+ attr_reader :isolator
+
+ # @return [Fixnum] the number of threads to spin up
+ attr_reader :num_threads
+
# Returns the default Worker instance, configured based on the default components
#
# @return [Sqewer::Worker]
def self.default
@default ||= new
@@ -75,33 +99,28 @@
take_and_execute
}
end
end
- # Create a fiber-based provider thread. When the execution queue is exhausted, use
- # the fiber to take a new job and place it on the queue. We use a fiber to have a way
- # to "suspend" the polling loop in the SQS client when the local buffer queue fills up.
+ # Create the provider thread. When the execution queue is exhausted,
+ # grab new messages and place them on the local queue.
provider = Thread.new do
- feeder_fiber = Fiber.new do
- loop do
- break if @state.in_state?(:stopping)
- @connection.poll do |message_id, message_body|
- break if @state.in_state?(:stopping)
- Fiber.yield([message_id, message_body])
- end
- end
- end
-
loop do
- break if !feeder_fiber.alive?
break if stopping?
- if @execution_queue.length < (@num_threads * THROTTLE_FACTOR)
- @execution_queue << feeder_fiber.resume
+ if queue_has_capacity?
+ messages = @connection.receive_messages
+ if messages.any?
+ messages.each {|m| @execution_queue << m }
+ @logger.debug { "[worker] Received and buffered %d messages" % messages.length } if messages.any?
+ else
+ @logger.debug { "[worker] No messages received" }
+ Thread.pass
+ end
else
- @logger.debug "Suspending poller (%d items buffered)" % @execution_queue.length
- sleep 0.2
+ @logger.debug { "[worker] Suspending poller (%d items buffered)" % @execution_queue.length }
+ sleep 1
Thread.pass
end
end
end
@@ -152,49 +171,31 @@
def stopping?
@state.in_state?(:stopping)
end
+ def queue_has_capacity?
+ @execution_queue.length < (@num_threads * THROTTLE_FACTOR)
+ end
+ def handle_message(message)
+ return unless message.receipt_handle
+ return @connection.delete_message(message.receipt_handle) unless message.has_body?
+ @isolator.perform(self, message)
+ @connection.delete_message(message.receipt_handle)
+ end
+
def take_and_execute
- message_id, message_body = @execution_queue.pop(nonblock=true)
- return unless message_id
- return @connection.delete_message(message_id) unless message_body && !message_body.empty?
-
- @isolator.isolate do
- job = @middleware_stack.around_deserialization(@serializer, message_id, message_body) do
- @serializer.unserialize(message_body)
- end
-
- if job # if the serializer returns a nil or false
- t = Time.now
- submitter = @submitter_class.new(@connection, @serializer)
- context = @execution_context_class.new(submitter, {STR_logger => @logger})
-
- begin
- @middleware_stack.around_execution(job, context) do
- job.method(:run).arity.zero? ? job.run : job.run(context)
- end
- @logger.info { "[worker] Finished #{job.inspect} in %0.2fs" % (Time.now - t) }
- rescue => e
- @logger.error { "[worker] Failed #{job.inspect} with a #{e}" }
- raise e
- end
- end
- end
-
- @connection.delete_message(message_id)
+ message = @execution_queue.pop(nonblock=true)
+ handle_message(message)
rescue ThreadError # Queue is empty
sleep SLEEP_SECONDS_ON_EMPTY_QUEUE
Thread.pass
- rescue SystemExit, SignalException, Interrupt => e # Time to quit
- @logger.error { "[worker] Signaled, will quit the consumer" }
- return
rescue => e # anything else, at or below StandardError that does not need us to quit
- @logger.fatal { "[worker] Failed #{message_id} with #{e}" }
- @logger.fatal(e.class)
- @logger.fatal(e.message)
- e.backtrace.each { |s| @logger.fatal{"\t#{s}"} }
+ @logger.error { "[worker] Failed #{message.inspect} with #{e}" }
+ @logger.error(e.class)
+ @logger.error(e.message)
+ e.backtrace.each { |s| @logger.error{"\t#{s}"} }
end
STR_logger = 'logger'
end