Sha256: d2fb5510077a5659d95b6cefb733429d5994aea3673742052914e18e23822726

Contents?: true

Size: 944 Bytes

Versions: 1

Compression:

Stored size: 944 Bytes

Contents

require 'fluent/plugin/out_copy'

module Fluent::Plugin
  # Output plugin registered under the name +copy_ex+.
  #
  # It behaves like its parent CopyOutput, except that a +<store>+ section
  # whose argument is +ignore_error+ (i.e. +<store ignore_error>+) has its
  # emit failures logged instead of propagated, so the remaining stores still
  # receive the events.
  class CopyExOutput < CopyOutput
    Fluent::Plugin.register_output('copy_ex', self)

    # @return [Array<Boolean>] one flag per +<store>+ section, in the order the
    #   sections appear in the configuration; +true+ means errors raised by
    #   that store are logged and swallowed.
    attr_reader :ignore_errors

    def initialize
      super
      @ignore_errors = []
    end

    # Reads the +<store>+ sections and records which of them carry the
    # +ignore_error+ argument.
    #
    # @param conf [Object] the plugin configuration element; only its
    #   +elements+ (each responding to +name+ and +arg+) are read here.
    # @return [Array<Boolean>] the freshly built flag list.
    def configure(conf)
      super

      # Rebuilt from scratch (not appended to) so that calling configure more
      # than once cannot leave stale or duplicated flags that would no longer
      # line up with the store indexes used in #process.
      @ignore_errors = conf.elements
                           .select { |element| element.name == 'store' }
                           .map { |element| element.arg == 'ignore_error' }
    end

    # Emits the event stream to every configured output.
    #
    # @param tag [String] tag passed through to each output.
    # @param es [Object] event stream; must respond to +repeatable?+, +each+
    #   and +dup+.
    # @raise [StandardError] whatever an output raises, unless that output's
    #   store was flagged with +ignore_error+.
    def process(tag, es)
      # The stream is handed to every output in turn, so a stream that does
      # not report itself as repeatable is first copied into a
      # MultiEventStream.
      unless es.repeatable?
        m = Fluent::MultiEventStream.new
        es.each { |time, record| m.add(time, record) }
        es = m
      end

      # NOTE(review): this relies on `outputs` being built from the same
      # <store> sections, in the same order, as @ignore_errors — confirm
      # against the parent class.
      outputs.each_with_index do |output, idx|
        # Explicit begin/rescue (rather than rescue directly inside the
        # do-block) keeps this working on Rubies older than 2.5.
        begin
          output.emit_events(tag, @deep_copy ? es.dup : es)
        rescue => e
          # Bare raise re-raises the exception currently being handled.
          raise unless @ignore_errors[idx]

          # Identify the failing store so the swallowed error is traceable.
          log.error('ignore emit error',
                    store_index: idx,
                    output_class: output.class,
                    error_class: e.class,
                    error: e.message)
        end
      end
    end
  end
end

Version data entries

1 entries across 1 versions & 1 rubygems

Version Path
fluent-plugin-copy_ex-0.1.0 lib/fluent/plugin/out_copy_ex/v14.rb