Sha256: 5410b3e82ceb23a42b6b41565e699c953a1d24c03b2b375a774db8a4277337c5

Contents?: true

Size: 1.9 KB

Versions: 26

Compression:

Stored size: 1.9 KB

Contents

# frozen_string_literal: true

# :reek:NilCheck

module RubyRabbitmqJanus
  module Rabbit
    module Listener
      # Base for listeners
      class Base < RubyRabbitmqJanus::Rabbit::BaseEvent
        # Define an publisher
        #
        # @param [String] rabbit Information connection to RabbitMQ server
        def initialize(rabbit)
          super()
          @rabbit = rabbit.channel
          subscribe_queue
        end

        # Listen a queue and return a body response
        def listen_events
          semaphore.wait
          response = nil
          lock.synchronize do
            response = responses.shift
            check(response)
          end
          yield response.event, response
        end

        private

        attr_accessor :rabbit, :responses

        def binding
          @rabbit.direct('amq.direct')
        end

        def opts_subs
          { block: false, manual_ack: false, arguments: { 'x-priority': 2 } }
        end

        def info_subscribe(info, _prop, payload)
          ::Log.debug info
          ::Log.debug '[X] Message reading'
          ::Log.info payload
        end

        def check(response)
          raise Errors::Rabbit::Listener::ResponseNil, response \
            if response.nil?
          raise Errors::Rabbit::Listener::ResponseEmpty, response \
            if response.to_hash.size.zero?
        end

        def subscribe_queue
          rabbit.prefetch(1)
          reply.bind(binding).subscribe(opts_subs) do |info, prop, payload|
            info_subscribe(info, prop, payload)
            synchronize_response(payload)
          end
        end

        def synchronize_response(payload)
          lock.synchronize do
            response = response_class(payload)
            responses.push(response)
          end
          semaphore.signal
        end
      end
    end
  end
end

require 'rrj/rabbit/listener/from'
require 'rrj/rabbit/listener/from_admin'

Version data entries

26 entries across 26 versions & 1 rubygems

Version Path
ruby_rabbitmq_janus-4.0.1 lib/rrj/rabbit/listener/base.rb
ruby_rabbitmq_janus-4.0.1.pre.1265973744 lib/rrj/rabbit/listener/base.rb
ruby_rabbitmq_janus-4.0.1.pre.1265506307 lib/rrj/rabbit/listener/base.rb
ruby_rabbitmq_janus-4.0.1.pre.1265140558 lib/rrj/rabbit/listener/base.rb
ruby_rabbitmq_janus-4.0.0 lib/rrj/rabbit/listener/base.rb
ruby_rabbitmq_janus-4.0.0.pre.1001345090 lib/rrj/rabbit/listener/base.rb
ruby_rabbitmq_janus-4.0.0.pre.1001181479 lib/rrj/rabbit/listener/base.rb
ruby_rabbitmq_janus-4.0.0.pre.1001132533 lib/rrj/rabbit/listener/base.rb
ruby_rabbitmq_janus-4.0.0.pre.1001107243 lib/rrj/rabbit/listener/base.rb
ruby_rabbitmq_janus-4.0.0.pre.1001043172 lib/rrj/rabbit/listener/base.rb
ruby_rabbitmq_janus-4.0.0.pre.949167646 lib/rrj/rabbit/listener/base.rb
ruby_rabbitmq_janus-4.0.0.pre.946892338 lib/rrj/rabbit/listener/base.rb
ruby_rabbitmq_janus-4.0.0.pre.946565704 lib/rrj/rabbit/listener/base.rb
ruby_rabbitmq_janus-4.0.0.pre.939149205 lib/rrj/rabbit/listener/base.rb
ruby_rabbitmq_janus-4.0.0.pre.939137013 lib/rrj/rabbit/listener/base.rb
ruby_rabbitmq_janus-4.0.0.pre.939133437 lib/rrj/rabbit/listener/base.rb
ruby_rabbitmq_janus-4.0.0.pre.939119110 lib/rrj/rabbit/listener/base.rb
ruby_rabbitmq_janus-4.0.0.pre.939113634 lib/rrj/rabbit/listener/base.rb
ruby_rabbitmq_janus-4.0.0.pre.850041590 lib/rrj/rabbit/listener/base.rb
ruby_rabbitmq_janus-3.0.1 lib/rrj/rabbit/listener/base.rb