Sha256: 0df4c80c73991345825b49bb4870cd6a600d5d0432e2598cc9a19dc7a2b992df

Contents?: true

Size: 1.61 KB

Versions: 3

Compression:

Stored size: 1.61 KB

Contents

# frozen_string_literal: true

# :reek:TooManyStatements

module RubyRabbitmqJanus
  module Rabbit
    module Listener
      # Common base class for the queue listeners.
      #
      # Relies on +semaphore+, +lock+ and +subscribe_queue+, none of which are
      # defined in this class (presumably supplied by the parent class and/or
      # by subclasses -- confirm against them).
      class Base < RubyRabbitmqJanus::Rabbit::BaseEvent
        # Build a listener: keep the channel exposed by +rabbit+ and then
        # call +subscribe_queue+.
        #
        # @param [#channel] rabbit Object giving access to the RabbitMQ channel
        #
        # @raise [Errors::Rabbit::Listener::Base::Initialize]
        #   if any StandardError is raised while setting up
        def initialize(rabbit)
          super()
          @rabbit = rabbit.channel
          subscribe_queue
        rescue StandardError
          raise Errors::Rabbit::Listener::Base::Initialize
        end

        # Wait on the semaphore, remove the first entry of +responses+ while
        # holding the lock, and hand it to the caller's block.
        #
        # @yield [event, response] the value of +response.event+ followed by
        #   the response itself
        #
        # @raise [Errors::Rabbit::Listener::Base::ListenEvents]
        #   if any StandardError is raised, including one raised by the block
        def listen_events
          semaphore.wait
          message = nil
          lock.synchronize { message = responses.shift }
          yield message.event, message
        rescue StandardError
          raise Errors::Rabbit::Listener::Base::ListenEvents
        end

        private

        attr_accessor :rabbit
        attr_accessor :responses

        # Result of asking the channel for the 'amq.direct' direct exchange.
        # NOTE(review): this name overrides Kernel#binding for instances of
        # this class -- confirm nothing here needs the Kernel version.
        def binding
          @rabbit.direct('amq.direct')
        end

        # Options hash (presumably passed to the queue subscription -- the
        # call site is not in this file).
        def opts_subs
          {
            block: false,
            manual_ack: true,
            arguments: { 'x-priority': 2 }
          }
        end

        # Write the +message_id+ of the given properties to the info log.
        def log_message_id(propertie)
          ::Log.info("[X] Message reading with ID #{propertie.message_id}")
        end

        # Log a received message: +info+ and +payload+ at debug level, and
        # the 'correlation_id' entry of +prop+ at info level.
        def info_subscribe(info, prop, payload)
          ::Log.debug(info)
          ::Log.info("[X] Message reading ##{prop['correlation_id']}")
          ::Log.debug(payload)
        end
      end
    end
  end
end

require 'rrj/rabbit/listener/from'
require 'rrj/rabbit/listener/from_admin'
require 'rrj/rabbit/listener/janus_instance'

Version data entries

3 entries across 3 versions & 1 rubygems

Version Path
ruby_rabbitmq_janus-2.7.1 lib/rrj/rabbit/listener/base.rb
ruby_rabbitmq_janus-2.7.1.pre.274 lib/rrj/rabbit/listener/base.rb
ruby_rabbitmq_janus-2.7.0.pre.272 lib/rrj/rabbit/listener/base.rb