Sha256: 94ec897274e60af41127633cff3cf0a00bdc3a05ea14847964b01ee8776a0c7a
Contents?: true
Size: 1.2 KB
Versions: 2
Compression:
Stored size: 1.2 KB
Contents
require 'bunny' # require 'pry' # require 'pry-nav' require_relative '../rapids_connection' require_relative './rabbit_mq_river' module RapidsRivers # Understands an event bus based on RabbitMQ class RabbitMqRapids include RapidsRivers::RapidsConnection RAPIDS = 'rapids' def initialize(host_ip, port) host_ip = host_ip || ENV['RABBITMQ_IP'] || throw("Need IP address for RabbitMQ") port = port || ENV['RABBITMQ_PORT'] || 5672 @connection = Bunny.new( :host => host_ip, :port => port.to_i, :automatically_recover => false) end def publish(packet) exchange.publish packet.to_json end def queue queue_name = "" channel.queue(queue_name || "", exclusive: true, auto_delete: true).tap do |queue| queue.bind exchange end end def close channel.close @connection.close end private def channel return @channel if @channel @connection.start @channel = @connection.create_channel end def exchange @exchange ||= channel.fanout(RAPIDS, durable: true, auto_delete: true) end end end
Version data entries
2 entries across 2 versions & 1 rubygems
Version | Path |
---|---|
rapids_rivers-0.2.14 | lib/rapids_rivers/rabbit_mq/rabbit_mq_rapids.rb |
rapids_rivers-0.2.9 | lib/rapids_rivers/rabbit_mq/rabbit_mq_rapids.rb |