lib/sqewer/worker.rb in sqewer-2.0.1 vs lib/sqewer/worker.rb in sqewer-2.0.2
- old
+ new
@@ -122,23 +122,12 @@
Thread.pass
end
end
end
- # It makes sense to have one GC caller per process, since a GC cuts across threads.
- # We will perform a full GC cycle after the same number of jobs as our consumer thread
- # count - so not on every job, but still as often as we can to keep the memory use in check.
- gc = Thread.new do
- loop do
- break if stopping?
- GC.start if (@execution_counter.to_i % @num_threads).zero?
- sleep 0.5
- end
- end
+ @threads = consumers + [provider]
- @threads = [provider, gc] + consumers
-
# If any of our threads are already dead, it means there is some misconfiguration and startup failed
if @threads.any?{|t| !t.alive? }
@threads.map(&:kill)
@state.transition! :failed
@logger.fatal { '[worker] Failed to start (one or more threads died on startup)' }
@@ -151,9 +140,19 @@
# Attempts to softly stop the running consumers and the producer. Once the call is made,
# all the threads will stop at their next loop iteration.
def stop
@state.transition! :stopping
@logger.info { '[worker] Stopping (clean shutdown), will wait for threads to terminate'}
+ loop do
+ n_live = @threads.select(&:alive?).length
+ break if n_live.zero?
+
+ n_dead = @threads.length - n_live
+ @logger.info {"Waiting on threads to terminate, %d still alive, %d quit" % [n_live, n_dead] }
+
+ sleep 2
+ end
+
@threads.map(&:join)
@logger.info { '[worker] Stopped'}
@state.transition! :stopped
end