lib/sqewer/worker.rb in sqewer-5.0.3 vs lib/sqewer/worker.rb in sqewer-5.0.4
- old
+ new
@@ -112,10 +112,13 @@
sleep SLEEP_SECONDS_ON_EMPTY_QUEUE
end
end
end
+ # Register the provider separately for the situation where it hangs in `receive_messages` and doesn't
+ # terminate from within it's own run loop.
+ @provider_thread = provider
@threads = consumers + [provider]
# If any of our threads are already dead, it means there is some misconfiguration and startup failed
if @threads.any?{|t| !t.alive? }
@threads.map(&:kill)
@@ -134,10 +137,11 @@
# The call will _block_ until all the threads of the worker are terminated
#
# @return [true]
def stop
@state.transition! :stopping
- @logger.info { '[worker] Stopping (clean shutdown), will wait for local cache to drain' }
+ @logger.info { '[worker] Stopping (clean shutdown), will wait for local cache to drain. Killing the provider thread now.' }
+ @provider_thread.kill
loop do
n_live = @threads.select(&:alive?).length
break if n_live.zero?
n_dead = @threads.length - n_live