Sha256: affa48d9c02b4f75cfbe1aee9393b8802ed52e283d361d7a665f3d369cf903c0

Contents?: true

Size: 1.45 KB

Versions: 2

Compression:

Stored size: 1.45 KB

Contents

# encoding: utf-8
require "logstash/codecs/base"
require "logstash/codecs/line"
require "logstash/json"

# This codec will decode the Elasticsearch bulk format into
# individual events, plus metadata into the `@metadata` field.
# 
# Encoding is not supported at this time as the Elasticsearch
# output submits Logstash events in bulk format.
class LogStash::Codecs::ESBulk < LogStash::Codecs::Base
  config_name "es_bulk"

  milestone 1

  # Sets up the codec and the internal line codec that splits the incoming
  # bulk payload into one JSON document per line.
  #
  # The parser state (@state, @metadata) is kept on the instance rather than
  # in locals of #decode, so that an action line and its source document are
  # still paired up when they are delivered in two separate #decode calls.
  #
  # @param params [Hash] plugin configuration, passed through to the base codec
  def initialize(params={})
    super(params)
    @lines = LogStash::Codecs::Line.new
    @lines.charset = "UTF-8"
    # :initial  => the next line is expected to be an action line
    # :metadata => the next line is the source document for @metadata
    @state = :initial
    @metadata = {}
  end

  # Decodes a chunk of Elasticsearch bulk data.
  #
  # Each line handed back by the line codec is parsed as JSON. An action line
  # (e.g. {"index": {...}}) supplies the metadata; the name of the action is
  # stored under the "action" key of that metadata. The following line is the
  # source document and becomes the event body. A "delete" action has no
  # source document, so an empty event carrying only the metadata is yielded
  # right away.
  #
  # Lines that fail to parse as JSON are logged and skipped.
  #
  # NOTE(review): a line that parses to something without #keys (or an action
  # whose value is not hash-like) will still raise here, as it did before —
  # confirm whether such input should be logged and skipped instead.
  #
  # @param data [String] raw chunk of bulk-format input
  # @yield [event] one LogStash::Event per bulk action, with "@metadata" set
  def decode(data)
    @lines.decode(data) do |bulk|
      begin
        line = LogStash::Json.load(bulk["message"])
        case @state
        when :metadata
          event = LogStash::Event.new(line)
          event["@metadata"] = @metadata
          # Reset before yielding so an error raised by the consumer does not
          # leave the codec expecting another source document.
          @state = :initial
          yield event
        when :initial
          action = line.keys[0]
          @metadata = line[action]
          @metadata["action"] = action.to_s
          if action == 'delete'
            # Deletes carry no source document: emit a metadata-only event
            # and keep waiting for the next action line.
            event = LogStash::Event.new()
            event["@metadata"] = @metadata
            yield event
          else
            @state = :metadata
          end
        end
      rescue LogStash::Json::ParserError => e
        # Report the line that actually failed to parse, not the whole chunk.
        @logger.error("JSON parse failure. ES Bulk messages must be in UTF-8 JSON", :error => e, :data => bulk["message"])
      end
    end
  end # def decode

end # class LogStash::Codecs::ESBulk

Version data entries

2 entries across 2 versions & 1 rubygems

Version Path
logstash-codec-es_bulk-0.1.2 lib/logstash/codecs/es_bulk.rb
logstash-codec-es_bulk-0.1.1 lib/logstash/codecs/es_bulk.rb