lib/fluent/plugin/in_cat_sweep.rb in fluent-plugin-cat-sweep-0.1.5 vs lib/fluent/plugin/in_cat_sweep.rb in fluent-plugin-cat-sweep-0.2.0
- old
+ new
@@ -1,19 +1,22 @@
-require 'fluent/input'
+require 'fluent/engine'
+require 'fluent/plugin/input'
+require 'fluent/compat/parser'
-module Fluent
- class CatSweepInput < Input
- Plugin.register_input('cat_sweep', self)
+module Fluent::Plugin
+ class CatSweepInput < Fluent::Plugin::Input
+ Fluent::Plugin.register_input('cat_sweep', self)
+ helpers :compat_parameters, :parser
+
class OneLineMaxBytesOverError < StandardError
end
class FormatError < StandardError
end
config_param :file_path_with_glob, :string
- config_param :format, :string
config_param :waiting_seconds, :integer # seconds
config_param :tag, :string, :default => 'file.cat_sweep'
config_param :processing_file_suffix, :string, :default => '.processing'
config_param :error_file_suffix, :string, :default => '.error'
config_param :line_terminated_by, :string, :default => "\n"
@@ -22,26 +25,14 @@
config_param :remove_after_processing, :bool, :default => false
config_param :run_interval, :time, :default => 5
config_param :file_event_stream, :bool, :default => false
config_param :flock_with_rw_mode, :bool, :default => false
- # To support log_level option implemented by Fluentd v0.10.43
- unless method_defined?(:log)
- define_method("log") { $log }
- end
-
- # Define `router` method of v0.12 to support v0.10 or earlier
- unless method_defined?(:router)
- define_method("router") { Fluent::Engine }
- end
-
def configure(conf)
+ compat_parameters_convert(conf, :parser, :buffer, default_chunk_key: "time")
super
- # Message for users about supported fluentd versions
- supported_versions_information
-
configure_parser(conf)
if @processing_file_suffix.empty?
raise Fluent::ConfigError, "in_cat_sweep: `processing_file_suffix` must has some letters."
end
@@ -62,28 +53,32 @@
raise Fluent::ConfigError, "in_cat_sweep: `move_to` directory (#{dirname}) must be writable."
end
else
begin
FileUtils.mkdir_p(dirname)
- rescue => e
+ rescue
raise Fluent::ConfigError, "in_cat_sweep: `move_to` directory (#{dirname}) must be writable."
end
end
end
@read_bytes_once = 262144 # 256 KB
end
def start
+ super
+
@processing = true
@thread = Thread.new(&method(:run_periodic))
end
def shutdown
@processing = false
@thread.join
+
+ super
end
def run_periodic
while @processing
sleep @run_interval
@@ -107,59 +102,13 @@
end
private
def configure_parser(conf)
- if Plugin.respond_to?(:new_parser)
- @parser = Plugin.new_parser(@format)
- @parser.configure(conf)
- else # For supporting fluentd lower than v0.10.58
- @parser = TextParser.new
- @parser.configure(conf)
- # In lower version of fluentd than v0.10.50,
- # `Fluent::Parser#parse` does not support block based API.
- # cf. https://github.com/fluent/fluentd/blob/v0.10.49/lib/fluent/parser.rb#L270
- # On the other hand, in newer version(like v0.14) of fluentd,
- # `Fluent::Parser#parse` only supports block based API.
- # cf. https://github.com/fluent/fluentd/blob/v0.14.0.rc.3/lib/fluent/plugin/parser_tsv.rb#L33
- # So, lower version of `Fluent::Parser#parse` extends the way to call by block based API.
- @parser.extend(Module.new {
- def parse(line)
- time, record = super
- yield(time, record)
- return
- end
- })
- end
+ @parser = parser_create()
end
- def supported_versions_information
- if current_fluent_version < fluent_version('0.12.0')
- log.warn "in_cat_sweep: the support for fluentd v0.10 will end near future. Please upgrade your fluentd or fix this plugin version."
- end
- if current_fluent_version < fluent_version('0.10.58')
- log.warn "in_cat_sweep: fluentd officially supports Plugin.new_parser/Plugin.register_parser APIs from v0.10.58." \
- " The support for v0.10.58 will end near future." \
- " Please upgrade your fluentd or fix this plugin version."
- end
- if current_fluent_version < fluent_version('0.10.46')
- log.warn "in_cat_sweep: fluentd officially supports parser plugin from v0.10.46." \
- " If you use `time_key` parameter and fluentd v0.10.45, doesn't work properly." \
- " The support for v0.10.45 will end near future." \
- " Please upgrade your fluentd or fix this plugin version."
- end
- end
-
- def current_fluent_version
- parse_version_comparable(Fluent::VERSION)
- end
-
- def parse_version_comparable(v)
- Gem::Version.new(v)
- end
- alias :fluent_version :parse_version_comparable # For the readability
-
def will_process?(filename)
!(processing?(filename) or error_file?(filename) or sufficient_waiting?(filename))
end
def processing?(filename)
@@ -169,17 +118,17 @@
def error_file?(filename)
filename.end_with?(@error_file_suffix)
end
def sufficient_waiting?(filename)
- (Time.now - File.mtime(filename)).to_i < @waiting_seconds
+ (Time.at(Fluent::EventTime.now.to_r) - File.mtime(filename)).to_i < @waiting_seconds
end
def get_processing_filename(filename)
tmpfile = String.new
tmpfile << filename << '.' << Process.pid.to_s << '.'
- tmpfile << Time.now.to_i.to_s << @processing_file_suffix
+ tmpfile << Fluent::EventTime.now.to_s << @processing_file_suffix
end
def revert_processing_filename(processing_filename)
tmpfile = processing_filename.dup
tmpfile.chomp!(@processing_file_suffix)
@@ -254,10 +203,10 @@
entry = parse_line(line)
entries << entry if entry
end
end
unless entries.empty?
- es = ArrayEventStream.new(entries)
+ es = Fluent::ArrayEventStream.new(entries)
router.emit_stream(@tag, es)
end
end
def parse_line(line)