Sha256: 8929753bbd5b5c729441bedec877b01e1631736874741849d6dac0bd531a4309

Contents?: true

Size: 1.36 KB

Versions: 5

Compression:

Stored size: 1.36 KB

Contents

require 'thread'
require 'bunny/consumer'

module Vx
  module Lib
    module Consumer
      # A Bunny consumer whose delivery handling runs under a mutex, so that
      # shutdown code can coordinate with a delivery that is currently being
      # processed.
      #
      # The same mutex is taken by #call (for every delivery) and by the
      # graceful shutdown helpers (around #cancel).
      class Subscriber < Bunny::Consumer

        attr_accessor :vx_consumer_name, :queue_name

        # Forwards every argument unchanged to Bunny::Consumer#initialize and
        # creates the mutex used by #in_progress.
        def initialize(*args)
          super(*args)
          @lock = Mutex.new
        end

        # Cancels the consumer while holding the lock. Blocks until the lock
        # is free, i.e. until any block currently running via #in_progress
        # has finished.
        #
        # NOTE(review): Mutex is not reentrant, so calling this from a thread
        # that is already inside #in_progress should raise ThreadError --
        # confirm no caller does that.
        def graceful_shutdown
          in_progress { cancel }
        end

        # Non-blocking variant of #graceful_shutdown.
        #
        # @return [Boolean] true when the lock was free and #cancel was run;
        #   false when the lock was held by someone else (nothing is done).
        def try_graceful_shutdown
          return false unless @lock.try_lock

          begin
            cancel
          ensure
            # Always release the lock, even when #cancel raises.
            @lock.unlock
          end
          true
        end

        # Runs the given block while holding the lock.
        #
        # @return whatever the block returns
        def in_progress
          @lock.synchronize { yield }
        end

        # @return [Boolean] true while the lock is held. The lock is taken by
        #   #call as well as by the graceful shutdown helpers.
        def running?
          @lock.locked?
        end

        # Invokes the @on_delivery handler (when one is set) with the given
        # arguments, under the lock.
        #
        # @on_delivery is not assigned in this class; presumably it is set by
        # the Bunny::Consumer superclass -- TODO confirm.
        def call(*args)
          in_progress do
            @on_delivery && @on_delivery.call(*args)
            # NOTE(review): looks like a scheduling hint for other threads;
            # it runs while the lock is still held -- confirm intent.
            sleep 0
          end
        end

        # Cancels the consumer via the superclass and then closes the
        # channel. Does nothing when the channel is already closed.
        def cancel
          return if closed?

          super
          # The channel may have been closed in the meantime; re-check first.
          channel.close unless closed?
        end

        # @return the result of +channel.closed?+
        def closed?
          channel.closed?
        end

        # Joins the channel's work pool.
        def join
          channel.work_pool.join
        end

        # Starts a background thread that calls Consumer.wait_shutdown
        # (defined outside this file) and, once that returns, calls #cancel.
        # The thread has abort_on_exception enabled.
        #
        # NOTE(review): this calls #cancel directly, without taking the lock
        # used by #call -- confirm that is intended.
        #
        # @return [Thread] the started thread
        def wait_shutdown
          Thread.new do
            watcher = Thread.current
            watcher.abort_on_exception = true
            Consumer.wait_shutdown
            cancel
          end
        end

      end
    end
  end
end

Version data entries

5 entries across 5 versions & 1 rubygem

Version Path
vx-lib-consumer-0.3.4 lib/vx/lib/consumer/subscriber.rb
vx-lib-consumer-0.3.3 lib/vx/lib/consumer/subscriber.rb
vx-lib-consumer-0.3.2 lib/vx/lib/consumer/subscriber.rb
vx-lib-consumer-0.3.1 lib/vx/lib/consumer/subscriber.rb
vx-lib-consumer-0.3.0 lib/vx/lib/consumer/subscriber.rb