Sha256: 098a5cffaa1963b3d6e68372751fc9c11173c40de4351db5ae6c92574aed0b12
Contents?: true
Size: 1.63 KB
Versions: 1
Compression:
Stored size: 1.63 KB
Contents
require "securerandom"
require "timeout"

require "bunny/hop/version"

module Bunny
  # Mixin providing a small request/reply (RPC-style) helper on top of a
  # RabbitMQ channel.
  #
  # NOTE(review): `Bunny.new` is presumably supplied by the bunny gem, which
  # this file does not require itself - confirm the host application loads it
  # before instantiating an including class.
  module Hop
    # Seconds #publish waits for a reply when no explicit timeout is given.
    DEFAULT_REPLY_TIMEOUT = 5

    # Opens the connection and a channel on it.
    #
    # Connection settings are read from the RABBITMQ_HOST, RABBITMQ_USER and
    # RABBITMQ_PASSWORD environment variables; each falls back to 'rabbitmq'.
    def initialize
      @connection = Bunny.new(
        host: ENV.fetch('RABBITMQ_HOST', 'rabbitmq'),
        user: ENV.fetch('RABBITMQ_USER', 'rabbitmq'),
        password: ENV.fetch('RABBITMQ_PASSWORD', 'rabbitmq')
      )
      @connection.start
      @channel = @connection.create_channel
    end

    # Publishes +msg+ to +queue+ through the default exchange and blocks until
    # a reply carrying the same correlation id arrives on a private reply queue.
    #
    # The channel and the connection are closed before this method returns or
    # raises, so the receiver cannot be used for another call afterwards.
    #
    # @param queue [String] routing key (queue name) the request is sent to
    # @param msg [String] request payload
    # @param timeout [Numeric] seconds to wait for the reply
    # @return [String] payload of the matching reply
    # @raise [Timeout::Error] if no matching reply arrives within +timeout+
    def publish(queue, msg, timeout: DEFAULT_REPLY_TIMEOUT)
      lock = Mutex.new
      condition = ConditionVariable.new
      response = nil
      answered = false
      call_id = SecureRandom.uuid

      reply_queue = @channel.queue('', exclusive: true)
      reply_queue.subscribe do |_delivery_info, properties, payload|
        next unless properties[:correlation_id] == call_id

        # Record the reply under the lock so the waiter's predicate check
        # below can never miss it.
        lock.synchronize do
          response = payload
          answered = true
          condition.signal
        end
      end

      @channel.default_exchange.publish(
        msg,
        routing_key: queue,
        correlation_id: call_id,
        reply_to: reply_queue.name
      )

      deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout
      lock.synchronize do
        # Loop on the flag rather than waiting once: the reply may already
        # have arrived before we got here, and waits can wake spuriously.
        until answered
          remaining = deadline - Process.clock_gettime(Process::CLOCK_MONOTONIC)
          raise Timeout::Error, "no reply from #{queue} within #{timeout}s" if remaining <= 0

          condition.wait(lock, remaining)
        end
      end

      response
    ensure
      # Release broker resources on every exit path, not only on success or
      # timeout; the connection is closed even if closing the channel fails.
      begin
        @channel.close
      ensure
        @connection.close
      end
    end

    # Consumes +queue+ and answers each message that arrives on it.
    #
    # For every delivery the delivery info, properties and payload are printed
    # to standard output, the given block is called with the payload, and the
    # block's return value is published through the default exchange to the
    # message's +reply_to+ queue with the message's correlation id.
    #
    # @param queue [String] name of the queue to consume from
    # @yieldparam payload [String] body of the received message
    # @yieldreturn [String] body of the reply to publish
    def subscribe(queue)
      request_queue = @channel.queue(queue)
      exchange = @channel.default_exchange

      request_queue.subscribe do |delivery_info, properties, payload|
        puts delivery_info
        puts properties
        puts payload

        result = yield(payload)

        exchange.publish(
          result,
          routing_key: properties.reply_to,
          correlation_id: properties.correlation_id
        )
      end
    end
  end
end
Version data entries
1 entry across 1 version & 1 rubygem
Version | Path |
---|---|
bunny-hop-0.1.0 | lib/bunny/hop.rb |