Sha256: dc805bd60009083c17743b93cdbc56e3a554ee5e28b5d1fb86a7392ab3c7d415

Contents?: true

Size: 1.56 KB

Versions: 1

Compression:

Stored size: 1.56 KB

Contents

require 'json'

module Fluent
    module Plugin
        class JsonlArraySplitterFilter < Fluent::Plugin::Filter
            Fluent::Plugin.register_filter("jsonl_array_splitter", self)
      
            desc "Key name that contains JSONL formatted text"
            config_param :key_name, :string
            desc "Keep original key in parsed result."
            config_param :reserve_key, :bool, default: false
            desc 'Enable debug log. Default: false.'
            config_param :debug, :bool, :default => false

            def filter_stream(tag, es)
                new_es = Fluent::MultiEventStream.new
                es.each do |time, record|
                    record_lines = record[@key_name].to_s
                    parsed = []
                    record_lines.each_line do |line|
                        if @debug then
                            log.debug("Read JSON line: #{line}")
                        end
                        parsed << JSON.parse("#{line}")
                    end

                    parsed.each do |r|
                        new_record = record.clone
                        new_record.update(r)

                        if !@reserve_key then
                            new_record.delete(@key_name)
                        end
                        if @debug then
                            log.debug("New record by json line filter: #{new_record}")
                        end
                        new_es.add(time, new_record)
                    end
                end
                new_es
            end
        end
    end
end
    

Version data entries

1 entries across 1 versions & 1 rubygems

Version Path
fluent-plugin-jsonl_array_splitter-0.1.0 lib/fluent/plugin/filter_jsonl_array_splitter.rb