lib/cloudist/payload.rb in cloudist-0.1.2 vs lib/cloudist/payload.rb in cloudist-0.2.0

- old
+ new

@@ -4,27 +4,28 @@ class Payload include Utils attr_reader :body, :publish_opts, :headers - def initialize(data_hash_or_json, headers = {}, publish_opts = {}) + def initialize(body, headers = {}, publish_opts = {}) @publish_opts, @headers = publish_opts, headers @published = false - data_hash_or_json = parse_message(data_hash_or_json) if data_hash_or_json.is_a?(String) + body = parse_message(body) if body.is_a?(String) - raise Cloudist::BadPayload, "Expected Hash for payload" unless data_hash_or_json.is_a?(Hash) + # raise Cloudist::BadPayload, "Expected Hash for payload" unless body.is_a?(Hash) - @body = HashWithIndifferentAccess.new(data_hash_or_json) + @body = body + # HashWithIndifferentAccess.new(body) update_headers end # Return message formatted as JSON and headers ready for transport in array def formatted update_headers - [body.to_json, publish_opts] + [encode_message(body), publish_opts] end def id @id ||= event_hash.to_s end @@ -48,21 +49,21 @@ (publish_opts[:headers] ||= {}).merge!(headers) end def extract_custom_headers raise StaleHeadersError, "Headers cannot be changed because payload has already been published" if published? - headers[:published_on] ||= body.delete('published_on') || Time.now.utc.to_i - headers[:ttl] ||= body.delete('ttl') || Cloudist::DEFAULT_TTL + headers[:published_on] ||= body.is_a?(Hash) && body.delete(:published_on) || Time.now.utc.to_i + headers[:ttl] ||= body.is_a?(Hash) && body.delete('ttl') || Cloudist::DEFAULT_TTL # this is the event hash that gets transferred through various publish/reply actions headers[:event_hash] ||= id # this value should be unique for each published/received message pair headers[:message_id] ||= id # We use JSON for message transport exclusively - headers[:content_type] ||= 'application/json' + # headers[:content_type] ||= 'application/json' # headers[:headers][:message_type] = 'event' # ||= body.delete('message_type') || 'reply' # headers[:headers] = custom_headers @@ -108,21 +109,22 @@ def message_type headers[:message_type] end def event_hash - @event_hash ||= headers[:event_hash] || body.delete('event_hash') || create_event_hash + @event_hash ||= headers[:event_hash] || body.is_a?(Hash) && body.delete(:event_hash) || create_event_hash end def create_event_hash s = Time.now.to_s + object_id.to_s + rand(100).to_s Digest::MD5.hexdigest(s) end def parse_message(raw) - return { } unless raw - decode_json(raw) + # return { } unless raw + # decode_json(raw) + decode_message(raw) end def [](key) body[key] end @@ -133,9 +135,17 @@ def publish return if published? @published = true freeze! + end + + def method_missing(meth, *args, &blk) + if body.is_a?(Hash) && body.has_key?(meth) + return body[meth] + else + super + end end end end \ No newline at end of file