lib/sqewer/worker.rb in sqewer-2.0.1 vs lib/sqewer/worker.rb in sqewer-2.0.2

- old
+ new

@@ -122,23 +122,12 @@ Thread.pass end end end - # It makes sense to have one GC caller per process, since a GC cuts across threads. - # We will perform a full GC cycle after the same number of jobs as our consumer thread - # count - so not on every job, but still as often as we can to keep the memory use in check. - gc = Thread.new do - loop do - break if stopping? - GC.start if (@execution_counter.to_i % @num_threads).zero? - sleep 0.5 - end - end + @threads = consumers + [provider] - @threads = [provider, gc] + consumers - # If any of our threads are already dead, it means there is some misconfiguration and startup failed if @threads.any?{|t| !t.alive? } @threads.map(&:kill) @state.transition! :failed @logger.fatal { '[worker] Failed to start (one or more threads died on startup)' } @@ -151,9 +140,19 @@ # Attempts to softly stop the running consumers and the producer. Once the call is made, # all the threads will stop at their next loop iteration. def stop @state.transition! :stopping @logger.info { '[worker] Stopping (clean shutdown), will wait for threads to terminate'} + loop do + n_live = @threads.select(&:alive?).length + break if n_live.zero? + + n_dead = @threads.length - n_live + @logger.info {"Waiting on threads to terminate, %d still alive, %d quit" % [n_live, n_dead] } + + sleep 2 + end + @threads.map(&:join) @logger.info { '[worker] Stopped'} @state.transition! :stopped end