Sha256: 52676823a1865484b896600eefaba4dd90913ad5f563c5f8cd5ef45cb60c0181

Contents?: true

Size: 1020 Bytes

Versions: 1

Compression:

Stored size: 1020 Bytes

Contents

require_relative '../river'

require 'pry'
require 'pry-nav'

module RapidsRivers

  # Understands a filtered message stream based on RabbitMQ
  class RabbitMqRiver < RapidsRivers::River

    # Registers +service+ with the parent River, publishes a startup log
    # packet on the rapids connection, and then subscribes to the service's
    # queue in blocking mode, forwarding every payload to #message.
    #
    # An Interrupt raised while publishing or consuming closes the rapids
    # connection and terminates the process with exit status 0.
    def register(service)
      super
      begin
        @rapids_connection.publish(startup_packet(service))
        service_queue = queue(service)
        # Only the payload is used; delivery info and metadata are ignored.
        service_queue.subscribe(block: true) do |_delivery_info, _metadata, payload|
          message(@rapids_connection, payload)
        end
      rescue Interrupt
        @rapids_connection.close
        exit(0)
      end
    end

    private

      # Returns the queue named after +service+. The queue is memoized in
      # @queue, so once it is set the same queue is returned on every later
      # call regardless of the +service+ argument.
      def queue(service)
        @queue ||= @rapids_connection.queue(service_name(service))
      end

      # Builds the log packet announcing that +service+ is starting.
      def startup_packet(service)
        RapidsRivers::Packet.new(
          system: 'log',
          log_severity: 'informational',
          event_type: 'service_state',
          service_state: 'starting',
          service_name: service_name(service)
        )
      end

  end

end

Version data entries

1 entry across 1 version & 1 rubygem

Version Path
rapids_rivers-0.2.14 lib/rapids_rivers/rabbit_mq/rabbit_mq_river.rb