Sha256: 0c78ef72548edb661e20ad2616f4c97e0fa63e6c3abc4388fc7a812700d714e5
Contents?: true
Size: 1.99 KB
Versions: 9
Compression:
Stored size: 1.99 KB
Contents
# frozen_string_literal: true

# :reek:TooManyStatements

module RubyRabbitmqJanus
  module Rabbit
    module Listener
      # Base class for queue listeners.
      #
      # Parsed responses are buffered in `@responses`; `listen_events` hands
      # them to the caller one at a time. `semaphore`, `lock` and
      # `subscribe_queue` are not defined in this class -- presumably they are
      # provided by BaseEvent and/or by the concrete listener subclasses
      # (TODO confirm).
      class Base < RubyRabbitmqJanus::Rabbit::BaseEvent
        # Set up a listener: open a channel and subscribe to the queue.
        #
        # @param rabbit [#channel] connection to the RabbitMQ server; only
        #   `#channel` is called on it here
        #
        # @raise [Errors::Rabbit::Listener::Initialize] when any step of the
        #   setup raises a StandardError
        def initialize(rabbit)
          super()
          @responses = []
          @rabbit = rabbit.channel
          subscribe_queue
        rescue StandardError
          # Raising from inside `rescue` keeps the original error as `#cause`.
          raise Errors::Rabbit::Listener::Initialize
        end

        # Wait for the next buffered response and yield it to the caller.
        #
        # @yieldparam event the value of `response.event`
        # @yieldparam response the response taken from the internal buffer
        #
        # @raise [Errors::Rabbit::Listener::ListenEvents] when waiting,
        #   reading the buffer or the caller's block raises a StandardError
        def listen_events
          semaphore.wait
          response = nil
          lock.synchronize do
            response = @responses.shift
          end
          yield response.event, response
        rescue StandardError => e
          # Report through the project logger rather than printing to stdout.
          Tools::Log.instance.info \
            "[X] Error when listening events : #{e.inspect}"
          raise Errors::Rabbit::Listener::ListenEvents
        end

        private

        # Direct exchange ('amq.direct') obtained from the channel.
        # NOTE(review): this name shadows Kernel#binding inside listeners.
        def binding
          @rabbit.direct('amq.direct')
        end

        # Options passed when subscribing to the queue.
        def opts_subs
          {
            block: false,
            manual_ack: true,
            arguments: { 'x-priority': 2 }
          }
        end

        # Log the ID of the message being read.
        #
        # @param propertie message properties; must respond to `#message_id`
        def log_message_id(propertie)
          message_id = propertie.message_id
          Tools::Log.instance.info "[X] Message reading with ID #{message_id}"
        end

        # Buffer the parsed payload, acknowledge the delivery and signal the
        # semaphore that `listen_events` waits on.
        #
        # @param info delivery information; must respond to `#delivery_tag`
        # @param payload [String] JSON document received from the queue
        def synchronize_response(info, payload)
          lock.synchronize do
            @responses.push(Janus::Responses::Event.new(JSON.parse(payload)))
          end
          # Second argument is presumably the `multiple` flag -- TODO confirm.
          @rabbit.acknowledge(info.delivery_tag, false)
          semaphore.signal
        end

        # Log the delivery information, correlation ID and payload of a
        # received message.
        def info_subscribe(info, prop, payload)
          Tools::Log.instance.debug info
          Tools::Log.instance.info \
            "[X] Message reading ##{prop['correlation_id']}"
          Tools::Log.instance.debug payload
        end
      end
    end
  end
end

require 'rrj/rabbit/listener/from'
require 'rrj/rabbit/listener/from_admin'
require 'rrj/rabbit/listener/janus_instance'
Version data entries
9 entries across 9 versions & 1 rubygems