Sha256: a34d74714f400cb3157097a2a774391718227fb87bad436859f405deaaaaaa98
Contents?: true
Size: 1.97 KB
Versions: 2
Compression:
Stored size: 1.97 KB
Contents
# frozen_string_literal: true # Copyright 2018- Zhimin (Gimi) Liang (https://github.com/Gimi) # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. require 'fluent/plugin/output' module Fluent::Plugin class JqOutput < Output Fluent::Plugin.register_output('jq', self) helpers :event_emitter desc 'The jq filter used to transform the input. The result of the filter should return an object.' config_param :jq, :string desc 'The prefix to be removed from the input tag when outputting a new record.' config_param :remove_tag_prefix, :string, default: '' def initialize super require "jq" end def configure(conf) super @jq_filter = JQ::Core.new @jq rescue JQ::Error raise Fluent::ConfigError, "Could not parse jq filter: #{@jq}, error: #{$!.message}" end def multi_workers_ready? true end def process(tag, es) new_es = Fluent::MultiEventStream.new es.each do |time, record| begin @jq_filter.update(MultiJson.dump(tag: tag, time: time, record: record), false) { |r| # the filter could return an array new_records = [MultiJson.load("[#{r}]").first] new_records.flatten! new_records.each { |new_record| new_es.add time, new_record } } rescue JQ::Error log.error "Failed to transform #{MultiJson.dump record} with #{@jq}, error: #{$!.message}" end end new_tag = tag.sub(/^#{Regexp.escape(@remove_tag_prefix)}\./, '') router.emit_stream(new_tag, new_es) end end end
Version data entries
2 entries across 2 versions & 1 rubygems
Version | Path |
---|---|
fluent-plugin-jq-0.4.0 | lib/fluent/plugin/out_jq.rb |
fluent-plugin-jq-0.3.0 | lib/fluent/plugin/out_jq.rb |