lib/fluent/plugin/in_cat_sweep.rb in fluent-plugin-cat-sweep-0.1.5 vs lib/fluent/plugin/in_cat_sweep.rb in fluent-plugin-cat-sweep-0.2.0

- old
+ new

@@ -1,19 +1,22 @@ -require 'fluent/input' +require 'fluent/engine' +require 'fluent/plugin/input' +require 'fluent/compat/parser' -module Fluent - class CatSweepInput < Input - Plugin.register_input('cat_sweep', self) +module Fluent::Plugin + class CatSweepInput < Fluent::Plugin::Input + Fluent::Plugin.register_input('cat_sweep', self) + helpers :compat_parameters, :parser + class OneLineMaxBytesOverError < StandardError end class FormatError < StandardError end config_param :file_path_with_glob, :string - config_param :format, :string config_param :waiting_seconds, :integer # seconds config_param :tag, :string, :default => 'file.cat_sweep' config_param :processing_file_suffix, :string, :default => '.processing' config_param :error_file_suffix, :string, :default => '.error' config_param :line_terminated_by, :string, :default => "\n" @@ -22,26 +25,14 @@ config_param :remove_after_processing, :bool, :default => false config_param :run_interval, :time, :default => 5 config_param :file_event_stream, :bool, :default => false config_param :flock_with_rw_mode, :bool, :default => false - # To support log_level option implemented by Fluentd v0.10.43 - unless method_defined?(:log) - define_method("log") { $log } - end - - # Define `router` method of v0.12 to support v0.10 or earlier - unless method_defined?(:router) - define_method("router") { Fluent::Engine } - end - def configure(conf) + compat_parameters_convert(conf, :parser, :buffer, default_chunk_key: "time") super - # Message for users about supported fluentd versions - supported_versions_information - configure_parser(conf) if @processing_file_suffix.empty? raise Fluent::ConfigError, "in_cat_sweep: `processing_file_suffix` must has some letters." end @@ -62,28 +53,32 @@ raise Fluent::ConfigError, "in_cat_sweep: `move_to` directory (#{dirname}) must be writable." end else begin FileUtils.mkdir_p(dirname) - rescue => e + rescue raise Fluent::ConfigError, "in_cat_sweep: `move_to` directory (#{dirname}) must be writable." end end end @read_bytes_once = 262144 # 256 KB end def start + super + @processing = true @thread = Thread.new(&method(:run_periodic)) end def shutdown @processing = false @thread.join + + super end def run_periodic while @processing sleep @run_interval @@ -107,59 +102,13 @@ end private def configure_parser(conf) - if Plugin.respond_to?(:new_parser) - @parser = Plugin.new_parser(@format) - @parser.configure(conf) - else # For supporting fluentd lower than v0.10.58 - @parser = TextParser.new - @parser.configure(conf) - # In lower version of fluentd than v0.10.50, - # `Fluent::Parser#parse` does not support block based API. - # cf. https://github.com/fluent/fluentd/blob/v0.10.49/lib/fluent/parser.rb#L270 - # On the other hand, in newer version(like v0.14) of fluentd, - # `Fluent::Parser#parse` only supports block based API. - # cf. https://github.com/fluent/fluentd/blob/v0.14.0.rc.3/lib/fluent/plugin/parser_tsv.rb#L33 - # So, lower version of `Fluent::Parser#parse` extends the way to call by block based API. - @parser.extend(Module.new { - def parse(line) - time, record = super - yield(time, record) - return - end - }) - end + @parser = parser_create() end - def supported_versions_information - if current_fluent_version < fluent_version('0.12.0') - log.warn "in_cat_sweep: the support for fluentd v0.10 will end near future. Please upgrade your fluentd or fix this plugin version." - end - if current_fluent_version < fluent_version('0.10.58') - log.warn "in_cat_sweep: fluentd officially supports Plugin.new_parser/Plugin.register_parser APIs from v0.10.58." \ - " The support for v0.10.58 will end near future." \ - " Please upgrade your fluentd or fix this plugin version." - end - if current_fluent_version < fluent_version('0.10.46') - log.warn "in_cat_sweep: fluentd officially supports parser plugin from v0.10.46." \ - " If you use `time_key` parameter and fluentd v0.10.45, doesn't work properly." \ - " The support for v0.10.45 will end near future." \ - " Please upgrade your fluentd or fix this plugin version." - end - end - - def current_fluent_version - parse_version_comparable(Fluent::VERSION) - end - - def parse_version_comparable(v) - Gem::Version.new(v) - end - alias :fluent_version :parse_version_comparable # For the readability - def will_process?(filename) !(processing?(filename) or error_file?(filename) or sufficient_waiting?(filename)) end def processing?(filename) @@ -169,17 +118,17 @@ def error_file?(filename) filename.end_with?(@error_file_suffix) end def sufficient_waiting?(filename) - (Time.now - File.mtime(filename)).to_i < @waiting_seconds + (Time.at(Fluent::EventTime.now.to_r) - File.mtime(filename)).to_i < @waiting_seconds end def get_processing_filename(filename) tmpfile = String.new tmpfile << filename << '.' << Process.pid.to_s << '.' - tmpfile << Time.now.to_i.to_s << @processing_file_suffix + tmpfile << Fluent::EventTime.now.to_s << @processing_file_suffix end def revert_processing_filename(processing_filename) tmpfile = processing_filename.dup tmpfile.chomp!(@processing_file_suffix) @@ -254,10 +203,10 @@ entry = parse_line(line) entries << entry if entry end end unless entries.empty? - es = ArrayEventStream.new(entries) + es = Fluent::ArrayEventStream.new(entries) router.emit_stream(@tag, es) end end def parse_line(line)