Sha256: 262b845fc78327e235774e5160b7ba46d66370fb53e6f77c8c51811e43135f1a
Contents?: true
Size: 1.86 KB
Versions: 1
Compression:
Stored size: 1.86 KB
Contents
require "hot_bunnies/consumers/base"

module HotBunnies
  # Callback consumer that buffers deliveries in an internal blocking queue
  # and invokes the callback from the thread that calls #start, so that
  # thread blocks until the consumer is cancelled, shut down or interrupted.
  #
  # NOTE(review): @cancelling, @cancelled and @terminated are not assigned in
  # this file; they are presumably atomic boolean flags created by a
  # superclass (they are only used via #get / #set here) -- confirm.
  class BlockingCallbackConsumer < CallbackConsumer
    include JavaConcurrent

    # Sentinel put on the internal queue to wake up a #start loop that is
    # blocked waiting for the next delivery.
    POISON = :__poison__

    # @param channel     passed through to the superclass
    # @param buffer_size capacity of the internal queue; when nil/false an
    #                    unbounded LinkedBlockingQueue is used instead
    # @param opts        stored in @opts (not read anywhere in this class)
    # @param callback    passed through to the superclass
    def initialize(channel, buffer_size, opts, callback)
      super(channel, callback)

      @internal_queue = if buffer_size
                          ArrayBlockingQueue.new(buffer_size)
                        else
                          LinkedBlockingQueue.new
                        end
      @opts = opts
    end

    # Cancels this consumer on the channel and wakes up a blocked #start loop.
    #
    # @return whatever channel.basic_cancel returned
    def cancel
      @cancelling.set(true)
      response = channel.basic_cancel(consumer_tag)
      @cancelled.set(true)

      # offer (not put) so a full bounded queue cannot block the canceller.
      @internal_queue.offer(POISON)
      @terminated.set(true)

      response
    end

    # Runs the consume loop on the calling thread.
    #
    # Takes queued deliveries one at a time and hands them to the callback
    # until cancellation is flagged, the poison sentinel is seen, or the
    # thread is interrupted. Whatever is still queued at that point is
    # drained without blocking, then the consumer is marked terminated.
    def start
      interrupted = false

      # An InterruptedException raised by `take` clears the thread's
      # interrupt status, so the thread-level check alone would miss it and
      # the loop would go straight back to blocking. Tracking it in the local
      # flag makes an interrupt during `take` end the loop just like an
      # interrupt observed between iterations does.
      until interrupted || shutdown_or_interrupt_pending?
        begin
          dispatch_queued(@internal_queue.take)
        rescue InterruptedException
          interrupted = true
        end
      end

      # Non-blocking drain: poll returns nil once the queue is empty.
      while (pair = @internal_queue.poll)
        dispatch_queued(pair)
      end

      @terminated.set(true)

      # Restore the interrupt status that catching the exception cleared.
      JavaConcurrent::Thread.current_thread.interrupt if interrupted
    end

    # Enqueues one delivery (the argument list is stored as a single array).
    #
    # While shutting down or interrupted the entry is offered without
    # blocking, so it is dropped if a bounded queue is full; otherwise the
    # call blocks until there is room in the queue.
    def deliver(*pair)
      if shutdown_or_interrupt_pending?
        @internal_queue.offer(pair)
      else
        begin
          @internal_queue.put(pair)
        rescue InterruptedException
          # Keep the interrupt visible to the code further up the stack.
          JavaConcurrent::Thread.current_thread.interrupt
        end
      end
    end

    # Flags the consumer as cancelling and wakes up a blocked #start loop
    # without calling basic_cancel on the channel.
    def gracefully_shut_down
      @cancelling.set(true)
      @internal_queue.offer(POISON)
      @terminated.set(true)
    end

    private

    # True once cancellation has been requested or completed, or when the
    # current thread has its interrupt status set.
    def shutdown_or_interrupt_pending?
      @cancelling.get || @cancelled.get ||
        JavaConcurrent::Thread.current_thread.interrupted?
    end

    # Handles one non-nil queue entry: the poison sentinel flags
    # cancellation, anything else is splatted into the callback.
    def dispatch_queued(pair)
      if pair == POISON
        @cancelling.set(true)
      else
        callback(*pair)
      end
    end
  end
end
Version data entries
1 entry across 1 version & 1 rubygem
Version | Path |
---|---|
hot_bunnies-2.0.0.pre9-java | lib/hot_bunnies/consumers/blocking.rb |