Sha256: b9a8fc0250452eaf197be2d18f2a192cb226cc10e8ff99fa411fc628e9a25fe0

Contents?: true

Size: 2 KB

Versions: 3

Compression:

Stored size: 2 KB

Contents

# frozen_string_literal: true

module RubyRabbitmqJanus
  module Rabbit
    module Publisher
      # @author VAILLANT Jeremy <jeremy.vaillant@dazzl.tv>
      #
      # This publisher don't post message. Is listen just an standard queue to
      # Janus. By default is "from-janus". It's a parameter in config to this
      # gem.
      class Listener < BasePublisher
        # Define an publisher
        #
        # @param [String] rabbit Information connection to rabbitmq server
        def initialize(rabbit)
          super()
          @responses = []
          @rabbit = rabbit.channel
          subscribe_queue
        end

        # Listen a queue and return a body response
        def listen_events
          semaphore.wait
          response = nil
          lock.synchronize do
            response = @responses.shift
          end
          yield response.event, response
        end

        private

        def subscribe_queue
          reply = @rabbit.queue(Tools::Config.instance.queue_from)
          @rabbit.prefetch(1)
          reply.bind(binding).subscribe(opts_subs) do |info, prop, payload|
            Tools::Log.instance.info \
              "[X] Message reading ##{prop['correlation_id']}"
            synchronize_response(info, payload)
          end
        end

        def binding
          @rabbit.direct('amq.direct')
        end

        def opts_subs
          { block: false, manual_ack: true, arguments: { 'x-priority': 2 } }
        end

        # Counts transmitted messages
        def log_message_id(propertie)
          message_id = propertie.message_id
          Tools::Log.instance.info "[X] Message reading with ID #{message_id}"
        end

        # Sending an signal when an response is reading in queue
        def synchronize_response(info, payload)
          lock.synchronize do
            @responses.push(Janus::Responses::Event.new(JSON.parse(payload)))
          end
          @rabbit.acknowledge(info.delivery_tag, false)
          semaphore.signal
        end
      end
    end
  end
end

Version data entries

3 entries across 3 versions & 1 rubygems

Version Path
ruby_rabbitmq_janus-2.0.0.pre.108 lib/rrj/rabbit/publish/listener.rb
ruby_rabbitmq_janus-2.0.0 lib/rrj/rabbit/publish/listener.rb
ruby_rabbitmq_janus-2.0.0.pre.106 lib/rrj/rabbit/publish/listener.rb