lib/fluent/plugin/in_beats.rb in fluent-plugin-beats-0.1.1 vs lib/fluent/plugin/in_beats.rb in fluent-plugin-beats-0.1.2

- old
+ new

@@ -15,10 +15,11 @@ # See the License for the specific language governing permissions and # limitations under the License. # require 'fluent/input' +require 'fluent/parser' module Fluent class BeatsInput < Input Plugin.register_input('beats', self) @@ -46,19 +47,21 @@ if !@tag && !@metadata_as_tag raise ConfigError, "'tag' or 'metadata_as_tag' parameter is required on beats input" end - @time_parser = TextParser::TimeParser.new('%Y-%m-%dT%H:%M:%S.%N%z') + @time_parser = Fluent::TextParser::TimeParser.new('%Y-%m-%dT%H:%M:%S.%N%z') if @format @parser = Plugin.new_parser(@format) @parser.configure(conf) end @connections = [] end def start + super + @lumberjack = Lumberjack::Beats::Server.new( :address => @bind, :port => @port, :ssl => @use_ssl, :ssl_certificate => @ssl_certificate, :ssl_key => @ssl_key, :ssl_key_passphrase => @ssl_key_passphrase) # Lumberjack::Beats::Server depends on normal accept so we need to launch thread for each connection. # TODO: Re-implement Beats Server with Cool.io for resource control @@ -68,9 +71,11 @@ def shutdown @lumberjack.close rescue nil @thread_pool.shutdown @thread.join + + super end def run until @lumberjack.closed? conn = @lumberjack.accept