Sha256: be71303f15c8dd673d02e5eb025d37f212bcbc1bee3a12b318c8e5b4bdc775bd

Contents?: true

Size: 1.91 KB

Versions: 3

Compression:

Stored size: 1.91 KB

Contents

require "hot_bunnies/consumers/base"

module HotBunnies
  # Callback consumer that buffers deliveries in an internal blocking queue.
  # Deliveries are enqueued by #deliver and handed to the callback by the
  # thread that calls #start.
  #
  # The @cancelling, @cancelled and @terminated flags, as well as `channel`
  # and `consumer_tag`, are not defined in this class; presumably they are
  # provided by CallbackConsumer (verify against the superclass).
  class BlockingCallbackConsumer < CallbackConsumer
    # Sentinel put on the internal queue to wake up the #start loop and make
    # it flag the consumer as cancelling.
    POISON = :__poison__

    # @param channel forwarded to the superclass
    # @param queue forwarded to the superclass
    # @param buffer_size capacity of the internal queue; when nil or false an
    #   unbounded LinkedBlockingQueue is used instead of an ArrayBlockingQueue
    # @param opts forwarded to the superclass
    # @param callback forwarded to the superclass; invoked by #start with each
    #   buffered delivery splatted into its arguments
    def initialize(channel, queue, buffer_size, opts, callback)
      super(channel, queue, opts, callback)
      @internal_queue =
        if buffer_size
          JavaConcurrent::ArrayBlockingQueue.new(buffer_size)
        else
          JavaConcurrent::LinkedBlockingQueue.new
        end
    end

    # Cancels the consumer on the channel, then enqueues the poison sentinel
    # and marks the consumer as terminated.
    #
    # @return whatever channel.basic_cancel returned
    def cancel
      @cancelling.set(true)
      cancel_result = channel.basic_cancel(consumer_tag)
      @cancelled.set(true)

      # Non-blocking enqueue: the sentinel is not retried if the offer fails.
      @internal_queue.offer(POISON)
      @terminated.set(true)

      cancel_result
    end

    # Takes buffered deliveries and passes them to the callback until the
    # consumer is cancelling/cancelled or the current thread is interrupted,
    # then drains whatever is still queued and marks the consumer terminated.
    def start
      was_interrupted = false

      # Shared by the blocking loop and the drain loop below.
      dispatch = lambda do |item|
        if item == POISON
          @cancelling.set(true)
        else
          @callback.call(*item)
        end
      end

      until @cancelling.get || @cancelled.get || JavaConcurrent::Thread.current_thread.interrupted?
        begin
          dispatch.call(@internal_queue.take)
        rescue JavaConcurrent::InterruptedException
          # NOTE(review): the loop keeps running after this rescue unless the
          # loop condition above turns true; presumably the interrupt flag is
          # cleared when the exception is raised -- confirm this is intended.
          was_interrupted = true
        end
      end

      # Deliveries still buffered at this point are handled outside the rescue.
      while (leftover = @internal_queue.poll)
        dispatch.call(leftover)
      end

      @terminated.set(true)
      # Re-assert the interrupt that was caught above so callers can see it.
      JavaConcurrent::Thread.current_thread.interrupt if was_interrupted
    end

    # Enqueues a delivery for the thread running #start.
    #
    # While the consumer is cancelling/cancelled, or the current thread is
    # interrupted, the delivery is offered without blocking; otherwise the
    # call uses `put` and restores the interrupt flag if it is interrupted.
    #
    # @param pair the delivery arguments, later splatted into the callback
    def deliver(*pair)
      if @cancelling.get || @cancelled.get || JavaConcurrent::Thread.current_thread.interrupted?
        return @internal_queue.offer(pair)
      end

      begin
        @internal_queue.put(pair)
      rescue JavaConcurrent::InterruptedException
        JavaConcurrent::Thread.current_thread.interrupt
      end
    end

    # Flags the consumer as cancelling, enqueues the poison sentinel so a
    # blocked #start loop wakes up, and marks the consumer as terminated.
    # Unlike #cancel, this does not call basic_cancel on the channel.
    def gracefully_shut_down
      @cancelling.set(true)
      @internal_queue.offer(POISON)

      @terminated.set(true)
    end
  end
end

Version data entries

3 entries across 3 versions & 1 rubygem

Version Path
hot_bunnies-2.0.0.pre13-java lib/hot_bunnies/consumers/blocking.rb
hot_bunnies-2.0.0.pre12-java lib/hot_bunnies/consumers/blocking.rb
hot_bunnies-2.0.0.pre11-java lib/hot_bunnies/consumers/blocking.rb