lib/sqewer/worker.rb in sqewer-5.0.8 vs lib/sqewer/worker.rb in sqewer-5.0.9
- old
+ new
@@ -31,10 +31,13 @@
attr_reader :threads
# @return [Fixnum] the number of worker threads set up for this Worker
attr_reader :num_threads
+ # @return [Symbol] the current state of this Worker
+ attr_reader :state
+
# Returns a Worker instance, configured based on the default components
#
# @return [Sqewer::Worker]
def self.default
new
@@ -92,26 +95,32 @@
end
end
# Create the provider thread. When the execution queue is exhausted,
# grab new messages and place them on the local queue.
+ owning_worker = self # self won't be self anymore in the thread
provider = Thread.new do
loop do
- break if stopping?
+ begin
+ break if stopping?
- if queue_has_capacity?
- messages = @connection.receive_messages
- if messages.any?
- messages.each {|m| @execution_queue << m }
- @logger.debug { "[worker] Received and buffered %d messages" % messages.length } if messages.any?
+ if queue_has_capacity?
+ messages = @connection.receive_messages
+ if messages.any?
+ messages.each {|m| @execution_queue << m }
+ @logger.debug { "[worker] Received and buffered %d messages" % messages.length } if messages.any?
+ else
+ @logger.debug { "[worker] No messages received" }
+ Thread.pass
+ end
else
- @logger.debug { "[worker] No messages received" }
- Thread.pass
+ @logger.debug { "[worker] Cache is full (%d items), postponing receive" % @execution_queue.length }
+ sleep SLEEP_SECONDS_ON_EMPTY_QUEUE
end
- else
- @logger.debug { "[worker] Cache is full (%d items), postponing receive" % @execution_queue.length }
- sleep SLEEP_SECONDS_ON_EMPTY_QUEUE
+ rescue StandardError => e
+ @logger.fatal "Exiting because message receiving thread died. Exception causing this: #{e.inspect}"
+ owning_worker.stop # allow any queues and/or running jobs to complete
end
end
end
# Register the provider separately for the situation where it hangs in `receive_messages` and doesn't
@@ -156,10 +165,10 @@
@logger.info { '[worker] Stopped'}
@state.transition! :stopped
true
end
- # Peforms a hard shutdown by killing all the threads
+ # Performs a hard shutdown by killing all the threads
def kill
@state.transition! :stopping
@logger.info { '[worker] Killing (unclean shutdown), will kill all threads'}
@threads.map(&:kill)
@logger.info { '[worker] Stopped'}