Sha256: fe5579a53c54b4869714d62c09019a002c97472cf11b0ae2af86b8c2321a120c

Contents?: true

Size: 1.95 KB

Versions: 13

Compression:

Stored size: 1.95 KB

Contents

require 'avro_turf/messaging'
require 'avromatic/io'

module Avromatic
  # Subclass of AvroTurf::Messaging that uses Avromatic's custom DatumReader
  # and DatumWriter when decoding and encoding messages.
  #
  # Messages handled by this class are framed as:
  #
  #   1. one magic byte (MAGIC_BYTE),
  #   2. the schema id as a 4-byte big-endian unsigned integer,
  #   3. the Avro binary-encoded datum.
  #
  # NOTE(review): @namespace, @schema_store, @registry, @schemas_by_id and
  # MAGIC_BYTE are not defined in this file; presumably they are provided by
  # AvroTurf::Messaging -- confirm against the avro_turf version in use.
  class Messaging < AvroTurf::Messaging
    # Exposes the @registry object used to fetch and register schemas.
    attr_reader :registry

    # Decodes a framed, Avro-encoded message.
    #
    # @param data [String] the framed binary message
    # @param schema_name [String, nil] name of the reader's schema; when nil
    #   no reader's schema is looked up and nil is handed to the DatumReader
    # @param namespace [String, nil] namespace used when looking up
    #   +schema_name+ in the schema store
    # @return [Object] whatever Avromatic::IO::DatumReader#read returns for
    #   the decoded datum
    # @raise [RuntimeError] if the data does not start with the magic byte or
    #   is too short to contain the 4-byte schema id
    def decode(data, schema_name: nil, namespace: @namespace)
      readers_schema = schema_name && @schema_store.find(schema_name, namespace)
      stream = StringIO.new(data)
      decoder = Avro::IO::BinaryDecoder.new(stream)

      # The first byte identifies the framing format.
      magic_byte = decoder.read(1)

      if magic_byte != MAGIC_BYTE
        raise "Expected data to begin with a magic byte, got `#{magic_byte.inspect}`"
      end

      # The schema id is a 4-byte big-endian integer. A truncated message
      # yields nil or a short string here; fail with a descriptive error
      # instead of a NoMethodError or a lookup of a nil schema id.
      id_bytes = decoder.read(4)

      unless id_bytes && id_bytes.bytesize == 4
        raise "Expected a 4-byte schema id after the magic byte, got `#{id_bytes.inspect}`"
      end

      schema_id = id_bytes.unpack('N').first

      # Parsed writer's schemas are cached by id so the registry is only
      # consulted the first time a given schema id is seen.
      writers_schema = @schemas_by_id.fetch(schema_id) do
        schema_json = @registry.fetch(schema_id)
        @schemas_by_id[schema_id] = Avro::Schema.parse(schema_json)
      end

      # The following line differs from the parent class to use a custom DatumReader
      reader = Avromatic::IO::DatumReader.new(writers_schema, readers_schema)
      reader.read(decoder)
    end

    # Encodes a message as a framed, Avro-encoded binary string.
    #
    # @param message [Object] the datum to encode; it is passed unchanged to
    #   Avromatic::IO::DatumWriter#write
    # @param schema_name [String, nil] name of the schema to encode with
    # @param namespace [String, nil] namespace used when looking up
    #   +schema_name+ in the schema store
    # @param subject [String, nil] registry subject to register the schema
    #   under; defaults to the schema's full name
    # @return [String] magic byte, 4-byte schema id, then the encoded datum
    def encode(message, schema_name: nil, namespace: @namespace, subject: nil)
      schema = @schema_store.find(schema_name, namespace)

      # Schemas are registered under the full name of the top level Avro record
      # type, or `subject` if it's provided.
      schema_id = @registry.register(subject || schema.fullname, schema)

      stream = StringIO.new
      encoder = Avro::IO::BinaryEncoder.new(stream)

      # Always start with the magic byte.
      encoder.write(MAGIC_BYTE)

      # The schema id is encoded as a 4-byte big-endian integer.
      encoder.write([schema_id].pack('N'))

      # The following line differs from the parent class to use a custom DatumWriter
      writer = Avromatic::IO::DatumWriter.new(schema)

      # The actual message comes last.
      writer.write(message, encoder)

      stream.string
    end
  end
end

Version data entries

13 entries across 13 versions & 1 rubygems

Version Path
avromatic-1.0.0 lib/avromatic/messaging.rb
avromatic-0.33.0 lib/avromatic/messaging.rb
avromatic-0.32.0 lib/avromatic/messaging.rb
avromatic-0.32.0.rc0 lib/avromatic/messaging.rb
avromatic-0.31.0 lib/avromatic/messaging.rb
avromatic-0.30.0 lib/avromatic/messaging.rb
avromatic-0.29.1 lib/avromatic/messaging.rb
avromatic-0.29.0 lib/avromatic/messaging.rb
avromatic-0.28.1 lib/avromatic/messaging.rb
avromatic-0.27.0 lib/avromatic/messaging.rb
avromatic-0.26.0 lib/avromatic/messaging.rb
avromatic-0.25.0 lib/avromatic/messaging.rb
avromatic-0.24.0 lib/avromatic/messaging.rb