lib/fluent/plugin/in_sflow.rb in fluent-plugin-sflow-0.2.1 vs lib/fluent/plugin/in_sflow.rb in fluent-plugin-sflow-0.3.0

- old
+ new

@@ -1,49 +1,34 @@ -require 'cool.io' +require 'fluent/plugin/input' require 'fluent/plugin/parser_sflow' -module Fluent +module Fluent::Plugin class SflowInput < Input - Plugin.register_input('sflow', self) + Fluent::Plugin.register_input('sflow', self) + helpers :server + config_param :bind, :string, default: '0.0.0.0' config_param :port, :integer, default: 6343 config_param :tag, :string config_param :max_bytes, :integer, default: 2048 def configure(conf) super - @parser = Fluent::TextParser::SflowParser.new + @parser = Fluent::Plugin::SflowParser.new end def start super - @loop = Coolio::Loop.new - @handler = listen(method(:receive)) - @loop.attach @handler - - @thread = Thread.new(&method(:run)) + server_create(:in_sflow_server, @port, proto: :udp, bind: @bind, max_bytes: @max_bytes) do |data, sock| + receive(data, sock.remote_host) + end end - def shutdown - @loop.watchers.each {|w| w.detach } - @loop.stop - @handler.close - @thread.join - super - end - def run - @loop.run - rescue - log.error 'unexpected error', error_class: $!.class, error: $!.message - log.error_backtrace - end - - protected def receive(raw, exporter) log.on_debug do log.debug 'received sflow datagram', raw: raw, exporter: exporter @@ -58,31 +43,8 @@ router.emit(@tag, time, record) end rescue log.warn 'Unexpected error on parsing', raw: raw, exporter: exporter, error_class: $!.class, error: $!.message - end - - - private - - def listen(callback) - log.info "listening sflow socket on #{@bind}:#{@port}" - @sock = SocketUtil.create_udp_socket(@bind) - @sock.bind @bind, @port - UdpHandler.new @sock, callback - end - - class UdpHandler < Coolio::IO - def initialize(io, callback) - super io - @io = io - @callback = callback - end - - def on_readable - msg, addr = @io.recvfrom_nonblock(4096) - @callback.call msg, addr[3] - end end end end