Sha256: f51956268dbf72d60e1c41e12553c2d282136122c95159344cf0df1d31f00417

Contents?: true

Size: 1.93 KB

Versions: 2

Compression:

Stored size: 1.93 KB

Contents

# encoding: utf-8
require "open-uri"
require "avro"
require "logstash/codecs/base"
require "logstash/event"
require "logstash/timestamp"
require "logstash/util"

# Read serialized Avro records as Logstash events
#
# This plugin is used to serialize Logstash events as 
# Avro datums, as well as deserializing Avro datums into 
# Logstash events.
#
# ==== Encoding
# 
# This codec is for serializing individual Logstash events 
# as Avro datums that are Avro binary blobs. It does not encode 
# Logstash events into an Avro file.
#
#
# ==== Decoding
#
# This codec is for deserializing individual Avro records. It is not for reading
# Avro files. Avro files have a unique format that must be handled upon input.
#
#
# ==== Usage
# Example usage with Kafka input.
#
# [source,ruby]
# ----------------------------------
# input {
#   kafka {
#     codec => {
#       avro => {
#         schema_uri => "/tmp/schema.avsc"
#       }
#     }
#   }
# }
# filter {
#   ...
# }
# output {
#   ...
# }
# ----------------------------------
class LogStash::Codecs::Avro < LogStash::Codecs::Base
  config_name "avro"


  # schema path to fetch the schema from.
  # This can be a 'http' or 'file' scheme URI
  # example:
  #
  # * http - `http://example.com/schema.avsc`
  # * file - `/path/to/schema.avsc`
  config :schema_uri, :validate => :string, :required => true

  def open_and_read(uri_string)
    open(uri_string).read
  end

  public
  def register
    @schema = Avro::Schema.parse(open_and_read(schema_uri))
  end

  public
  def decode(data)
    datum = StringIO.new(data)
    decoder = Avro::IO::BinaryDecoder.new(datum)
    datum_reader = Avro::IO::DatumReader.new(@schema)
    yield LogStash::Event.new(datum_reader.read(decoder))
  end

  public
  def encode(event)
    dw = Avro::IO::DatumWriter.new(@schema)
    buffer = StringIO.new
    encoder = Avro::IO::BinaryEncoder.new(buffer)
    dw.write(event.to_hash, encoder)
    @on_event.call(event, buffer.string)
  end
end

Version data entries

2 entries across 2 versions & 1 rubygems

Version Path
logstash-codec-avro-2.0.2 lib/logstash/codecs/avro.rb
logstash-codec-avro-2.0.1 lib/logstash/codecs/avro.rb