Sha256: 0c78ef72548edb661e20ad2616f4c97e0fa63e6c3abc4388fc7a812700d714e5

Contents?: true

Size: 1.99 KB

Versions: 9

Compression:

Stored size: 1.99 KB

Contents

# frozen_string_literal: true

# :reek:TooManyStatements

module RubyRabbitmqJanus
  module Rabbit
    module Listener
      # Base class for queue listeners.
      #
      # Parsed responses are buffered in +@responses+. Access to that buffer
      # is wrapped in +lock.synchronize+ and consumers are woken up through
      # +semaphore+.
      #
      # NOTE(review): +lock+, +semaphore+ and +subscribe_queue+ are not
      # defined in this class; presumably they come from BaseEvent and/or
      # from subclasses -- confirm.
      class Base < RubyRabbitmqJanus::Rabbit::BaseEvent
        # Create a listener and subscribe it to its queue.
        #
        # @param [#channel] rabbit Connection to the RabbitMQ server;
        #   only the object returned by its +channel+ method is kept
        #
        # @raise [Errors::Rabbit::Listener::Initialize]
        #   when any StandardError is raised during the setup
        def initialize(rabbit)
          super()
          @responses = []
          @rabbit = rabbit.channel
          subscribe_queue
        rescue
          raise Errors::Rabbit::Listener::Initialize
        end

        # Wait for the next buffered response and hand it to the block.
        #
        # @yieldparam event Value returned by +response.event+
        # @yieldparam response Response taken from the head of the buffer
        #
        # @raise [Errors::Rabbit::Listener::ListenEvents]
        #   when any StandardError is raised, including by the given block
        def listen_events
          semaphore.wait
          response = nil
          lock.synchronize do
            response = @responses.shift
          end
          yield response.event, response
        rescue => exception
          # Report through the logger used by the rest of this class instead
          # of printing on stdout. The original exception also stays
          # reachable through +cause+ on the error raised below.
          Tools::Log.instance.info \
            "[X] Error when listening events: #{exception.inspect}"
          raise Errors::Rabbit::Listener::ListenEvents
        end

        private

        # Result of +direct('amq.direct')+ on the channel.
        #
        # NOTE(review): this method shadows Kernel#binding for listener
        # instances. The name is kept because callers outside this file may
        # depend on it.
        def binding
          @rabbit.direct('amq.direct')
        end

        # Options hash, presumably given to the queue subscription made by
        # +subscribe_queue+ -- confirm against subclasses.
        def opts_subs
          { block: false, manual_ack: true, arguments: { 'x-priority': 2 } }
        end

        # Log the ID of the message being read.
        #
        # @param [#message_id] propertie Properties of the message
        def log_message_id(propertie)
          message_id = propertie.message_id
          Tools::Log.instance.info "[X] Message reading with ID #{message_id}"
        end

        # Buffer a response read from the queue, acknowledge the delivery
        # and signal the semaphore so a waiting consumer can pick it up.
        #
        # @param [#delivery_tag] info Delivery information of the message
        # @param [String] payload JSON document sent in the message
        def synchronize_response(info, payload)
          lock.synchronize do
            @responses.push(Janus::Responses::Event.new(JSON.parse(payload)))
          end
          @rabbit.acknowledge(info.delivery_tag, false)
          semaphore.signal
        end

        # Log the delivery information, the correlation ID and the payload
        # of a message read from the queue.
        #
        # @param info Delivery information of the message
        # @param prop Properties of the message
        # @param payload Body of the message
        def info_subscribe(info, prop, payload)
          # Prefer the accessor, like +log_message_id+ does for the message
          # ID: a String key looked up with +[]+ misses when the properties
          # are stored under Symbol keys (presumably the case with Bunny
          # message properties). Hash-like objects still go through +[]+.
          correlation_id =
            if prop.respond_to?(:correlation_id)
              prop.correlation_id
            else
              prop['correlation_id']
            end
          Tools::Log.instance.debug info
          Tools::Log.instance.info \
            "[X] Message reading ##{correlation_id}"
          Tools::Log.instance.debug payload
        end
      end
    end
  end
end

require 'rrj/rabbit/listener/from'
require 'rrj/rabbit/listener/from_admin'
require 'rrj/rabbit/listener/janus_instance'

Version data entries

9 entries across 9 versions & 1 rubygems

Version Path
ruby_rabbitmq_janus-2.6.0.pre.244 lib/rrj/rabbit/listener/base.rb
ruby_rabbitmq_janus-2.6.0.pre.240 lib/rrj/rabbit/listener/base.rb
ruby_rabbitmq_janus-2.6.0.pre.239 lib/rrj/rabbit/listener/base.rb
ruby_rabbitmq_janus-2.6.0.pre.238 lib/rrj/rabbit/listener/base.rb
ruby_rabbitmq_janus-2.6.0.pre.233 lib/rrj/rabbit/listener/base.rb
ruby_rabbitmq_janus-2.6.0.pre.229 lib/rrj/rabbit/listener/base.rb
ruby_rabbitmq_janus-2.6.0.pre.228 lib/rrj/rabbit/listener/base.rb
ruby_rabbitmq_janus-2.6.0.pre.227 lib/rrj/rabbit/listener/base.rb
ruby_rabbitmq_janus-2.6.0.pre.226 lib/rrj/rabbit/listener/base.rb