Sha256: 098a5cffaa1963b3d6e68372751fc9c11173c40de4351db5ae6c92574aed0b12

Contents?: true

Size: 1.63 KB

Versions: 1

Compression:

Stored size: 1.63 KB

Contents

require "bunny/hop/version"

module Bunny
  module Hop
    def initialize
      @connection = Bunny.new(
        host: ENV.fetch('RABBITMQ_HOST', 'rabbitmq'), 
        user: ENV.fetch('RABBITMQ_USER', 'rabbitmq'), 
        password: ENV.fetch('RABBITMQ_PASSWORD', 'rabbitmq')
      )
      @connection.start
      @channel = @connection.create_channel
    end

    def publish(queue, msg)
      lock = Mutex.new
      condition = ConditionVariable.new
      response = nil
      call_id = "#{rand}#{rand}#{rand}"
      exchange = @channel.default_exchange

      reply_queue = @channel.queue('', exclusive: true)
      reply_queue.subscribe do |_delivery_info, properties, payload|
        if properties[:correlation_id] == call_id
          response = payload

          lock.synchronize { condition.signal }
        end
      end

      exchange.publish(msg,
        routing_key: queue,
        correlation_id: call_id,
        reply_to: reply_queue.name)

      Timeout::timeout(5) {
        lock.synchronize { condition.wait(lock) }
      }
    
      @channel.close
      @connection.close

      response
    rescue Timeout::Error => e
      @channel.close
      @connection.close
      raise e
    end
    
    def subscribe(queue)
      queue = @channel.queue(queue)
      exchange = @channel.default_exchange

      queue.subscribe do |_delivery_info, properties, payload|
        puts _delivery_info
        puts properties
        puts payload
        
        result = yield

        exchange.publish(
          result,
          routing_key: properties.reply_to,
          correlation_id: properties.correlation_id
        )
      end
    end
  end
end

Version data entries

1 entries across 1 versions & 1 rubygems

Version Path
bunny-hop-0.1.0 lib/bunny/hop.rb