lib/cloudist/payload.rb in cloudist-0.2.1 vs lib/cloudist/payload.rb in cloudist-0.4.1
- old
+ new
@@ -1,148 +1,88 @@
module Cloudist
- DEFAULT_TTL = 300
-
class Payload
include Utils
+ include Encoding
- attr_reader :body, :publish_opts, :headers
-
- def initialize(body, headers = {}, publish_opts = {})
- @publish_opts, @headers = publish_opts, headers
+ attr_reader :body, :headers, :amqp_headers, :timestamp
+
+ def initialize(body, headers = {})
@published = false
+ @timestamp = Time.now.to_f
- body = parse_message(body) if body.is_a?(String)
+ body = decode(body) if body.is_a?(String)
+ @body = Hashie::Mash.new(decode(body))
+ @headers = Hashie::Mash.new(headers)
+ @amqp_headers = {}
+ # puts "Initialised Payload: #{id}"
- # raise Cloudist::BadPayload, "Expected Hash for payload" unless body.is_a?(Hash)
-
- @body = body
- # HashWithIndifferentAccess.new(body)
- update_headers
+ parse_headers!
end
-
- # Return message formatted as JSON and headers ready for transport in array
- def formatted
- update_headers
-
- [encode_message(body), publish_opts]
+
+ def find_or_create_id
+ if headers["message_id"].present?
+ headers.message_id
+ else
+ UUID.generate
+ end
end
def id
- @id ||= event_hash.to_s
+ find_or_create_id
end
- def id=(new_id)
- @id = new_id.to_s
- update_headers
+ def to_a
+ [encode(body), {:headers => encoded_headers}]
end
- def frozen?
- headers.frozen?
- end
-
- def freeze!
- headers.freeze
- body.freeze
- end
-
- def update_headers
- headers = extract_custom_headers
- (publish_opts[:headers] ||= {}).merge!(headers)
- end
-
- def extract_custom_headers
- raise StaleHeadersError, "Headers cannot be changed because payload has already been published" if published?
- headers[:published_on] ||= body.is_a?(Hash) && body.delete(:published_on) || Time.now.utc.to_i
- headers[:ttl] ||= body.is_a?(Hash) && body.delete('ttl') || Cloudist::DEFAULT_TTL
-
- # this is the event hash that gets transferred through various publish/reply actions
- headers[:event_hash] ||= id
-
- # this value should be unique for each published/received message pair
- headers[:message_id] ||= id
+ def parse_headers!
+ headers[:published_on] ||= body.delete("timestamp") || timestamp
+ headers[:message_type] ||= body.delete("message_type") || 'reply'
- # We use JSON for message transport exclusively
- # headers[:content_type] ||= 'application/json'
+ headers[:ttl] ||= Cloudist::DEFAULT_TTL
+ headers[:message_id] = id
- # headers[:headers][:message_type] = 'event'
- # ||= body.delete('message_type') || 'reply'
+ headers[:published_on] = headers[:published_on].to_f
- # headers[:headers] = custom_headers
+ headers[:ttl] = headers[:ttl].to_i rescue -1
+ headers[:ttl] = -1 if headers[:ttl] == 0
- # some strange behavior with integers makes it better to
- # convert all amqp headers to strings to avoid any problems
- headers.each { |k,v| headers[k] = v.to_s }
+ # If this payload was received with a timestamp,
+ # we don't want to override it on #timestamp
+ if timestamp > headers[:published_on]
+ @timestamp = headers[:published_on]
+ end
headers
end
-
- def parse_custom_headers
- return { } unless headers
-
+
+ def encoded_headers
h = headers.dup
-
- h[:published_on] = h[:published_on].to_i
-
- h[:ttl] = h[:ttl].to_i rescue -1
- h[:ttl] = -1 if h[:ttl] == 0
-
- h
+ h.each { |k,v| h[k] = v.to_s }
+ return h
end
def set_reply_to(queue_name)
- headers[:reply_to] = reply_name(queue_name)
- set_master_queue_name(queue_name)
+ headers[:reply_to] = reply_prefix(queue_name)
end
- def set_master_queue_name(queue_name)
- headers[:master_queue] = queue_name
- end
-
- def reply_name(queue_name)
- # "#{queue_name}.#{id}"
- Utils.reply_prefix(queue_name)
- end
-
def reply_to
- headers[:reply_to]
+ headers.reply_to
end
def message_type
- headers[:message_type]
+ headers.message_type
end
- def event_hash
- @event_hash ||= headers[:event_hash] || body.is_a?(Hash) && body.delete(:event_hash) || create_event_hash
- end
-
- def create_event_hash
- s = Time.now.to_s + object_id.to_s + rand(100).to_s
- Digest::MD5.hexdigest(s)
- end
-
- def parse_message(raw)
- # return { } unless raw
- # decode_json(raw)
- decode_message(raw)
- end
-
def [](key)
- body[key]
+ self.body[key.to_s]
end
- def published?
- @published == true
- end
-
- def publish
- return if published?
- @published = true
- freeze!
- end
-
def method_missing(meth, *args, &blk)
- if body.is_a?(Hash) && body.has_key?(meth)
+ if body.has_key?(meth.to_s)
return body[meth]
+ elsif key = meth.to_s.match(/(.+)(?:\?$)/).to_a.last
+ body.has_key?(key.to_s)
else
super
end
end
\ No newline at end of file