Sha256: eabdc121742bcd425f349264bf096f56298d16a8f5c23ab3d5214aaf869cb2d5
Contents?: true
Size: 1.08 KB
Versions: 3
Compression:
Stored size: 1.08 KB
Contents
require 'thread' require 'bunny/consumer' module Vx module Consumer class Subscriber < Bunny::Consumer include Instrument attr_accessor :vx_consumer_name def initialize(*args) super(*args) @lock = Mutex.new end def graceful_shutdown instrument('graceful_shutdown_consumer', consumer: vx_consumer_name) in_progress { cancel } end def in_progress @lock.synchronize do yield end end def running? @lock.locked? end def call(*args) in_progress do @on_delivery.call(*args) if @on_delivery sleep 0 end end def cancel instrument('cancel_consumer', consumer: vx_consumer_name, channel: channel.id) super channel.close unless channel.closed? end def join channel.work_pool.join end def wait_shutdown Thread.new do Thread.current.abort_on_exception = true Consumer.wait_shutdown cancel end end end end end
Version data entries
3 entries across 3 versions & 1 rubygems
Version | Path |
---|---|
vx-consumer-0.1.5 | lib/vx/consumer/subscriber.rb |
vx-consumer-0.1.4 | lib/vx/consumer/subscriber.rb |
vx-consumer-0.1.3 | lib/vx/consumer/subscriber.rb |