lib/fluent/plugin/in_beats.rb in fluent-plugin-beats-0.1.4 vs lib/fluent/plugin/in_beats.rb in fluent-plugin-beats-1.0.0
- old
+ new
@@ -13,48 +13,61 @@
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
+require "lumberjack/beats"
+require "lumberjack/beats/server"
+require 'concurrent/executor/cached_thread_pool'
-require 'fluent/input'
-require 'fluent/parser'
+require 'fluent/plugin/input'
+require 'fluent/plugin/parser'
+require 'fluent/time'
-module Fluent
+module Fluent::Plugin
class BeatsInput < Input
- Plugin.register_input('beats', self)
+ Fluent::Plugin.register_input('beats', self)
- def initialize
- super
+ include Fluent::TimeMixin::Parser
- require "lumberjack/beats"
- require "lumberjack/beats/server"
- require 'concurrent/executor/cached_thread_pool'
- end
+ helpers :compat_parameters, :parser, :thread
+ DEFAULT_PARSER = 'none'.freeze
+
config_param :port, :integer, :default => 5044
config_param :bind, :string, :default => '0.0.0.0'
config_param :tag, :string, :default => nil
config_param :metadata_as_tag, :bool, :default => nil
- config_param :format, :string, :default => nil
config_param :max_connections, :integer, :default => nil # CachedThreadPool can't limit the number of threads
config_param :use_ssl, :string, :default => false
config_param :ssl_certificate, :string, :default => nil
config_param :ssl_key, :string, :default => nil
config_param :ssl_key_passphrase, :string, :default => nil
+ config_section :parse do
+ config_set_default :@type, DEFAULT_PARSER
+ end
+
+ def multi_workers_ready?
+ true
+ end
+
def configure(conf)
+ compat_parameters_convert(conf, :parser)
+
super
if !@tag && !@metadata_as_tag
- raise ConfigError, "'tag' or 'metadata_as_tag' parameter is required on beats input"
+ raise Fluent::ConfigError, "'tag' or 'metadata_as_tag' parameter is required on beats input"
end
- @time_parser = Fluent::TextParser::TimeParser.new('%Y-%m-%dT%H:%M:%S.%N%z')
- if @format
- @parser = Plugin.new_parser(@format)
- @parser.configure(conf)
+ @port += fluentd_worker_id
+ @time_parser = time_parser_create(format: '%Y-%m-%dT%H:%M:%S.%N%z')
+
+ @parser_config = conf.elements('parse').first
+ if @parser_config
+ @parser = parser_create
end
@connections = []
end
def start
@@ -64,17 +77,16 @@
:address => @bind, :port => @port, :ssl => @use_ssl, :ssl_certificate => @ssl_certificate,
:ssl_key => @ssl_key, :ssl_key_passphrase => @ssl_key_passphrase)
# Lumberjack::Beats::Server depends on normal accept so we need to launch thread for each connection.
# TODO: Re-implement Beats Server with Cool.io for resource control
@thread_pool = Concurrent::CachedThreadPool.new(:idletime => 15) # idletime setting is based on logstash beats input
- @thread = Thread.new(&method(:run))
+ thread_create(:in_beats_runner, &method(:run))
end
def shutdown
@lumberjack.close rescue nil
@thread_pool.shutdown
- @thread.join
super
end
def run
@@ -86,19 +98,19 @@
@connections.reject! { |c| c.closed? }
if @connections.size >= @max_connections
conn.close # close for retry on beats side
sleep 1
next
- end
+ end
@connections << conn
end
@thread_pool.post {
begin
conn.run { |map|
tag = @metadata_as_tag ? map['@metadata']['beat'] : @tag
- if map.has_key?('message') && @format
+ if map.has_key?('message') && @parser_config
message = map.delete('message')
@parser.parse(message) { |time, record|
record['@timestamp'] = map['@timestamp']
map.each { |k, v|
record[k] = v