Sha256: eabdc121742bcd425f349264bf096f56298d16a8f5c23ab3d5214aaf869cb2d5

Contents?: true

Size: 1.08 KB

Versions: 3

Compression:

Stored size: 1.08 KB

Contents

require 'thread'
require 'bunny/consumer'

module Vx
  module Consumer
    # Bunny consumer that handles every delivery while holding a mutex, so a
    # graceful shutdown can wait for the delivery currently being processed
    # before the consumer is cancelled.
    class Subscriber < Bunny::Consumer

      include Instrument

      # Name passed as the :consumer value of the instrumentation events
      # emitted by this class.
      attr_accessor :vx_consumer_name

      # Hands all arguments to the superclass constructor unchanged and
      # creates the mutex that guards delivery handling.
      def initialize(*args)
        super
        @lock = Mutex.new
      end

      # Emits a 'graceful_shutdown_consumer' event, then cancels the consumer
      # once the lock is available, i.e. after any delivery that is currently
      # inside #call has finished.
      def graceful_shutdown
        instrument('graceful_shutdown_consumer', consumer: vx_consumer_name)
        in_progress do
          cancel
        end
      end

      # Runs the given block while holding the lock and returns the block's
      # value. The lock is a plain Mutex, so it must not be re-entered from
      # the thread that already holds it.
      def in_progress
        @lock.synchronize { yield }
      end

      # True while some thread is inside #in_progress (a delivery is being
      # handled or a graceful shutdown is cancelling the consumer).
      def running?
        @lock.locked?
      end

      # Passes the arguments to the @on_delivery handler, if one is set, while
      # holding the lock.
      # NOTE(review): @on_delivery is not assigned in this class; presumably
      # Bunny::Consumer sets it - confirm against the Bunny version in use.
      def call(*args)
        in_progress do
          if @on_delivery
            @on_delivery.call(*args)
          end
          # Presumably lets other threads get scheduled before the lock is
          # released - TODO confirm the intent.
          sleep 0
        end
      end

      # Emits a 'cancel_consumer' event, performs the superclass cancel and
      # then closes the channel unless it is already closed.
      def cancel
        instrument('cancel_consumer', consumer: vx_consumer_name, channel: channel.id)
        super
        unless channel.closed?
          channel.close
        end
      end

      # Joins the work pool of this consumer's channel.
      def join
        channel.work_pool.join
      end

      # Starts and returns a thread that blocks in Consumer.wait_shutdown and
      # cancels this consumer as soon as that call returns. Note that it calls
      # #cancel directly, without taking the lock.
      def wait_shutdown
        Thread.new do
          # An exception raised in this watcher thread should not go unnoticed.
          Thread.current.abort_on_exception = true
          Consumer.wait_shutdown
          cancel
        end
      end

    end
  end
end

Version data entries

3 entries across 3 versions & 1 rubygems

Version Path
vx-consumer-0.1.5 lib/vx/consumer/subscriber.rb
vx-consumer-0.1.4 lib/vx/consumer/subscriber.rb
vx-consumer-0.1.3 lib/vx/consumer/subscriber.rb