lib/cloudist/payload.rb in cloudist-0.2.1 vs lib/cloudist/payload.rb in cloudist-0.4.1

- old
+ new

@@ -1,148 +1,88 @@ module Cloudist - DEFAULT_TTL = 300 - class Payload include Utils + include Encoding - attr_reader :body, :publish_opts, :headers - - def initialize(body, headers = {}, publish_opts = {}) - @publish_opts, @headers = publish_opts, headers + attr_reader :body, :headers, :amqp_headers, :timestamp + + def initialize(body, headers = {}) @published = false + @timestamp = Time.now.to_f - body = parse_message(body) if body.is_a?(String) + body = decode(body) if body.is_a?(String) + @body = Hashie::Mash.new(decode(body)) + @headers = Hashie::Mash.new(headers) + @amqp_headers = {} + # puts "Initialised Payload: #{id}" - # raise Cloudist::BadPayload, "Expected Hash for payload" unless body.is_a?(Hash) - - @body = body - # HashWithIndifferentAccess.new(body) - update_headers + parse_headers! end - - # Return message formatted as JSON and headers ready for transport in array - def formatted - update_headers - - [encode_message(body), publish_opts] + + def find_or_create_id + if headers["message_id"].present? + headers.message_id + else + UUID.generate + end end def id - @id ||= event_hash.to_s + find_or_create_id end - def id=(new_id) - @id = new_id.to_s - update_headers + def to_a + [encode(body), {:headers => encoded_headers}] end - def frozen? - headers.frozen? - end - - def freeze! - headers.freeze - body.freeze - end - - def update_headers - headers = extract_custom_headers - (publish_opts[:headers] ||= {}).merge!(headers) - end - - def extract_custom_headers - raise StaleHeadersError, "Headers cannot be changed because payload has already been published" if published? - headers[:published_on] ||= body.is_a?(Hash) && body.delete(:published_on) || Time.now.utc.to_i - headers[:ttl] ||= body.is_a?(Hash) && body.delete('ttl') || Cloudist::DEFAULT_TTL - - # this is the event hash that gets transferred through various publish/reply actions - headers[:event_hash] ||= id - - # this value should be unique for each published/received message pair - headers[:message_id] ||= id + def parse_headers! + headers[:published_on] ||= body.delete("timestamp") || timestamp + headers[:message_type] ||= body.delete("message_type") || 'reply' - # We use JSON for message transport exclusively - # headers[:content_type] ||= 'application/json' + headers[:ttl] ||= Cloudist::DEFAULT_TTL + headers[:message_id] = id - # headers[:headers][:message_type] = 'event' - # ||= body.delete('message_type') || 'reply' + headers[:published_on] = headers[:published_on].to_f - # headers[:headers] = custom_headers + headers[:ttl] = headers[:ttl].to_i rescue -1 + headers[:ttl] = -1 if headers[:ttl] == 0 - # some strange behavior with integers makes it better to - # convert all amqp headers to strings to avoid any problems - headers.each { |k,v| headers[k] = v.to_s } + # If this payload was received with a timestamp, + # we don't want to override it on #timestamp + if timestamp > headers[:published_on] + @timestamp = headers[:published_on] + end headers end - - def parse_custom_headers - return { } unless headers - + + def encoded_headers h = headers.dup - - h[:published_on] = h[:published_on].to_i - - h[:ttl] = h[:ttl].to_i rescue -1 - h[:ttl] = -1 if h[:ttl] == 0 - - h + h.each { |k,v| h[k] = v.to_s } + return h end def set_reply_to(queue_name) - headers[:reply_to] = reply_name(queue_name) - set_master_queue_name(queue_name) + headers[:reply_to] = reply_prefix(queue_name) end - def set_master_queue_name(queue_name) - headers[:master_queue] = queue_name - end - - def reply_name(queue_name) - # "#{queue_name}.#{id}" - Utils.reply_prefix(queue_name) - end - def reply_to - headers[:reply_to] + headers.reply_to end def message_type - headers[:message_type] + headers.message_type end - def event_hash - @event_hash ||= headers[:event_hash] || body.is_a?(Hash) && body.delete(:event_hash) || create_event_hash - end - - def create_event_hash - s = Time.now.to_s + object_id.to_s + rand(100).to_s - Digest::MD5.hexdigest(s) - end - - def parse_message(raw) - # return { } unless raw - # decode_json(raw) - decode_message(raw) - end - def [](key) - body[key] + self.body[key.to_s] end - def published? - @published == true - end - - def publish - return if published? - @published = true - freeze! - end - def method_missing(meth, *args, &blk) - if body.is_a?(Hash) && body.has_key?(meth) + if body.has_key?(meth.to_s) return body[meth] + elsif key = meth.to_s.match(/(.+)(?:\?$)/).to_a.last + body.has_key?(key.to_s) else super end end \ No newline at end of file