Sha256: bae5cbfa0aac5bf4398d729c69e7b704e51cb01036943be3fae5e4845d77c476

Contents?: true

Size: 1.78 KB

Versions: 39

Compression:

Stored size: 1.78 KB

Contents

require "march_hare/consumers/base"

module MarchHare
  class BlockingCallbackConsumer < CallbackConsumer
    POISON = :__poison__

    def initialize(channel, queue, buffer_size, opts, callback)
      super(channel, queue, opts, callback)
      if buffer_size
        @internal_queue = JavaConcurrent::ArrayBlockingQueue.new(buffer_size)
      else
        @internal_queue = JavaConcurrent::LinkedBlockingQueue.new
      end
    end

    def cancel
      if super
        @internal_queue.offer(POISON)
      end
    end

    def start
      interrupted = false
      until (@cancelling.get || @cancelled.get) || JavaConcurrent::Thread.current_thread.interrupted?
        begin
          pair = @internal_queue.take
          if pair
            if pair == POISON
              @cancelling.set(true)
            else
              @callback.call(*pair)
            end
          end
        rescue JavaConcurrent::InterruptedException => e
          interrupted = true
        end
      end
      while (pair = @internal_queue.poll)
        if pair
          if pair == POISON
            @cancelling.set(true)
          else
            @callback.call(*pair)
          end
        end
      end
      @terminated.set(true)
      if interrupted
        JavaConcurrent::Thread.current_thread.interrupt
      end
    end

    def deliver(*pair)
      if (@cancelling.get || @cancelled.get) || JavaConcurrent::Thread.current_thread.interrupted?
        @internal_queue.offer(pair)
      else
        begin
          @internal_queue.put(pair)
        rescue JavaConcurrent::InterruptedException => e
          JavaConcurrent::Thread.current_thread.interrupt
        end
      end
    end

    def gracefully_shut_down
      @cancelling.set(true)
      @internal_queue.offer(POISON)

      @terminated.set(true)
    end

  end
end

Version data entries

39 entries across 39 versions & 1 rubygems

Version Path
march_hare-4.6.0-java lib/march_hare/consumers/blocking.rb
march_hare-4.5.0-java lib/march_hare/consumers/blocking.rb
march_hare-4.4.0-java lib/march_hare/consumers/blocking.rb
march_hare-4.3.0-java lib/march_hare/consumers/blocking.rb
march_hare-4.2.0-java lib/march_hare/consumers/blocking.rb
march_hare-4.1.1-java lib/march_hare/consumers/blocking.rb
march_hare-4.1.0-java lib/march_hare/consumers/blocking.rb
march_hare-4.0.0-java lib/march_hare/consumers/blocking.rb
march_hare-3.1.1-java lib/march_hare/consumers/blocking.rb
march_hare-3.1.0-java lib/march_hare/consumers/blocking.rb
march_hare-3.0.0-java lib/march_hare/consumers/blocking.rb
march_hare-2.22.0-java lib/march_hare/consumers/blocking.rb
march_hare-2.21.0-java lib/march_hare/consumers/blocking.rb
march_hare-2.20.0-java lib/march_hare/consumers/blocking.rb
march_hare-2.19.0-java lib/march_hare/consumers/blocking.rb
march_hare-2.18.0-java lib/march_hare/consumers/blocking.rb
march_hare-2.17.0-java lib/march_hare/consumers/blocking.rb
march_hare-2.16.0-java lib/march_hare/consumers/blocking.rb
march_hare-2.15.0-java lib/march_hare/consumers/blocking.rb
march_hare-2.13.0-java lib/march_hare/consumers/blocking.rb