lib/rabbit_feed/event.rb in rabbit_feed-1.0.2 vs lib/rabbit_feed/event.rb in rabbit_feed-2.0.0

- old
+ new

@@ -1,43 +1,83 @@ module RabbitFeed class Event include ActiveModel::Validations - attr_reader :schema, :payload - validates_presence_of :schema, :payload + SCHEMA_VERSION = '2.0.0' - def initialize schema, payload - @schema = schema - @payload = payload + attr_reader :schema, :payload, :metadata + validates :metadata, presence: true + validates :payload, length: { minimum: 0, allow_nil: false, message: 'can\'t be nil' } + validate :required_metadata + + def initialize metadata, payload={}, schema=nil + @schema = schema + @payload = payload.with_indifferent_access if payload + @metadata = metadata.with_indifferent_access if metadata validate! end def serialize buffer = StringIO.new writer = Avro::DataFile::Writer.new buffer, (Avro::IO::DatumWriter.new schema), schema - writer << payload + writer << { 'metadata' => metadata, 'payload' => payload } writer.close buffer.string end - def self.deserialize event - datum_reader = Avro::IO::DatumReader.new - reader = Avro::DataFile::Reader.new (StringIO.new event), datum_reader - payload = nil - reader.each do |datum| - payload = datum - end - reader.close - Event.new datum_reader.readers_schema, payload + def application + metadata[:application] end - def method_missing(method_name, *args, &block) - payload[method_name.to_s] + def name + metadata[:name] end + def created_at_utc + (Time.iso8601 metadata[:created_at_utc]) if metadata[:created_at_utc].present? + end + + class << self + + def deserialize serialized_event + datum_reader = Avro::IO::DatumReader.new + reader = Avro::DataFile::Reader.new (StringIO.new serialized_event), datum_reader + event_hash = nil + reader.each do |datum| + event_hash = datum + end + reader.close + if (version_1? event_hash) + new_from_version_1 event_hash, datum_reader.readers_schema + else + new event_hash['metadata'], event_hash['payload'], datum_reader.readers_schema + end + end + + private + + def version_1? event_hash + %w(metadata payload).none?{|key| event_hash.has_key? key} + end + + def new_from_version_1 metadata_and_payload, schema + metadata = {} + %w(application name host version environment created_at_utc).each do |field| + metadata[field] = metadata_and_payload.delete field + end + new metadata, metadata_and_payload, schema + end + end + private def validate! - raise Error.new errors.messages if invalid? + raise Error.new "Invalid event: #{errors.messages}" if invalid? + end + + def required_metadata + if metadata + errors.add(:metadata, 'name field is required') if metadata[:name].blank? + end end end end