lib/cloudist/payload_old.rb in cloudist-0.4.4 vs lib/cloudist/payload_old.rb in cloudist-0.5.0
- old
+ new
@@ -1,80 +1,80 @@
module Cloudist
class Payload
include Utils
-
+
attr_reader :body, :publish_opts, :headers, :timestamp
def initialize(body, headers = {}, publish_opts = {})
@publish_opts, @headers = publish_opts, Hashie::Mash.new(headers)
@published = false
-
+
body = parse_message(body) if body.is_a?(String)
-
+
# raise Cloudist::BadPayload, "Expected Hash for payload" unless body.is_a?(Hash)
-
+
@timestamp = Time.now.to_f
-
+
@body = body
# Hashie::Mash.new(body)
-
+
update_headers
end
# Return message formatted as JSON and headers ready for transport in array
def formatted
update_headers
-
+
[encode_message(body), publish_opts]
end
-
+
def id
@id ||= event_hash.to_s
end
-
+
def id=(new_id)
@id = new_id.to_s
update_headers
end
-
+
def frozen?
headers.frozen?
end
-
+
def freeze!
headers.freeze
body.freeze
end
-
+
def update_headers
headers = extract_custom_headers
(publish_opts[:headers] ||= {}).merge!(headers)
end
-
+
def extract_custom_headers
raise StaleHeadersError, "Headers cannot be changed because payload has already been published" if published?
headers[:published_on] ||= body.is_a?(Hash) && body.delete(:published_on) || Time.now.utc.to_i
headers[:ttl] ||= body.is_a?(Hash) && body.delete('ttl') || Cloudist::DEFAULT_TTL
headers[:timestamp] = timestamp
# this is the event hash that gets transferred through various publish/reply actions
headers[:event_hash] ||= id
# this value should be unique for each published/received message pair
headers[:message_id] ||= id
-
+
# We use JSON for message transport exclusively
# headers[:content_type] ||= 'application/json'
-
+
# headers[:headers][:message_type] = 'event'
# ||= body.delete('message_type') || 'reply'
-
+
# headers[:headers] = custom_headers
-
+
# some strange behavior with integers makes it better to
# convert all amqp headers to strings to avoid any problems
headers.each { |k,v| headers[k] = v.to_s }
-
+
headers
end
def parse_custom_headers
return { } unless headers
@@ -86,70 +86,70 @@
h[:ttl] = h[:ttl].to_i rescue -1
h[:ttl] = -1 if h[:ttl] == 0
h
end
-
+
def set_reply_to(queue_name)
headers["reply_to"] = reply_name(queue_name)
set_master_queue_name(queue_name)
end
-
+
def set_master_queue_name(queue_name)
- headers[:master_queue] = queue_name
+ headers[:master_queue] = queue_name
end
-
+
def reply_name(queue_name)
# "#{queue_name}.#{id}"
Utils.reply_prefix(queue_name)
end
-
+
def reply_to
headers["reply_to"]
end
-
+
def message_type
headers["message_type"]
end
-
+
def event_hash
@event_hash ||= headers["event_hash"] || create_event_hash
end
-
+
def create_event_hash
# s = Time.now.to_s + object_id.to_s + rand(100).to_s
# Digest::MD5.hexdigest(s)
UUID.generate
end
-
+
def parse_message(raw)
# return { } unless raw
# decode_json(raw)
decode_message(raw)
end
-
+
def [](key)
body[key]
end
-
+
def published?
@published == true
end
-
+
def publish
return if published?
@published = true
freeze!
end
-
+
def method_missing(meth, *args, &blk)
if body.is_a?(Hash) && body.has_key?(meth)
return body[meth]
elsif key = meth.to_s.match(/(.+)(?:\?$)/).to_a.last
body.has_key?(key.to_sym)
else
super
end
end
-
+
end
-end
\ No newline at end of file
+end