lib/cloudist/payload.rb in cloudist-0.0.2 vs lib/cloudist/payload.rb in cloudist-0.0.3

- old
+ new

@@ -2,26 +2,29 @@ DEFAULT_TTL = 300 class Payload include Utils - attr_accessor :body, :headers + attr_reader :body, :publish_opts, :headers - def initialize(data_hash_or_json, headers = {}) + def initialize(data_hash_or_json, headers = {}, publish_opts = {}) + @publish_opts, @headers = publish_opts, headers + @published = false + data_hash_or_json = parse_message(data_hash_or_json) if data_hash_or_json.is_a?(String) raise Cloudist::BadPayload, "Expected Hash for payload" unless data_hash_or_json.is_a?(Hash) - @body, @headers = HashWithIndifferentAccess.new(data_hash_or_json), headers + @body = HashWithIndifferentAccess.new(data_hash_or_json) update_headers end + # Return message formatted as JSON and headers ready for transport in array def formatted - body, headers = apply_custom_headers - - # Return message formatted as JSON and headers ready for transport - [body.to_json, headers] + update_headers + + [body.to_json, publish_opts] end def id @id ||= event_hash.to_s end @@ -39,12 +42,16 @@ headers.freeze body.freeze end def update_headers + headers = extract_custom_headers + (publish_opts[:headers] ||= {}).merge!(headers) + end + + def extract_custom_headers raise StaleHeadersError, "Headers cannot be changed because payload has already been published" if published? - headers[:published_on] ||= body.delete('published_on') || Time.now.utc.to_i headers[:ttl] ||= body.delete('ttl') || Cloudist::DEFAULT_TTL # this is the event hash that gets transferred through various publish/reply actions headers[:event_hash] ||= id @@ -52,20 +59,22 @@ # this value should be unique for each published/received message pair headers[:message_id] ||= id # We use JSON for message transport exclusively headers[:content_type] ||= 'application/json' - + + # headers[:headers][:message_type] = 'event' + # ||= body.delete('message_type') || 'reply' + + # headers[:headers] = custom_headers + # some strange behavior with integers makes it better to # convert all amqp headers to strings to avoid any problems headers.each { |k,v| headers[k] = v.to_s } + + headers end - - def apply_custom_headers - update_headers - [body, headers] - end def parse_custom_headers return { } unless headers h = headers.dup @@ -105,10 +114,10 @@ Digest::MD5.hexdigest(s) end def parse_message(raw) return { } unless raw - JSON.parse(raw) + decode_json(raw) end def [](key) body[key] end \ No newline at end of file