Sha256: f09c39bba3a6c262cb96e8f2713aaa5e0e966bd21b04646ede9f6ff06a1a9327
Contents?: true
Size: 1.53 KB
Versions: 3
Compression:
Stored size: 1.53 KB
Contents
# encoding: utf-8
require "logstash/codecs/base"
require "logstash/codecs/line"
require "logstash/json"

# This codec will decode the http://www.elasticsearch.org/guide/en/elasticsearch/reference/current/docs-bulk.html[Elasticsearch bulk format]
# into individual events, plus metadata into the `@metadata` field.
#
# Encoding is not supported at this time as the Elasticsearch
# output submits Logstash events in bulk format.
class LogStash::Codecs::ESBulk < LogStash::Codecs::Base
  config_name "es_bulk"

  # Builds the codec together with the internal line codec that is used to
  # split incoming data into individual lines before each one is parsed as JSON.
  #
  # @param params [Hash] plugin parameters, forwarded unchanged to the base class
  def initialize(params={})
    super(params)
    @lines = LogStash::Codecs::Line.new
    @lines.charset = "UTF-8"
  end

  # Decodes a chunk of bulk-formatted data, yielding one event per bulk item.
  #
  # Lines are consumed in pairs: an action line (`{"<action>": {...}}`)
  # followed by a source-document line. The action's body, plus an "action"
  # key holding the action name, becomes the event's `@metadata`; the source
  # document becomes the event's fields. A `delete` action has no source line,
  # so an empty event carrying only `@metadata` is yielded immediately.
  #
  # Pairing state is local to a single call: an action line whose document
  # arrives in a later `decode` call is not paired with it.
  #
  # Lines that fail JSON parsing, or that parse to something other than the
  # expected JSON object, are logged and skipped; pairing restarts at the next
  # line so one bad line does not abort the rest of the chunk.
  #
  # @param data [String] raw bulk payload (one JSON document per line)
  # @yield [event] each decoded LogStash::Event
  def decode(data)
    state = :initial
    metadata = {}

    @lines.decode(data) do |bulk|
      raw = bulk.get("message")

      begin
        line = LogStash::Json.load(raw)
      rescue LogStash::Json::ParserError => e
        # NOTE(review): @logger is presumably supplied by the base class — confirm.
        # Log the offending line (not the whole chunk) so it can be identified.
        @logger.error("JSON parse failure. ES Bulk messages must be in UTF-8 JSON", :error => e, :data => raw)
        # Drop any pending action so the next line is read as an action line
        # instead of being emitted as the failed document's replacement.
        state = :initial
        next
      end

      # Valid JSON that is not an object (nil, array, scalar) cannot be either
      # an action line or a source document.
      unless line.is_a?(Hash)
        @logger.error("Unexpected ES Bulk line. Each line must be a JSON object", :data => raw)
        state = :initial
        next
      end

      case state
      when :metadata
        event = LogStash::Event.new(line)
        event.set("@metadata", metadata)
        yield event
        state = :initial
      when :initial
        # An action line is a single-entry object: { "<action>" => { ...meta... } }.
        action, action_meta = line.first
        unless action_meta.is_a?(Hash)
          @logger.error("Unexpected ES Bulk action line. Expected {\"<action>\": {...}}", :data => raw)
          next
        end

        metadata = action_meta
        metadata["action"] = action.to_s

        if action == 'delete'
          # Deletes carry no source document: emit a metadata-only event now.
          event = LogStash::Event.new
          event.set("@metadata", metadata)
          yield event
        else
          state = :metadata
        end
      end
    end
  end # def decode
end # class LogStash::Codecs::ESBulk
Version data entries
3 entries across 3 versions & 1 rubygems
Version | Path |
---|---|
logstash-codec-es_bulk-3.0.2 | lib/logstash/codecs/es_bulk.rb |
logstash-codec-es_bulk-3.0.1 | lib/logstash/codecs/es_bulk.rb |
logstash-codec-es_bulk-3.0.0 | lib/logstash/codecs/es_bulk.rb |