Sha256: 1b5559567385d726a024861455fc45ff1cc4d35c28b653e3c0a80b4c6d1a60bd

Contents?: true

Size: 1.37 KB

Versions: 1

Compression:

Stored size: 1.37 KB

Contents

require 'thread'
require 'bunny/consumer'

module Vx
  module Consumer
    # A Bunny::Consumer subclass that runs every delivery while holding a
    # mutex. The same mutex is used by the shutdown helpers, so a shutdown can
    # either wait for the lock (#graceful_shutdown) or give up immediately when
    # it is already taken (#try_graceful_shutdown).
    class Subscriber < Bunny::Consumer

      include Instrument

      # Value sent as the `consumer:` field of every instrumentation call
      # made by this class.
      attr_accessor :vx_consumer_name

      # Passes all arguments through to the superclass constructor, then
      # creates the mutex that guards deliveries and shutdown.
      def initialize(*args)
        super(*args)
        @lock = Mutex.new
      end

      # Instruments 'graceful_shutdown_consumer', then runs #cancel while
      # holding the lock. Because #call holds the same lock, this blocks until
      # the lock can be acquired.
      def graceful_shutdown
        report_graceful_shutdown
        in_progress do
          cancel
        end
      end

      # Non-blocking variant of #graceful_shutdown.
      #
      # Returns false straight away when the lock is already held. Otherwise
      # instruments 'graceful_shutdown_consumer', cancels, releases the lock
      # (even if cancelling raised) and returns true.
      def try_graceful_shutdown
        return false unless @lock.try_lock

        begin
          report_graceful_shutdown
          cancel
        ensure
          @lock.unlock
        end
        true
      end

      # Yields to the given block while holding the lock and returns the
      # block's value.
      def in_progress
        @lock.synchronize { yield }
      end

      # True while the lock is held by any thread (a delivery in #call, or a
      # shutdown that is cancelling under the lock).
      def running?
        @lock.locked?
      end

      # Handles one delivery under the lock: invokes the @on_delivery handler
      # with the given arguments when one is set.
      def call(*args)
        in_progress do
          if @on_delivery
            @on_delivery.call(*args)
          end
          # NOTE(review): zero-length sleep kept from the original; presumably
          # a scheduling hint to let other threads run -- confirm intent.
          sleep 0
        end
      end

      # Instruments 'cancel_consumer' (with the consumer name and channel id),
      # delegates to the superclass #cancel, then closes the channel unless it
      # is already closed.
      def cancel
        instrument(
          'cancel_consumer',
          consumer: vx_consumer_name,
          channel: channel.id
        )
        super
        channel.close unless channel.closed?
      end

      # Delegates to `join` on the channel's work pool.
      # NOTE(review): presumably blocks until the pool's threads finish --
      # confirm against the Bunny version in use.
      def join
        channel.work_pool.join
      end

      # Starts and returns a background thread that calls
      # Consumer.wait_shutdown and, once that returns, calls #cancel directly
      # (without taking the lock).
      def wait_shutdown
        Thread.new do
          # Surface any failure in this watcher thread instead of letting it
          # die silently.
          Thread.current.abort_on_exception = true
          Consumer.wait_shutdown
          cancel
        end
      end

      private

      # Emits the instrumentation call shared by both graceful-shutdown paths.
      def report_graceful_shutdown
        instrument('graceful_shutdown_consumer', consumer: vx_consumer_name)
      end

    end
  end
end

Version data entries

1 entry across 1 version & 1 rubygem

Version Path
vx-consumer-0.1.6 lib/vx/consumer/subscriber.rb