lib/sqewer/worker.rb in sqewer-5.0.8 vs lib/sqewer/worker.rb in sqewer-5.0.9

- old
+ new

@@ -31,10 +31,13 @@ attr_reader :threads # @return [Fixnum] the number of worker threads set up for this Worker attr_reader :num_threads + # @return [Symbol] the current state of this Worker + attr_reader :state + # Returns a Worker instance, configured based on the default components # # @return [Sqewer::Worker] def self.default new @@ -92,26 +95,32 @@ end end # Create the provider thread. When the execution queue is exhausted, # grab new messages and place them on the local queue. + owning_worker = self # self won't be self anymore in the thread provider = Thread.new do loop do - break if stopping? + begin + break if stopping? - if queue_has_capacity? - messages = @connection.receive_messages - if messages.any? - messages.each {|m| @execution_queue << m } - @logger.debug { "[worker] Received and buffered %d messages" % messages.length } if messages.any? + if queue_has_capacity? + messages = @connection.receive_messages + if messages.any? + messages.each {|m| @execution_queue << m } + @logger.debug { "[worker] Received and buffered %d messages" % messages.length } if messages.any? + else + @logger.debug { "[worker] No messages received" } + Thread.pass + end else - @logger.debug { "[worker] No messages received" } - Thread.pass + @logger.debug { "[worker] Cache is full (%d items), postponing receive" % @execution_queue.length } + sleep SLEEP_SECONDS_ON_EMPTY_QUEUE end - else - @logger.debug { "[worker] Cache is full (%d items), postponing receive" % @execution_queue.length } - sleep SLEEP_SECONDS_ON_EMPTY_QUEUE + rescue StandardError => e + @logger.fatal "Exiting because message receiving thread died. Exception causing this: #{e.inspect}" + owning_worker.stop # allow any queues and/or running jobs to complete end end end # Register the provider separately for the situation where it hangs in `receive_messages` and doesn't @@ -156,10 +165,10 @@ @logger.info { '[worker] Stopped'} @state.transition! :stopped true end - # Peforms a hard shutdown by killing all the threads + # Performs a hard shutdown by killing all the threads def kill @state.transition! :stopping @logger.info { '[worker] Killing (unclean shutdown), will kill all threads'} @threads.map(&:kill) @logger.info { '[worker] Stopped'}