Sha256: 465c1bfea994db7e45f9bc07010c14dc74b2d4a02756b8b80ea7d6008d2a6def

Contents?: true

Size: 1.08 KB

Versions: 3

Compression:

Stored size: 1.08 KB

Contents

# encoding: utf-8
require "open-uri"
require "avro"
require "logstash/codecs/base"
require "logstash/event"
require "logstash/timestamp"
require "logstash/util"

# Logstash codec that converts between Avro binary data and Logstash events,
# using a single Avro schema that is loaded when the codec is registered.
class LogStash::Codecs::Avro < LogStash::Codecs::Base
  config_name "avro"


  # schema path to fetch the schema from
  # This can be a 'http' URI or a local file path
  # example:
  #     http - "http://example.com/schema.avsc"
  #     file - "/path/to/schema.avsc"
  config :schema_uri, :validate => :string, :required => true

  # Reads the entire resource named by +uri_string+ and returns its content.
  #
  # The block form is used so the opened handle is closed as soon as the
  # content has been read, instead of being left open until it is garbage
  # collected.
  #
  # open-uri's patch of Kernel#open was deprecated in Ruby 2.7 and removed in
  # Ruby 3.0, so URI.open is preferred whenever open-uri exposes it. Older
  # runtimes, where URI.open is not publicly available, keep using Kernel#open.
  #
  # @param uri_string [String] URI or local path of the resource
  # @return [String] the content of the resource
  def open_and_read(uri_string)
    if URI.respond_to?(:open)
      URI.open(uri_string) { |io| io.read }
    else
      open(uri_string) { |io| io.read }
    end
  end

  # Loads and parses the schema referenced by +schema_uri+; the parsed schema
  # is kept in @schema and used by both #decode and #encode.
  public
  def register
    @schema = Avro::Schema.parse(open_and_read(schema_uri))
  end

  # Decodes one Avro binary datum from +data+ with the registered schema and
  # yields it to the caller's block wrapped in a LogStash::Event.
  #
  # @param data [String] Avro binary encoded payload
  # @yield [LogStash::Event] the event built from the decoded datum
  public
  def decode(data)
    datum = StringIO.new(data)
    decoder = Avro::IO::BinaryDecoder.new(datum)
    datum_reader = Avro::IO::DatumReader.new(@schema)
    yield LogStash::Event.new(datum_reader.read(decoder))
  end

  # Serializes the hash form of +event+ to Avro binary with the registered
  # schema and hands the resulting string to the @on_event callback.
  #
  # @param event [LogStash::Event] the event to serialize
  public
  def encode(event)
    dw = Avro::IO::DatumWriter.new(@schema)
    buffer = StringIO.new
    encoder = Avro::IO::BinaryEncoder.new(buffer)
    dw.write(event.to_hash, encoder)
    @on_event.call(buffer.string)
  end
end

Version data entries

3 entries across 3 versions & 1 rubygems

Version Path
logstash-codec-avro-0.1.3 lib/logstash/codecs/avro.rb
logstash-codec-avro-0.1.2 lib/logstash/codecs/avro.rb
logstash-codec-avro-0.1.1 lib/logstash/codecs/avro.rb