Sha256: 52676823a1865484b896600eefaba4dd90913ad5f563c5f8cd5ef45cb60c0181
Contents?: true
Size: 1020 Bytes
Versions: 1
Compression:
Stored size: 1020 Bytes
Contents
require_relative '../river' require 'pry' require 'pry-nav' module RapidsRivers # Understands a filtered message stream based on RabbitMQ class RabbitMqRiver < RapidsRivers::River # alias_method :parent_register, :register def register service super begin @rapids_connection.publish startup_packet(service) queue(service).subscribe(:block => true) do |delivery_info, metadata, payload| message @rapids_connection, payload end rescue Interrupt => _ @rapids_connection.close exit(0) end end private def queue service @queue ||= @rapids_connection.queue service_name(service) end def startup_packet service RapidsRivers::Packet.new( system: 'log', log_severity: 'informational', event_type: 'service_state', service_state: 'starting', service_name: service_name(service) ) end end end
Version data entries
1 entries across 1 versions & 1 rubygems
Version | Path |
---|---|
rapids_rivers-0.2.14 | lib/rapids_rivers/rabbit_mq/rabbit_mq_river.rb |