Sha256: f09c39bba3a6c262cb96e8f2713aaa5e0e966bd21b04646ede9f6ff06a1a9327

Contents?: true

Size: 1.53 KB

Versions: 3

Compression:

Stored size: 1.53 KB

Contents

# encoding: utf-8
require "logstash/codecs/base"
require "logstash/codecs/line"
require "logstash/json"

# This codec will decode the http://www.elasticsearch.org/guide/en/elasticsearch/reference/current/docs-bulk.html[Elasticsearch bulk format]
# into individual events, plus metadata into the `@metadata` field.
#
# Encoding is not supported at this time as the Elasticsearch
# output submits Logstash events in bulk format.
class LogStash::Codecs::ESBulk < LogStash::Codecs::Base
  config_name "es_bulk"

  # Builds the codec together with the line codec (charset forced to UTF-8)
  # that is used to split an incoming bulk payload into individual lines.
  #
  # @param params [Hash] plugin settings, handed unchanged to the base codec
  public
  def initialize(params={})
    super(params)
    @lines = LogStash::Codecs::Line.new
    @lines.charset = "UTF-8"
  end

  # Decodes a bulk payload into events, yielding each one to the caller's block.
  #
  # Lines are consumed as pairs: an action line of the form
  # `{"<action>": {...metadata...}}` followed by a document line. The action's
  # value, with the action name stored under the "action" key, becomes the
  # event's `@metadata`; the document line becomes the event body. A "delete"
  # action has no document line, so an event carrying only `@metadata` is
  # yielded immediately.
  #
  # Lines that cannot be parsed as JSON, and action lines that do not have the
  # expected object-in-object shape, are logged and skipped; after either kind
  # of failure the decoder goes back to expecting an action line.
  #
  # NOTE(review): the action/document state is local to a single call, so an
  # action line and its document presumably have to arrive in the same `data`
  # chunk -- confirm against how inputs feed this codec.
  #
  # @param data [String] raw bulk-format text
  # @yield [event] each decoded event
  # @yieldparam event [LogStash::Event]
  public
  def decode(data)
    state = :initial
    metadata = {}
    @lines.decode(data) do |bulk|
      raw = bulk.get("message")
      begin
        line = LogStash::Json.load(raw)
        case state
        when :metadata
          event = LogStash::Event.new(line)
          event.set("@metadata", metadata)
          yield event
          state = :initial
        when :initial
          # The action name is the first key; its value is the action metadata.
          # Both levels must be map-like, otherwise the lookups and the
          # "action" assignment below would raise and abort the whole decode.
          action = line.respond_to?(:keys) ? line.keys[0] : nil
          action_meta = action.nil? ? nil : line[action]
          unless action_meta.respond_to?(:keys)
            @logger.error("Invalid ES Bulk action line, expected a JSON object wrapping the action metadata", :data => raw)
            next
          end

          metadata = action_meta
          metadata["action"] = action.to_s
          if action == 'delete'
            # Deletes carry no document line: emit a metadata-only event now
            # and keep expecting an action line.
            event = LogStash::Event.new
            event.set("@metadata", metadata)
            yield event
          else
            state = :metadata
          end
        end
      rescue LogStash::Json::ParserError => e
        # Log only the offending line rather than the entire payload.
        @logger.error("JSON parse failure. ES Bulk messages must be in UTF-8 JSON", :error => e, :data => raw)
        # Drop the current pair so that a bad document line does not cause the
        # next action line to be emitted as a document with stale metadata.
        state = :initial
      end
    end
  end # def decode

end # class LogStash::Codecs::ESBulk

Version data entries

3 entries across 3 versions & 1 rubygems

Version Path
logstash-codec-es_bulk-3.0.2 lib/logstash/codecs/es_bulk.rb
logstash-codec-es_bulk-3.0.1 lib/logstash/codecs/es_bulk.rb
logstash-codec-es_bulk-3.0.0 lib/logstash/codecs/es_bulk.rb