lib/sqewer/worker.rb in sqewer-4.0.0 vs lib/sqewer/worker.rb in sqewer-4.0.1

- old
+ new

@@ -72,14 +72,11 @@ raise ArgumentError, "num_threads must be > 0" unless num_threads > 0 @execution_counter = Sqewer::AtomicCounter.new - @state = VeryTinyStateMachine.new(:stopped) - @state.permit_state :starting, :running, :stopping, :stopped, :failed - @state.permit_transition :stopped => :starting, :starting => :running, :running => :stopping, :stopping => :stopped - @state.permit_transition :starting => :failed # Failed to start + @state = Sqewer::StateLock.new end # Start listening on the queue, spin up a number of consumer threads that will execute the jobs. # # @param num_threads[Fixnum] the number of consumer/executor threads to spin up @@ -115,13 +112,12 @@ else @logger.debug { "[worker] No messages received" } Thread.pass end else - @logger.debug { "[worker] Suspending poller (%d items buffered)" % @execution_queue.length } - sleep 1 - Thread.pass + @logger.debug { "[worker] Cache is full (%d items), postponing receive" % @execution_queue.length } + sleep SLEEP_SECONDS_ON_EMPTY_QUEUE end end end @threads = consumers + [provider] @@ -145,11 +141,11 @@ loop do n_live = @threads.select(&:alive?).length break if n_live.zero? n_dead = @threads.length - n_live - @logger.info {"Waiting on threads to terminate, %d still alive, %d quit" % [n_live, n_dead] } + @logger.info { '[worker] Waiting on threads to terminate, %d still alive, %d quit' % [n_live, n_dead] } sleep 2 end @threads.map(&:join) @@ -186,11 +182,10 @@ def take_and_execute message = @execution_queue.pop(nonblock=true) handle_message(message) rescue ThreadError # Queue is empty sleep SLEEP_SECONDS_ON_EMPTY_QUEUE - Thread.pass rescue => e # anything else, at or below StandardError that does not need us to quit - @logger.error { "[worker] Failed %s... with %s: %s" % [message.inspect[0..64], e.class, e.message] } + @logger.error { '[worker] Failed "%s..." with %s: %s' % [message.inspect[0..32], e.class, e.message] } e.backtrace.each { |s| @logger.error{"\t#{s}"} } end end