lib/sqewer/worker.rb in sqewer-5.0.3 vs lib/sqewer/worker.rb in sqewer-5.0.4

- old
+ new

@@ -112,10 +112,13 @@ sleep SLEEP_SECONDS_ON_EMPTY_QUEUE end end end + # Register the provider separately for the situation where it hangs in `receive_messages` and doesn't + # terminate from within it's own run loop. + @provider_thread = provider @threads = consumers + [provider] # If any of our threads are already dead, it means there is some misconfiguration and startup failed if @threads.any?{|t| !t.alive? } @threads.map(&:kill) @@ -134,10 +137,11 @@ # The call will _block_ until all the threads of the worker are terminated # # @return [true] def stop @state.transition! :stopping - @logger.info { '[worker] Stopping (clean shutdown), will wait for local cache to drain' } + @logger.info { '[worker] Stopping (clean shutdown), will wait for local cache to drain. Killing the provider thread now.' } + @provider_thread.kill loop do n_live = @threads.select(&:alive?).length break if n_live.zero? n_dead = @threads.length - n_live