lib/cloudist/payload.rb in cloudist-0.0.2 vs lib/cloudist/payload.rb in cloudist-0.0.3
- old
+ new
@@ -2,26 +2,29 @@
DEFAULT_TTL = 300
class Payload
include Utils
- attr_accessor :body, :headers
+ attr_reader :body, :publish_opts, :headers
- def initialize(data_hash_or_json, headers = {})
+ def initialize(data_hash_or_json, headers = {}, publish_opts = {})
+ @publish_opts, @headers = publish_opts, headers
+ @published = false
+
data_hash_or_json = parse_message(data_hash_or_json) if data_hash_or_json.is_a?(String)
raise Cloudist::BadPayload, "Expected Hash for payload" unless data_hash_or_json.is_a?(Hash)
- @body, @headers = HashWithIndifferentAccess.new(data_hash_or_json), headers
+ @body = HashWithIndifferentAccess.new(data_hash_or_json)
update_headers
end
+ # Return message formatted as JSON and headers ready for transport in array
def formatted
- body, headers = apply_custom_headers
-
- # Return message formatted as JSON and headers ready for transport
- [body.to_json, headers]
+ update_headers
+
+ [body.to_json, publish_opts]
end
def id
@id ||= event_hash.to_s
end
@@ -39,12 +42,16 @@
headers.freeze
body.freeze
end
def update_headers
+ headers = extract_custom_headers
+ (publish_opts[:headers] ||= {}).merge!(headers)
+ end
+
+ def extract_custom_headers
raise StaleHeadersError, "Headers cannot be changed because payload has already been published" if published?
-
headers[:published_on] ||= body.delete('published_on') || Time.now.utc.to_i
headers[:ttl] ||= body.delete('ttl') || Cloudist::DEFAULT_TTL
# this is the event hash that gets transferred through various publish/reply actions
headers[:event_hash] ||= id
@@ -52,20 +59,22 @@
# this value should be unique for each published/received message pair
headers[:message_id] ||= id
# We use JSON for message transport exclusively
headers[:content_type] ||= 'application/json'
-
+
+ # headers[:headers][:message_type] = 'event'
+ # ||= body.delete('message_type') || 'reply'
+
+ # headers[:headers] = custom_headers
+
# some strange behavior with integers makes it better to
# convert all amqp headers to strings to avoid any problems
headers.each { |k,v| headers[k] = v.to_s }
+
+ headers
end
-
- def apply_custom_headers
- update_headers
- [body, headers]
- end
def parse_custom_headers
return { } unless headers
h = headers.dup
@@ -105,10 +114,10 @@
Digest::MD5.hexdigest(s)
end
def parse_message(raw)
return { } unless raw
- JSON.parse(raw)
+ decode_json(raw)
end
def [](key)
body[key]
end
\ No newline at end of file