lib/rabbit_feed/event.rb in rabbit_feed-1.0.2 vs lib/rabbit_feed/event.rb in rabbit_feed-2.0.0
- old
+ new
@@ -1,43 +1,83 @@
module RabbitFeed
class Event
include ActiveModel::Validations
- attr_reader :schema, :payload
- validates_presence_of :schema, :payload
+ SCHEMA_VERSION = '2.0.0'
- def initialize schema, payload
- @schema = schema
- @payload = payload
+ attr_reader :schema, :payload, :metadata
+ validates :metadata, presence: true
+ validates :payload, length: { minimum: 0, allow_nil: false, message: 'can\'t be nil' }
+ validate :required_metadata
+
+ def initialize metadata, payload={}, schema=nil
+ @schema = schema
+ @payload = payload.with_indifferent_access if payload
+ @metadata = metadata.with_indifferent_access if metadata
validate!
end
def serialize
buffer = StringIO.new
writer = Avro::DataFile::Writer.new buffer, (Avro::IO::DatumWriter.new schema), schema
- writer << payload
+ writer << { 'metadata' => metadata, 'payload' => payload }
writer.close
buffer.string
end
- def self.deserialize event
- datum_reader = Avro::IO::DatumReader.new
- reader = Avro::DataFile::Reader.new (StringIO.new event), datum_reader
- payload = nil
- reader.each do |datum|
- payload = datum
- end
- reader.close
- Event.new datum_reader.readers_schema, payload
+ def application
+ metadata[:application]
end
- def method_missing(method_name, *args, &block)
- payload[method_name.to_s]
+ def name
+ metadata[:name]
end
+ def created_at_utc
+ (Time.iso8601 metadata[:created_at_utc]) if metadata[:created_at_utc].present?
+ end
+
+ class << self
+
+ def deserialize serialized_event
+ datum_reader = Avro::IO::DatumReader.new
+ reader = Avro::DataFile::Reader.new (StringIO.new serialized_event), datum_reader
+ event_hash = nil
+ reader.each do |datum|
+ event_hash = datum
+ end
+ reader.close
+ if (version_1? event_hash)
+ new_from_version_1 event_hash, datum_reader.readers_schema
+ else
+ new event_hash['metadata'], event_hash['payload'], datum_reader.readers_schema
+ end
+ end
+
+ private
+
+ def version_1? event_hash
+ %w(metadata payload).none?{|key| event_hash.has_key? key}
+ end
+
+ def new_from_version_1 metadata_and_payload, schema
+ metadata = {}
+ %w(application name host version environment created_at_utc).each do |field|
+ metadata[field] = metadata_and_payload.delete field
+ end
+ new metadata, metadata_and_payload, schema
+ end
+ end
+
private
def validate!
- raise Error.new errors.messages if invalid?
+ raise Error.new "Invalid event: #{errors.messages}" if invalid?
+ end
+
+ def required_metadata
+ if metadata
+ errors.add(:metadata, 'name field is required') if metadata[:name].blank?
+ end
end
end
end