Sha256: a5e9b22a24ad9b0e42267de91ef30bb59512cb6f1af79217161b5aa50fceb342

Contents?: true

Size: 645 Bytes

Versions: 1

Compression:

Stored size: 645 Bytes

Contents

require_relative '../river'

module RapidsRivers
  # River variant that takes its messages from a RabbitMQ queue obtained
  # through @rapids_connection.
  # NOTE(review): @rapids_connection, #message and #service_name are not
  # defined in this file; presumably they come from RapidsRivers::River —
  # confirm against ../river.
  class RabbitMqRiver < RapidsRivers::River
    # Keeps the inherited #register reachable as #parent_register once it is
    # overridden below.
    alias parent_register register

    # Registers +service+ through the superclass, then subscribes to this
    # river's queue and hands every received payload to #message together
    # with the rapids connection.
    #
    # An Interrupt raised while subscribed closes the rapids connection and
    # exits the process with status 0. Other exceptions are not rescued here.
    # NOTE(review): with `block: true` the subscribe call presumably does not
    # return while consuming — confirm against the queue implementation.
    def register(service)
      super
      begin
        subscription_queue = queue(service)
        subscription_queue.subscribe(block: true) do |_delivery_info, _metadata, body|
          message(@rapids_connection, body)
        end
      rescue Interrupt
        @rapids_connection.close
        exit(0)
      end
    end

    private

    # Returns the queue for this river, creating it on first use with the
    # name derived from +service+. The result is memoized in @queue, so the
    # argument only matters on the first call.
    def queue(service)
      @queue ||= @rapids_connection.queue(service_name(service))
    end
  end
end

Version data entries

1 entry across 1 version & 1 rubygem

Version Path
rapids_rivers-0.2.9 lib/rapids_rivers/rabbit_mq/rabbit_mq_river.rb