Sha256: 5ab83e4ac4889b9c38fea5b022271c8fc080efaf7a68f2889e860e2b91f4200a

Contents?: true

Size: 1.34 KB

Versions: 1

Compression:

Stored size: 1.34 KB

Contents

module Cloudist
  class Request
    include Cloudist::Encoding

    attr_reader :queue_header, :qobj, :payload, :start, :headers, :body

    def initialize(queue, encoded_body, queue_header)
      @qobj, @queue_header = queue, queue_header

      @body = decode(encoded_body)
      @headers = parse_custom_headers(queue_header)

      @payload = Cloudist::Payload.new(encoded_body, queue_header.headers.dup)
      @headers = @payload.headers

      @start = Time.now.utc.to_f
    end

    def parse_custom_headers(amqp_headers)
      h = amqp_headers.headers.dup

      h[:published_on] = h[:published_on].to_i

      h[:ttl] = h[:ttl].to_i rescue -1
      h[:ttl] = -1 if h[:ttl] == 0

      h
    end

    def for_message
      [body.dup, queue_header.headers.dup]
    end

    def q
      qobj.queue
    end

    def ex
      qobj.exchange
    end

    def mq
      qobj.channel
    end

    def channel
      mq
    end

    def age
      return -1 unless headers[:published_on]
      start - headers[:published_on].to_f
    end

    def ttl
      headers[:ttl] || -1
    end

    def expired?
      return false if ttl == -1
      age > ttl
    end

    def acked?
      @acked == true
    end

    def ack
      return if acked?
      queue_header.ack
      @acked = true
    rescue AMQP::ChannelClosedError => e
      Cloudist.handle_error(e)
    end

  end
end

Version data entries

1 entries across 1 versions & 1 rubygems

Version Path
cloudist-0.5.0 lib/cloudist/request.rb