Sha256: b2e89d1770ede0de0fadf073f8623c3ab99f9aa4a372159011ca22fb72dbfeb6
Contents?: true
Size: 1.75 KB
Versions: 3
Compression:
Stored size: 1.75 KB
Contents
# frozen_string_literal: true module Core # common start/stop module Worker private def worker_loop(queue) loop do msg = queue.pop # block until there's a msg do_work unless worker_stopping? if worker_stopping? do_stop if respond_to?(:do_stop, true) # check for private and protected too return end queue.clear # ignore work that came in while we were working rescue StandardError => e log.fatal('worker') { ["Worker #{self.class} raised an uncaught exception", e] } return if msg == :do_stop end end def run_at_startup false end def worker_start worker_stop queue = Queue.new @worker_thread = Thread.new do log.debug "Worker thread started for #{self.class}" worker_loop queue log.debug "Worker thread exited for #{self.class}" end @worker_thread[:queue] = queue queue.push :do_work if run_at_startup @worker_thread[:timer] = Thread.new do loop do sleep interval_seconds queue.push :do_work end end end def worker_stop return unless (w = @worker_thread) @worker_thread = nil w[:timer].kill w[:stopping] = true w[:queue].push :do_stop # wait a little bit to see if it shuts down then log log.debug "Worker thread draining for #{self.class} #{w.object_id.to_s(32)}" unless w.join(0.2) # wait 15 seconds for worker to finish -- if not, kill it return if w.join(Config.get_f!(:WORKER_SHUTDOWN_GRACE_PERIOD, 15)) w.kill log.warn "Worker thread killed for #{self.class} #{w.object_id.to_s(32)}" end def worker_stopping? Thread.current[:stopping] end end end
Version data entries
3 entries across 3 versions & 1 rubygems
Version | Path |
---|---|
svcbase-0.1.18 | lib/svcbase/worker.rb |
svcbase-0.1.17 | lib/svcbase/worker.rb |
svcbase-0.1.16 | lib/svcbase/worker.rb |