Sha256: 28f9afc5cf88527ebffabb07da0ce4e9d101737a3292e555403a24118988c3db

Contents?: true

Size: 1.36 KB

Versions: 4

Compression:

Stored size: 1.36 KB

Contents

module Cloudist
  # Wraps one message received from a queue together with the objects needed
  # to inspect it (decoded body, payload, headers) and to acknowledge it.
  class Request
    include Cloudist::Encoding

    attr_reader :queue_header, :qobj, :payload, :start, :headers, :body

    # @param queue [Object] queue wrapper; #q, #ex and #mq delegate to its
    #   #queue, #exchange and #channel methods
    # @param encoded_body [Object] raw message body; passed to #decode and to
    #   Cloudist::Payload.new
    # @param queue_header [Object] delivery header; must respond to #headers
    #   and #ack
    def initialize(queue, encoded_body, queue_header)
      @qobj = queue
      @queue_header = queue_header
      @acked = false

      # NOTE(review): #decode is not defined in this class; presumably it is
      # provided by Cloudist::Encoding.
      @body = decode(encoded_body)

      # The payload is the single source of truth for #headers. The previous
      # extra `@headers = parse_custom_headers(queue_header)` was overwritten
      # on the very next assignment and has been dropped.
      @payload = Cloudist::Payload.new(encoded_body, queue_header.headers.dup)
      @headers = @payload.headers

      # Wall-clock receive time as a float of epoch seconds (used by #age).
      @start = Time.now.utc.to_f
    end

    # Returns a copy of the header hash with :published_on and :ttl coerced
    # to integers. The hash held by +amqp_headers+ is not modified.
    #
    # Note that a missing :published_on becomes 0 (nil.to_i), and a missing,
    # zero or non-coercible :ttl becomes -1.
    #
    # @param amqp_headers [Object] object responding to #headers
    # @return [Hash] normalised copy of the headers
    def parse_custom_headers(amqp_headers)
      h = amqp_headers.headers.dup

      h[:published_on] = h[:published_on].to_i

      # Explicit form of the former `rescue -1` modifier: any StandardError
      # raised while coercing (e.g. the value has no #to_i) yields -1.
      h[:ttl] = begin
        h[:ttl].to_i
      rescue StandardError
        -1
      end
      h[:ttl] = -1 if h[:ttl] == 0

      h
    end

    # @return [Array] two-element array: a copy of the decoded body and a
    #   copy of the raw headers from the delivery header
    def for_message
      [body.dup, queue_header.headers.dup]
    end

    # @return [Object] the queue exposed by the queue wrapper
    def q
      qobj.queue
    end

    # @return [Object] the exchange exposed by the queue wrapper
    def ex
      qobj.exchange
    end

    # @return [Object] the channel exposed by the queue wrapper
    def mq
      qobj.channel
    end

    # Alias-style reader for #mq.
    def channel
      mq
    end

    # Difference between the receive time (#start) and the :published_on
    # header.
    #
    # @return [Numeric] -1 when the header is absent, otherwise
    #   start - published_on as a Float
    def age
      return -1 unless headers[:published_on]

      start - headers[:published_on].to_f
    end

    # @return [Object] the :ttl header, or -1 when it is nil/false
    def ttl
      headers[:ttl] || -1
    end

    # @return [Boolean] false when ttl is -1 (treated as "no expiry"),
    #   otherwise whether #age exceeds #ttl
    def expired?
      return false if ttl == -1

      age > ttl
    end

    # @return [Boolean] true only once #ack has completed successfully
    def acked?
      # Strict comparison keeps the result a real boolean even if @acked was
      # never set (e.g. a subclass that does not call super in initialize).
      @acked == true
    end

    # Acknowledges the message through the delivery header, at most once.
    # A closed channel is reported via Cloudist.handle_error instead of being
    # raised; in that case the request stays un-acked.
    def ack
      return if acked?

      queue_header.ack
      @acked = true
    rescue AMQP::ChannelClosedError => e
      Cloudist.handle_error(e)
    end
  end
end

Version data entries

4 entries across 4 versions & 1 rubygem

Version Path
cloudist-0.4.4 lib/cloudist/request.rb
cloudist-0.4.3 lib/cloudist/request.rb
cloudist-0.4.2 lib/cloudist/request.rb
cloudist-0.4.1 lib/cloudist/request.rb