lib/cloudist/payload.rb in cloudist-0.1.2 vs lib/cloudist/payload.rb in cloudist-0.2.0
- old
+ new
@@ -4,27 +4,28 @@
class Payload
include Utils
attr_reader :body, :publish_opts, :headers
- def initialize(data_hash_or_json, headers = {}, publish_opts = {})
+ def initialize(body, headers = {}, publish_opts = {})
@publish_opts, @headers = publish_opts, headers
@published = false
- data_hash_or_json = parse_message(data_hash_or_json) if data_hash_or_json.is_a?(String)
+ body = parse_message(body) if body.is_a?(String)
- raise Cloudist::BadPayload, "Expected Hash for payload" unless data_hash_or_json.is_a?(Hash)
+ # raise Cloudist::BadPayload, "Expected Hash for payload" unless body.is_a?(Hash)
- @body = HashWithIndifferentAccess.new(data_hash_or_json)
+ @body = body
+ # HashWithIndifferentAccess.new(body)
update_headers
end
# Return message formatted as JSON and headers ready for transport in array
def formatted
update_headers
- [body.to_json, publish_opts]
+ [encode_message(body), publish_opts]
end
def id
@id ||= event_hash.to_s
end
@@ -48,21 +49,21 @@
(publish_opts[:headers] ||= {}).merge!(headers)
end
def extract_custom_headers
raise StaleHeadersError, "Headers cannot be changed because payload has already been published" if published?
- headers[:published_on] ||= body.delete('published_on') || Time.now.utc.to_i
- headers[:ttl] ||= body.delete('ttl') || Cloudist::DEFAULT_TTL
+ headers[:published_on] ||= body.is_a?(Hash) && body.delete(:published_on) || Time.now.utc.to_i
+ headers[:ttl] ||= body.is_a?(Hash) && body.delete('ttl') || Cloudist::DEFAULT_TTL
# this is the event hash that gets transferred through various publish/reply actions
headers[:event_hash] ||= id
# this value should be unique for each published/received message pair
headers[:message_id] ||= id
# We use JSON for message transport exclusively
- headers[:content_type] ||= 'application/json'
+ # headers[:content_type] ||= 'application/json'
# headers[:headers][:message_type] = 'event'
# ||= body.delete('message_type') || 'reply'
# headers[:headers] = custom_headers
@@ -108,21 +109,22 @@
def message_type
headers[:message_type]
end
def event_hash
- @event_hash ||= headers[:event_hash] || body.delete('event_hash') || create_event_hash
+ @event_hash ||= headers[:event_hash] || body.is_a?(Hash) && body.delete(:event_hash) || create_event_hash
end
def create_event_hash
s = Time.now.to_s + object_id.to_s + rand(100).to_s
Digest::MD5.hexdigest(s)
end
def parse_message(raw)
- return { } unless raw
- decode_json(raw)
+ # return { } unless raw
+ # decode_json(raw)
+ decode_message(raw)
end
def [](key)
body[key]
end
@@ -133,9 +135,17 @@
def publish
return if published?
@published = true
freeze!
+ end
+
+ def method_missing(meth, *args, &blk)
+ if body.is_a?(Hash) && body.has_key?(meth)
+ return body[meth]
+ else
+ super
+ end
end
end
end
\ No newline at end of file