lib/sqewer/worker.rb in sqewer-4.0.1 vs lib/sqewer/worker.rb in sqewer-4.1.0

- old
+ new

@@ -28,11 +28,14 @@ attr_reader :submitter_class # @return [#perform] The isolator to use when executing each job attr_reader :isolator - # @return [Fixnum] the number of threads to spin up + # @return [Array<Thread>] all the currently running threads of the Worker + attr_reader :threads + + # @return [Fixnum] the number of worker threads set up for this Worker attr_reader :num_threads # Returns the default Worker instance, configured based on the default components # # @return [Sqewer::Worker] @@ -68,10 +71,12 @@ @execution_context_class = execution_context_class @submitter_class = submitter_class @isolator = isolator @num_threads = num_threads + @threads = [] + raise ArgumentError, "num_threads must be > 0" unless num_threads > 0 @execution_counter = Sqewer::AtomicCounter.new @state = Sqewer::StateLock.new @@ -82,21 +87,16 @@ # @param num_threads[Fixnum] the number of consumer/executor threads to spin up # @return [void] def start @state.transition! :starting - Thread.abort_on_exception = true - @logger.info { '[worker] Starting with %d consumer threads' % @num_threads } @execution_queue = Queue.new consumers = (1..@num_threads).map do Thread.new do - loop { - take_and_execute - break if stopping? - } + catch(:goodbye) { loop {take_and_execute} } end end # Create the provider thread. When the execution queue is exhausted, # grab new messages and place them on the local queue. @@ -132,27 +132,34 @@ @logger.info { '[worker] Started, %d consumer threads' % consumers.length } end end # Attempts to softly stop the running consumers and the producer. Once the call is made, - # all the threads will stop at their next loop iteration. + # all the threads will stop after the local cache of messages is emptied. This is to ensure that + # message drops do not happen just because the worker is about to be terminated. + # + # The call will _block_ until all the threads of the worker are terminated + # + # @return [true] def stop @state.transition! :stopping - @logger.info { '[worker] Stopping (clean shutdown), will wait for threads to terminate'} + @logger.info { '[worker] Stopping (clean shutdown), will wait for local cache to drain' } loop do n_live = @threads.select(&:alive?).length break if n_live.zero? n_dead = @threads.length - n_live - @logger.info { '[worker] Waiting on threads to terminate, %d still alive, %d quit' % [n_live, n_dead] } + @logger.info { '[worker] Staged shutdown, %d threads alive, %d have quit, %d jobs in local cache' % + [n_live, n_dead, @execution_queue.length] } sleep 2 end @threads.map(&:join) @logger.info { '[worker] Stopped'} @state.transition! :stopped + true end # Peforms a hard shutdown by killing all the threads def kill @state.transition! :stopping @@ -160,10 +167,18 @@ @threads.map(&:kill) @logger.info { '[worker] Stopped'} @state.transition! :stopped end + # Prints the status and the backtraces of all controlled threads to the logger + def debug_thread_information! + @threads.each do | t | + @logger.debug { t.inspect } + @logger.debug { t.backtrace } + end + end + private def stopping? @state.in_state?(:stopping) end @@ -172,18 +187,20 @@ @execution_queue.length < (@num_threads * THROTTLE_FACTOR) end def handle_message(message) return unless message.receipt_handle + Thread.current[:queue_messsage] = '%s...' %message.body[0..32] return @connection.delete_message(message.receipt_handle) unless message.has_body? @isolator.perform(self, message) # The message delete happens within the Isolator end def take_and_execute message = @execution_queue.pop(nonblock=true) handle_message(message) rescue ThreadError # Queue is empty + throw :goodbye if stopping? sleep SLEEP_SECONDS_ON_EMPTY_QUEUE rescue => e # anything else, at or below StandardError that does not need us to quit @logger.error { '[worker] Failed "%s..." with %s: %s' % [message.inspect[0..32], e.class, e.message] } e.backtrace.each { |s| @logger.error{"\t#{s}"} } end