Sha256: 94ec897274e60af41127633cff3cf0a00bdc3a05ea14847964b01ee8776a0c7a

Contents?: true

Size: 1.2 KB

Versions: 2

Compression:

Stored size: 1.2 KB

Contents

require 'bunny'

# require 'pry'
# require 'pry-nav'

require_relative '../rapids_connection'
require_relative './rabbit_mq_river'

module RapidsRivers

  # Understands an event bus based on RabbitMQ
  #
  # Every packet is published to a single fanout exchange (see RAPIDS), and
  # every queue handed out by #queue is bound to that exchange. The Bunny
  # connection is opened lazily: nothing touches the network until the
  # first call that needs a channel (#publish or #queue).
  class RabbitMqRapids
    include RapidsRivers::RapidsConnection

    # Name of the fanout exchange all packets are published to.
    RAPIDS = 'rapids'.freeze

    # Port used when neither the argument nor RABBITMQ_PORT supplies one.
    DEFAULT_PORT = 5672

    # Prepares (but does not yet open) a connection to RabbitMQ.
    #
    # @param host_ip [String, nil] broker address; falls back to
    #   ENV['RABBITMQ_IP'] when nil
    # @param port [String, Integer, nil] broker port; falls back to
    #   ENV['RABBITMQ_PORT'], then to DEFAULT_PORT
    # @raise [ArgumentError] when no host can be determined
    def initialize(host_ip = nil, port = nil)
      host_ip ||= ENV['RABBITMQ_IP']
      # Previously signalled with `throw`, which surfaced as an
      # UncaughtThrowError (itself an ArgumentError) with a mangled message.
      raise ArgumentError, 'Need IP address for RabbitMQ' unless host_ip

      port ||= ENV['RABBITMQ_PORT'] || DEFAULT_PORT
      @connection = Bunny.new(
        host: host_ip,
        port: port.to_i, # ENV values arrive as strings
        automatically_recover: false
      )
    end

    # Publishes the packet's JSON rendering (packet.to_json) to the exchange.
    #
    # @param packet [#to_json] the message to broadcast
    def publish(packet)
      exchange.publish(packet.to_json)
    end

    # Declares an exclusive, auto-delete queue and binds it to the exchange.
    #
    # @param queue_name [String, nil] queue name; nil is treated as "".
    #   (With AMQP an empty name asks the broker to generate one.)
    # @return the declared queue, already bound to the exchange
    def queue(queue_name = "")
      channel.queue(queue_name || "", exclusive: true, auto_delete: true).tap do |bound_queue|
        bound_queue.bind(exchange)
      end
    end

    # Closes the channel (if one was ever opened) and then the connection.
    #
    # Reads @channel directly rather than going through #channel, so that
    # closing an instance that was never used does not open a connection
    # just to shut it down again. The connection is closed in `ensure` so a
    # failure while closing the channel cannot leak it.
    def close
      @channel.close if @channel && @channel.open?
    ensure
      @connection.close if @connection.open?
    end

    private

    # Lazily starts the connection and memoizes a single channel on it.
    def channel
      return @channel if @channel

      @connection.start
      @channel = @connection.create_channel
    end

    # Memoized fanout exchange, declared durable and auto-delete.
    def exchange
      @exchange ||= channel.fanout(RAPIDS, durable: true, auto_delete: true)
    end

  end

end

Version data entries

2 entries across 2 versions & 1 rubygems

Version Path
rapids_rivers-0.2.14 lib/rapids_rivers/rabbit_mq/rabbit_mq_rapids.rb
rapids_rivers-0.2.9 lib/rapids_rivers/rabbit_mq/rabbit_mq_rapids.rb