# # Fluentd # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # require 'fluent/plugin/base' require 'fluent/event' require 'fluent/log' require 'fluent/plugin_id' require 'fluent/plugin_helper' module Fluent module Plugin class Filter < Base include PluginId include PluginLoggerMixin include PluginHelper::Mixin helpers_internal :event_emitter, :metrics attr_reader :has_filter_with_time def initialize super @has_filter_with_time = has_filter_with_time? @emit_records_metrics = nil @emit_size_metrics = nil @counter_mutex = Mutex.new @enable_size_metrics = false end def emit_records @emit_records_metrics.get end def emit_size @emit_size_metrics.get end def configure(conf) super @emit_records_metrics = metrics_create(namespace: "fluentd", subsystem: "filter", name: "emit_records", help_text: "Number of count emit records") @emit_size_metrics = metrics_create(namespace: "fluentd", subsystem: "filter", name: "emit_size", help_text: "Total size of emit events") @enable_size_metrics = !!system_config.enable_size_metrics end def statistics stats = { 'emit_records' => @emit_records_metrics.get, 'emit_size' => @emit_size_metrics.get, } { 'filter' => stats } end def measure_metrics(es) @emit_records_metrics.add(es.size) @emit_size_metrics.add(es.to_msgpack_stream.bytesize) if @enable_size_metrics end def filter(tag, time, record) raise NotImplementedError, "BUG: filter plugins MUST implement this method" end def filter_with_time(tag, time, record) raise NotImplementedError, "BUG: filter plugins MUST implement this method" end def filter_stream(tag, es) new_es = MultiEventStream.new if @has_filter_with_time es.each do |time, record| begin filtered_time, filtered_record = filter_with_time(tag, time, record) new_es.add(filtered_time, filtered_record) if filtered_time && filtered_record rescue => e router.emit_error_event(tag, time, record, e) end end else es.each do |time, record| begin filtered_record = filter(tag, time, record) new_es.add(time, filtered_record) if filtered_record rescue => e router.emit_error_event(tag, time, record, e) end end end new_es end private def has_filter_with_time? implmented_methods = self.class.instance_methods(false) # Plugins that override `filter_stream` don't need check, # because they may not call `filter` or `filter_with_time` # for example fluentd/lib/fluent/plugin/filter_record_transformer.rb return nil if implmented_methods.include?(:filter_stream) case when [:filter, :filter_with_time].all? { |e| implmented_methods.include?(e) } raise "BUG: Filter plugins MUST implement either `filter` or `filter_with_time`" when implmented_methods.include?(:filter) false when implmented_methods.include?(:filter_with_time) true else raise NotImplementedError, "BUG: Filter plugins MUST implement either `filter` or `filter_with_time`" end end end end end