Sha256: 26f308c321e4d8f644b5bec571af13e0558ec7e77bf2caa0f5dfcd6831b4b7f2

Contents?: true

Size: 1.34 KB

Versions: 3

Compression:

Stored size: 1.34 KB

Contents

# frozen_string_literal: true

# :reek:TooManyStatements

module RubyRabbitmqJanus
  module Rabbit
    module Listener
      # Base for listeners
      class Base < RubyRabbitmqJanus::Rabbit::BaseEvent
        # Define an publisher
        #
        # @param [String] rabbit Information connection to RabbitMQ server
        def initialize(rabbit)
          super()
          @rabbit = rabbit.channel_pool
          subscribe_queue
        rescue
          raise Errors::Rabbit::Listener::Base::Initialize
        end

        # Listen a queue and return a body response
        def listen_events
          semaphore.wait
          response = nil
          lock.synchronize do
            response = responses.shift
          end
          yield response.event, response
        rescue
          raise Errors::Rabbit::Listener::Base::ListenEvents
        end

        private

        attr_accessor :rabbit, :responses

        def binding
          @rabbit.direct('amq.direct')
        end

        def opts_subs
          { block: false, manual_ack: true, arguments: { 'x-priority': 2 } }
        end

        def info_subscribe(info, _prop, payload)
          ::Log.debug info
          ::Log.info '[X] Message reading'
          ::Log.info payload
        end
      end
    end
  end
end

require 'rrj/rabbit/listener/from'
require 'rrj/rabbit/listener/from_admin'

Version data entries

3 entries across 3 versions & 1 rubygems

Version Path
ruby_rabbitmq_janus-2.7.2.pre.320 lib/rrj/rabbit/listener/base.rb
ruby_rabbitmq_janus-2.7.2.pre.319 lib/rrj/rabbit/listener/base.rb
ruby_rabbitmq_janus-2.7.2.pre.318 lib/rrj/rabbit/listener/base.rb