Sha256: 8929753bbd5b5c729441bedec877b01e1631736874741849d6dac0bd531a4309
Contents?: true
Size: 1.36 KB
Versions: 5
Compression:
Stored size: 1.36 KB
Contents
require 'thread'
require 'bunny/consumer'

module Vx
  module Lib
    module Consumer
      # Bunny consumer whose delivery handling is serialized through a Mutex.
      #
      # The mutex is held for the duration of every #call, which lets the
      # shutdown helpers either wait for the current delivery to finish
      # (#graceful_shutdown) or back off when one is running
      # (#try_graceful_shutdown).
      class Subscriber < Bunny::Consumer
        attr_accessor :vx_consumer_name, :queue_name

        # Forwards all arguments to Bunny::Consumer and creates the mutex
        # that guards delivery processing.
        def initialize(*args)
          super(*args)
          @lock = Mutex.new
        end

        # Cancels the consumer while holding the lock, i.e. blocks until the
        # lock can be acquired.
        def graceful_shutdown
          in_progress { cancel }
        end

        # Non-blocking variant of #graceful_shutdown.
        #
        # Returns true when the lock was acquired and #cancel was invoked,
        # false when the lock is currently held by someone else.
        def try_graceful_shutdown
          return false unless @lock.try_lock

          begin
            cancel
          ensure
            # Always release the lock, even if cancel raises.
            @lock.unlock
          end
          true
        end

        # Runs the given block while holding the lock and returns the
        # block's value.
        def in_progress
          @lock.synchronize { yield }
        end

        # True while the lock is held (by a delivery or by a shutdown call).
        def running?
          @lock.locked?
        end

        # Delivery entry point: invokes the registered delivery handler, if
        # any, with the given arguments while holding the lock.
        def call(*args)
          in_progress do
            handler = @on_delivery
            handler.call(*args) if handler
            # NOTE(review): presumably yields the scheduler so that a thread
            # waiting on the lock gets a chance to run - confirm.
            sleep 0
          end
        end

        # Cancels the consumer and then closes its channel. Does nothing when
        # the channel is already closed.
        def cancel
          return if closed?

          super
          # The channel may have been closed as a result of the cancel above.
          channel.close unless closed?
        end

        # True when the underlying channel reports itself closed.
        def closed?
          channel.closed?
        end

        # Joins the channel's work pool.
        def join
          channel.work_pool.join
        end

        # Starts and returns a thread that calls Consumer.wait_shutdown and
        # then cancels this subscriber. Exceptions inside the thread abort
        # the process (abort_on_exception is enabled for it).
        def wait_shutdown
          Thread.new do
            Thread.current.abort_on_exception = true
            Consumer.wait_shutdown
            cancel
          end
        end
      end
    end
  end
end
Version data entries
5 entries across 5 versions & 1 rubygems