Sha256: 5c025ae66f902967f8ab949222577510d693d4aa89838eb06247da3e8dd4ce12

Contents?: true

Size: 1.41 KB

Versions: 2

Compression:

Stored size: 1.41 KB

Contents

module Asynk
  class SyncPublisher
    def initialize(routing_key, params)
      @routing_key    = routing_key
      @params         = params
      @message_id     = (@params.delete(:message_id) || generate_message_id)
      @wait_timeout   = (@params.delete(:timeout) || Asynk.config[:sync_publish_wait_timeout]) * 1000
      @correlation_id = generate_message_id
    end

    def send
      global_start_time = Asynk::Benchmark.start if Asynk.config[:publisher_execution_time]
      Asynk.broker.pool.with do |channel, exchange, reply_queue|
        exchange.publish(@params.to_json, message_id: @message_id, routing_key: @routing_key, correlation_id: @correlation_id, reply_to: reply_queue.name)

        start_time = Asynk::Benchmark.start
        while !@response do
          delivery_info, properties, payload = reply_queue.pop
          @response = payload if payload && properties[:correlation_id] == @correlation_id
          raise(RuntimeError.new('Timeout error reached')) if @wait_timeout <= Asynk::Benchmark.end(start_time)
        end
      end

      message = Asynk::Response.try_to_create_from_hash(@response)
      if Asynk.config[:publisher_execution_time]
        Asynk.logger.info "Sending sync message to #{@routing_key}:#{@message_id}. Completed In: #{Asynk::Benchmark.end(global_start_time)} ms."
      end

      message
    end

    def generate_message_id(legnth = 8)
      SecureRandom.hex(legnth)
    end
  end
end

Version data entries

2 entries across 2 versions & 1 rubygems

Version Path
asynk-0.0.2 lib/asynk/sync_publisher.rb
asynk-0.0.1 lib/asynk/sync_publisher.rb