Sha256: aab6e012e8a69c978a6654b5670335c61c8d69d39b12339c711c33a19a3b5b72

Contents?: true

Size: 1.75 KB

Versions: 1

Compression:

Stored size: 1.75 KB

Contents

require "hot_bunnies/consumers/base"

module HotBunnies
  class CallbackConsumer < BaseConsumer
    def initialize(channel, callback)
      raise ArgumentError, "callback must not be nil!" if callback.nil?

      super(channel)
      @callback = callback
      @callback_arity = @callback.arity
    end

    def callback(headers, message)
      if @callback_arity == 2
        @callback.call(headers, message)
      else
        @callback.call(message)
      end
    end
  end

  class AsyncCallbackConsumer < CallbackConsumer
    def initialize(channel, opts, callback, executor)
      super(channel, callback)
      @executor = executor
      @executor_submit = executor.java_method(:submit, [JavaConcurrent::Runnable.java_class])
      @opts = opts
    end

    def deliver(headers, message)
      unless @executor.shutdown?
        @executor_submit.call do
          begin
            callback(headers, message)
          rescue Exception => e
            $stderr.puts "Unhandled exception in consumer #{@consumer_tag}: #{e.message}"
          end
        end
      end
    end

    def cancel
      @cancelling.set(true)
      response = channel.basic_cancel(consumer_tag)
      @cancelled.set(true)
      @terminated.set(true)

      gracefully_shutdown

      response
    end

    def handleCancel(consumer_tag)
      super(consumer_tag)

      gracefully_shutdown
    end

    def shutdown!
      @executor.shutdown_now if @executor
    end
    alias shut_down! shutdown!

    def gracefully_shut_down
      unless @executor.await_termination(1, JavaConcurrent::TimeUnit::SECONDS)
        @executor.shutdown_now
      end
      @terminated.set(true)
    end
    alias maybe_shut_down_executor gracefully_shut_down
    alias gracefully_shutdown      gracefully_shut_down
  end
end

Version data entries

1 entries across 1 versions & 1 rubygems

Version Path
hot_bunnies-2.0.0.pre9-java lib/hot_bunnies/consumers/non_blocking.rb