lib/fluent/plugin/in_beats.rb in fluent-plugin-beats-0.1.4 vs lib/fluent/plugin/in_beats.rb in fluent-plugin-beats-1.0.0

- old
+ new

@@ -13,48 +13,61 @@ # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # +require "lumberjack/beats" +require "lumberjack/beats/server" +require 'concurrent/executor/cached_thread_pool' -require 'fluent/input' -require 'fluent/parser' +require 'fluent/plugin/input' +require 'fluent/plugin/parser' +require 'fluent/time' -module Fluent +module Fluent::Plugin class BeatsInput < Input - Plugin.register_input('beats', self) + Fluent::Plugin.register_input('beats', self) - def initialize - super + include Fluent::TimeMixin::Parser - require "lumberjack/beats" - require "lumberjack/beats/server" - require 'concurrent/executor/cached_thread_pool' - end + helpers :compat_parameters, :parser, :thread + DEFAULT_PARSER = 'none'.freeze + config_param :port, :integer, :default => 5044 config_param :bind, :string, :default => '0.0.0.0' config_param :tag, :string, :default => nil config_param :metadata_as_tag, :bool, :default => nil - config_param :format, :string, :default => nil config_param :max_connections, :integer, :default => nil # CachedThreadPool can't limit the number of threads config_param :use_ssl, :string, :default => false config_param :ssl_certificate, :string, :default => nil config_param :ssl_key, :string, :default => nil config_param :ssl_key_passphrase, :string, :default => nil + config_section :parse do + config_set_default :@type, DEFAULT_PARSER + end + + def multi_workers_ready? + true + end + def configure(conf) + compat_parameters_convert(conf, :parser) + super if !@tag && !@metadata_as_tag - raise ConfigError, "'tag' or 'metadata_as_tag' parameter is required on beats input" + raise Fluent::ConfigError, "'tag' or 'metadata_as_tag' parameter is required on beats input" end - @time_parser = Fluent::TextParser::TimeParser.new('%Y-%m-%dT%H:%M:%S.%N%z') - if @format - @parser = Plugin.new_parser(@format) - @parser.configure(conf) + @port += fluentd_worker_id + @time_parser = time_parser_create(format: '%Y-%m-%dT%H:%M:%S.%N%z') + + @parser_config = conf.elements('parse').first + if @parser_config + @parser = parser_create end @connections = [] end def start @@ -64,17 +77,16 @@ :address => @bind, :port => @port, :ssl => @use_ssl, :ssl_certificate => @ssl_certificate, :ssl_key => @ssl_key, :ssl_key_passphrase => @ssl_key_passphrase) # Lumberjack::Beats::Server depends on normal accept so we need to launch thread for each connection. # TODO: Re-implement Beats Server with Cool.io for resource control @thread_pool = Concurrent::CachedThreadPool.new(:idletime => 15) # idletime setting is based on logstash beats input - @thread = Thread.new(&method(:run)) + thread_create(:in_beats_runner, &method(:run)) end def shutdown @lumberjack.close rescue nil @thread_pool.shutdown - @thread.join super end def run @@ -86,19 +98,19 @@ @connections.reject! { |c| c.closed? } if @connections.size >= @max_connections conn.close # close for retry on beats side sleep 1 next - end + end @connections << conn end @thread_pool.post { begin conn.run { |map| tag = @metadata_as_tag ? map['@metadata']['beat'] : @tag - if map.has_key?('message') && @format + if map.has_key?('message') && @parser_config message = map.delete('message') @parser.parse(message) { |time, record| record['@timestamp'] = map['@timestamp'] map.each { |k, v| record[k] = v