lib/sqewer/worker.rb in sqewer-1.0.0 vs lib/sqewer/worker.rb in sqewer-2.0.0

- old
+ new

@@ -7,10 +7,34 @@ class Sqewer::Worker DEFAULT_NUM_THREADS = 4 SLEEP_SECONDS_ON_EMPTY_QUEUE = 1 THROTTLE_FACTOR = 2 + # @return [Logger] The logger used for job execution + attr_reader :logger + + # @return [Sqewer::Connection] The connection for sending and receiving messages + attr_reader :connection + + # @return [Sqewer::Serializer] The serializer for unmarshalling and marshalling + attr_reader :serializer + + # @return [Sqewer::MiddlewareStack] The stack used when executing the job + attr_reader :middleware_stack + + # @return [Class] The class to use when instantiating the execution context + attr_reader :execution_context_class + + # @return [Class] The class used to create the Submitter used by jobs to spawn other jobs + attr_reader :submitter_class + + # @return [#perform] The isolator to use when executing each job + attr_reader :isolator + + # @return [Fixnum] the number of threads to spin up + attr_reader :num_threads + # Returns the default Worker instance, configured based on the default components # # @return [Sqewer::Worker] def self.default @default ||= new @@ -75,33 +99,28 @@ take_and_execute } end end - # Create a fiber-based provider thread. When the execution queue is exhausted, use - # the fiber to take a new job and place it on the queue. We use a fiber to have a way - # to "suspend" the polling loop in the SQS client when the local buffer queue fills up. + # Create the provider thread. When the execution queue is exhausted, + # grab new messages and place them on the local queue. provider = Thread.new do - feeder_fiber = Fiber.new do - loop do - break if @state.in_state?(:stopping) - @connection.poll do |message_id, message_body| - break if @state.in_state?(:stopping) - Fiber.yield([message_id, message_body]) - end - end - end - loop do - break if !feeder_fiber.alive? break if stopping? - if @execution_queue.length < (@num_threads * THROTTLE_FACTOR) - @execution_queue << feeder_fiber.resume + if queue_has_capacity? + messages = @connection.receive_messages + if messages.any? + messages.each {|m| @execution_queue << m } + @logger.debug { "[worker] Received and buffered %d messages" % messages.length } if messages.any? + else + @logger.debug { "[worker] No messages received" } + Thread.pass + end else - @logger.debug "Suspending poller (%d items buffered)" % @execution_queue.length - sleep 0.2 + @logger.debug { "[worker] Suspending poller (%d items buffered)" % @execution_queue.length } + sleep 1 Thread.pass end end end @@ -152,49 +171,31 @@ def stopping? @state.in_state?(:stopping) end + def queue_has_capacity? + @execution_queue.length < (@num_threads * THROTTLE_FACTOR) + end + def handle_message(message) + return unless message.receipt_handle + return @connection.delete_message(message.receipt_handle) unless message.has_body? + @isolator.perform(self, message) + @connection.delete_message(message.receipt_handle) + end + def take_and_execute - message_id, message_body = @execution_queue.pop(nonblock=true) - return unless message_id - return @connection.delete_message(message_id) unless message_body && !message_body.empty? - - @isolator.isolate do - job = @middleware_stack.around_deserialization(@serializer, message_id, message_body) do - @serializer.unserialize(message_body) - end - - if job # if the serializer returns a nil or false - t = Time.now - submitter = @submitter_class.new(@connection, @serializer) - context = @execution_context_class.new(submitter, {STR_logger => @logger}) - - begin - @middleware_stack.around_execution(job, context) do - job.method(:run).arity.zero? ? job.run : job.run(context) - end - @logger.info { "[worker] Finished #{job.inspect} in %0.2fs" % (Time.now - t) } - rescue => e - @logger.error { "[worker] Failed #{job.inspect} with a #{e}" } - raise e - end - end - end - - @connection.delete_message(message_id) + message = @execution_queue.pop(nonblock=true) + handle_message(message) rescue ThreadError # Queue is empty sleep SLEEP_SECONDS_ON_EMPTY_QUEUE Thread.pass - rescue SystemExit, SignalException, Interrupt => e # Time to quit - @logger.error { "[worker] Signaled, will quit the consumer" } - return rescue => e # anything else, at or below StandardError that does not need us to quit - @logger.fatal { "[worker] Failed #{message_id} with #{e}" } - @logger.fatal(e.class) - @logger.fatal(e.message) - e.backtrace.each { |s| @logger.fatal{"\t#{s}"} } + @logger.error { "[worker] Failed #{message.inspect} with #{e}" } + @logger.error(e.class) + @logger.error(e.message) + e.backtrace.each { |s| @logger.error{"\t#{s}"} } end STR_logger = 'logger' end