lib/sqewer/worker.rb in sqewer-4.0.1 vs lib/sqewer/worker.rb in sqewer-4.1.0
- old
+ new
@@ -28,11 +28,14 @@
attr_reader :submitter_class
# @return [#perform] The isolator to use when executing each job
attr_reader :isolator
- # @return [Fixnum] the number of threads to spin up
+ # @return [Array<Thread>] all the currently running threads of the Worker
+ attr_reader :threads
+
+ # @return [Fixnum] the number of worker threads set up for this Worker
attr_reader :num_threads
# Returns the default Worker instance, configured based on the default components
#
# @return [Sqewer::Worker]
@@ -68,10 +71,12 @@
@execution_context_class = execution_context_class
@submitter_class = submitter_class
@isolator = isolator
@num_threads = num_threads
+ @threads = []
+
raise ArgumentError, "num_threads must be > 0" unless num_threads > 0
@execution_counter = Sqewer::AtomicCounter.new
@state = Sqewer::StateLock.new
@@ -82,21 +87,16 @@
# @param num_threads[Fixnum] the number of consumer/executor threads to spin up
# @return [void]
def start
@state.transition! :starting
- Thread.abort_on_exception = true
-
@logger.info { '[worker] Starting with %d consumer threads' % @num_threads }
@execution_queue = Queue.new
consumers = (1..@num_threads).map do
Thread.new do
- loop {
- take_and_execute
- break if stopping?
- }
+ catch(:goodbye) { loop {take_and_execute} }
end
end
# Create the provider thread. When the execution queue is exhausted,
# grab new messages and place them on the local queue.
@@ -132,27 +132,34 @@
@logger.info { '[worker] Started, %d consumer threads' % consumers.length }
end
end
# Attempts to softly stop the running consumers and the producer. Once the call is made,
- # all the threads will stop at their next loop iteration.
+ # all the threads will stop after the local cache of messages is emptied. This is to ensure that
+ # message drops do not happen just because the worker is about to be terminated.
+ #
+ # The call will _block_ until all the threads of the worker are terminated
+ #
+ # @return [true]
def stop
@state.transition! :stopping
- @logger.info { '[worker] Stopping (clean shutdown), will wait for threads to terminate'}
+ @logger.info { '[worker] Stopping (clean shutdown), will wait for local cache to drain' }
loop do
n_live = @threads.select(&:alive?).length
break if n_live.zero?
n_dead = @threads.length - n_live
- @logger.info { '[worker] Waiting on threads to terminate, %d still alive, %d quit' % [n_live, n_dead] }
+ @logger.info { '[worker] Staged shutdown, %d threads alive, %d have quit, %d jobs in local cache' %
+ [n_live, n_dead, @execution_queue.length] }
sleep 2
end
@threads.map(&:join)
@logger.info { '[worker] Stopped'}
@state.transition! :stopped
+ true
end
# Peforms a hard shutdown by killing all the threads
def kill
@state.transition! :stopping
@@ -160,10 +167,18 @@
@threads.map(&:kill)
@logger.info { '[worker] Stopped'}
@state.transition! :stopped
end
+ # Prints the status and the backtraces of all controlled threads to the logger
+ def debug_thread_information!
+ @threads.each do | t |
+ @logger.debug { t.inspect }
+ @logger.debug { t.backtrace }
+ end
+ end
+
private
def stopping?
@state.in_state?(:stopping)
end
@@ -172,18 +187,20 @@
@execution_queue.length < (@num_threads * THROTTLE_FACTOR)
end
def handle_message(message)
return unless message.receipt_handle
+ Thread.current[:queue_messsage] = '%s...' %message.body[0..32]
return @connection.delete_message(message.receipt_handle) unless message.has_body?
@isolator.perform(self, message)
# The message delete happens within the Isolator
end
def take_and_execute
message = @execution_queue.pop(nonblock=true)
handle_message(message)
rescue ThreadError # Queue is empty
+ throw :goodbye if stopping?
sleep SLEEP_SECONDS_ON_EMPTY_QUEUE
rescue => e # anything else, at or below StandardError that does not need us to quit
@logger.error { '[worker] Failed "%s..." with %s: %s' % [message.inspect[0..32], e.class, e.message] }
e.backtrace.each { |s| @logger.error{"\t#{s}"} }
end