Sha256: 2e5c0cede3bfdc9b3e44626d4093bf0e4ee4fb40f0fc82972cf116dec3843c06

Contents?: true

Size: 1.52 KB

Versions: 1

Compression:

Stored size: 1.52 KB

Contents

require 'thread'
require 'bunny/consumer'

module Vx
  module Consumer
    # Bunny consumer whose message handling and shutdown paths share a single
    # mutex: a delivery is processed while holding the lock, and a graceful
    # shutdown has to obtain the same lock before it cancels the consumer.
    #
    # NOTE(review): +instrument+ is presumably supplied by the included
    # Instrument module, which is not visible in this file - confirm.
    class Subscriber < Bunny::Consumer

      include Instrument

      # Value sent as +consumer:+ with every instrumentation event below.
      attr_accessor :vx_consumer_name

      # Hands every positional argument to the superclass constructor and then
      # creates the mutex used by the other methods of this class.
      def initialize(*args)
        super(*args)
        @lock = Mutex.new
      end

      # Blocking shutdown. Emits 'try_graceful_shutdown_consumer' right away,
      # then waits for the lock, cancels the consumer and finally emits
      # 'graceful_shutdown_consumer'.
      def graceful_shutdown
        instrument('try_graceful_shutdown_consumer', consumer: vx_consumer_name)
        in_progress do
          cancel
          instrument('graceful_shutdown_consumer', consumer: vx_consumer_name)
        end
      end

      # Non-blocking shutdown attempt.
      #
      # @return [Boolean] false when the lock is currently held (nothing is
      #   done in that case); true once the event has been emitted and
      #   #cancel has returned.
      def try_graceful_shutdown
        return false unless @lock.try_lock

        begin
          instrument('graceful_shutdown_consumer', consumer: vx_consumer_name)
          cancel
        ensure
          # Give the lock back even when instrument or cancel raises.
          @lock.unlock
        end
        true
      end

      # Runs the given block while holding the lock.
      #
      # @return [Object] the value of the block
      def in_progress
        @lock.synchronize { yield }
      end

      # @return [Boolean] whether the lock is held at this moment
      def running?
        @lock.locked?
      end

      # Invokes the delivery handler, if one is set, with the given arguments
      # while holding the lock.
      #
      # NOTE(review): @on_delivery is not assigned anywhere in this class;
      # presumably Bunny::Consumer sets it - confirm.
      #
      # @return [Object] the result of the trailing +sleep 0+
      def call(*args)
        in_progress do
          handler = @on_delivery
          handler.call(*args) if handler
          # NOTE(review): presumably here to let other threads get scheduled
          # after a delivery - confirm the intent before touching it.
          sleep 0
        end
      end

      # Emits 'cancel_consumer' (with the channel id) and, unless the channel
      # is already closed, runs the superclass cancel and then closes the
      # channel if it is still open afterwards.
      def cancel
        instrument('cancel_consumer', consumer: vx_consumer_name, channel: channel.id)
        return if channel.closed?

        super
        channel.close unless channel.closed?
      end

      # Joins the channel's work pool.
      def join
        channel.work_pool.join
      end

      # Starts a thread that calls Consumer.wait_shutdown and, once that
      # returns, cancels this consumer. The cancel here is issued without
      # taking the lock.
      #
      # NOTE(review): Consumer.wait_shutdown presumably blocks until a
      # shutdown is requested - confirm against its definition.
      #
      # @return [Thread] the spawned thread
      def wait_shutdown
        Thread.new do
          # Set from inside the thread so it applies before any work is done.
          Thread.current.abort_on_exception = true
          Consumer.wait_shutdown
          cancel
        end
      end

    end
  end
end

Version data entries

1 entry across 1 version & 1 rubygem

Version Path
vx-consumer-0.1.7 lib/vx/consumer/subscriber.rb