Sha256: eb81c4d16f27d8ad5b001ed125238fa524940a48d6af27cd25851da040822bb6

Contents?: true

Size: 1.44 KB

Versions: 1

Compression:

Stored size: 1.44 KB

Contents

require "securerandom"
require "timeout"

require "bunny"

require "bunny_hop/version"

# Minimal request/reply (RPC-style) helper built on a Bunny RabbitMQ connection.
#
# Every instance opens its own connection and channel when constructed.
# {#publish} closes both when it finishes (successfully or not), so an instance
# can serve a single {#publish} call; build a new instance for each request.
class BunnyHop
  # Seconds {#publish} waits for a reply when the caller gives no timeout.
  DEFAULT_REPLY_TIMEOUT = 5

  # Connects to the broker and opens a channel.
  #
  # Connection settings come from the RABBITMQ_HOST, RABBITMQ_USER and
  # RABBITMQ_PASSWORD environment variables; each falls back to 'rabbitmq'.
  def initialize
    @connection = ::Bunny.new(
      host: ENV.fetch('RABBITMQ_HOST', 'rabbitmq'),
      user: ENV.fetch('RABBITMQ_USER', 'rabbitmq'),
      password: ENV.fetch('RABBITMQ_PASSWORD', 'rabbitmq')
    )
    @connection.start
    @channel = @connection.create_channel
  end

  # Publishes +msg+ to +queue+ through the default exchange and blocks until a
  # reply carrying the same correlation id arrives on a private reply queue.
  #
  # The channel and the connection are closed before this method returns or
  # raises, whatever the outcome.
  #
  # @param queue [String] routing key (queue name) the request is sent to
  # @param msg [String] request payload
  # @param timeout [Numeric] seconds to wait for the reply
  # @return [String] payload of the matching reply
  # @raise [Timeout::Error] if no matching reply arrives within +timeout+
  def publish(queue, msg, timeout = DEFAULT_REPLY_TIMEOUT)
    lock = Mutex.new
    condition = ConditionVariable.new
    response = nil
    # Separate flag so that a reply which is already there (or a nil payload)
    # is never mistaken for "still waiting".
    received = false
    call_id = SecureRandom.uuid
    exchange = @channel.default_exchange

    reply_queue = @channel.queue('', exclusive: true)
    reply_queue.subscribe do |_delivery_info, properties, payload|
      next unless properties[:correlation_id] == call_id

      lock.synchronize do
        response = payload
        received = true
        condition.signal
      end
    end

    # Published outside the lock: the reply handler takes the same (non
    # re-entrant) mutex and may run before this call returns.
    exchange.publish(msg,
      routing_key: queue,
      correlation_id: call_id,
      reply_to: reply_queue.name)

    deadline = monotonic_now + timeout
    lock.synchronize do
      # Loop on the predicate: covers replies that arrived before we started
      # waiting as well as spurious wakeups.
      until received
        remaining = deadline - monotonic_now
        raise Timeout::Error, "no reply from #{queue} within #{timeout}s" if remaining <= 0

        condition.wait(lock, remaining)
      end
    end

    response
  ensure
    close_session
  end

  # Consumes +queue+ and answers every message with the block's result.
  #
  # The block receives the message payload; its return value is converted with
  # +to_s+ and published to the message's +reply_to+ queue with the original
  # correlation id. Messages without a +reply_to+ are handled but not answered.
  #
  # @param queue [String] name of the queue to consume
  # @yieldparam payload [String] body of the incoming message
  # @yieldreturn [#to_s] reply to send back
  # @return [Object] whatever the underlying queue subscription returns
  # @raise [ArgumentError] if no block is given
  def subscribe(queue, &handler)
    raise ArgumentError, 'a block is required to handle messages' unless handler

    source = @channel.queue(queue)
    exchange = @channel.default_exchange

    source.subscribe do |_delivery_info, properties, payload|
      result = handler.call(payload)
      reply_to = properties.reply_to
      # No reply destination means the sender is not waiting for an answer.
      next unless reply_to

      exchange.publish(
        result.to_s,
        routing_key: reply_to,
        correlation_id: properties.correlation_id
      )
    end
  end

  private

  # Monotonic clock reading in seconds; unaffected by wall-clock adjustments.
  def monotonic_now
    Process.clock_gettime(Process::CLOCK_MONOTONIC)
  end

  # Closes the channel and then the connection, skipping whichever is already
  # closed; the connection is closed even if closing the channel raises.
  def close_session
    @channel.close if @channel.open?
  ensure
    @connection.close if @connection.open?
  end
end

Version data entries

1 entries across 1 versions & 1 rubygems

Version Path
bunny_hop-0.1.3 lib/bunny_hop.rb