Sha256: a5e9b22a24ad9b0e42267de91ef30bb59512cb6f1af79217161b5aa50fceb342
Contents?: true
Size: 645 Bytes
Versions: 1
Compression:
Stored size: 645 Bytes
Contents
require_relative '../river'

module RapidsRivers
  # Understands a filtered message stream based on RabbitMQ
  class RabbitMqRiver < RapidsRivers::River
    alias_method :parent_register, :register

    def register service
      super
      begin
        queue(service).subscribe(:block => true) do |delivery_info, metadata, payload|
          message @rapids_connection, payload
        end
      rescue Interrupt => _
        @rapids_connection.close
        exit(0)
      end
    end

    private

    def queue service
      @queue ||= @rapids_connection.queue service_name(service)
    end
  end
end
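
The private `queue` method above memoizes its result in a single `@queue` instance variable, so the queue is looked up only for the first service passed in. The sketch below illustrates that behaviour in isolation and contrasts it with per-name memoization. It is not the gem's code: `FakeConnection`, `SingleQueueLookup`, and `PerServiceQueueLookup` are hypothetical stand-ins, and no RabbitMQ server is involved. Whether this matters in practice depends on how the river is used, since `subscribe(:block => true)` blocks the calling thread.

```ruby
# Hypothetical stand-in for the RabbitMQ connection; not part of rapids_rivers.
class FakeConnection
  attr_reader :declared

  def initialize
    @declared = []
  end

  # Records each queue name requested and returns the name as the "queue".
  def queue(name)
    @declared << name
    name
  end
end

# Mirrors the `@queue ||= ...` memoization used in RabbitMqRiver#queue.
class SingleQueueLookup
  def initialize(connection)
    @connection = connection
  end

  def queue(service_name)
    @queue ||= @connection.queue(service_name)
  end
end

# Alternative sketch (an assumption, not the gem's design): memoize per name.
class PerServiceQueueLookup
  def initialize(connection)
    @connection = connection
    @queues = {}
  end

  def queue(service_name)
    @queues[service_name] ||= @connection.queue(service_name)
  end
end

single = SingleQueueLookup.new(FakeConnection.new)
single_first = single.queue("service_a")
single_second = single.queue("service_b") # still the queue for service_a

per_service = PerServiceQueueLookup.new(FakeConnection.new)
per_first = per_service.queue("service_a")
per_second = per_service.queue("service_b") # a distinct queue
```

With the single-variable form, the second call returns the queue created for the first name. The per-name form keeps one entry per service at the cost of a small hash.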
Version data entries
1 entry across 1 version & 1 rubygem
Version | Path |
---|---|
rapids_rivers-0.2.9 | lib/rapids_rivers/rabbit_mq/rabbit_mq_river.rb |