lib/fluent/plugin/in_sflow.rb in fluent-plugin-sflow-0.2.1 vs lib/fluent/plugin/in_sflow.rb in fluent-plugin-sflow-0.3.0
- old
+ new
@@ -1,49 +1,34 @@
-require 'cool.io'
+require 'fluent/plugin/input'
require 'fluent/plugin/parser_sflow'
-module Fluent
+module Fluent::Plugin
class SflowInput < Input
- Plugin.register_input('sflow', self)
+ Fluent::Plugin.register_input('sflow', self)
+ helpers :server
+
config_param :bind, :string, default: '0.0.0.0'
config_param :port, :integer, default: 6343
config_param :tag, :string
config_param :max_bytes, :integer, default: 2048
def configure(conf)
super
- @parser = Fluent::TextParser::SflowParser.new
+ @parser = Fluent::Plugin::SflowParser.new
end
def start
super
- @loop = Coolio::Loop.new
- @handler = listen(method(:receive))
- @loop.attach @handler
-
- @thread = Thread.new(&method(:run))
+ server_create(:in_sflow_server, @port, proto: :udp, bind: @bind, max_bytes: @max_bytes) do |data, sock|
+ receive(data, sock.remote_host)
+ end
end
- def shutdown
- @loop.watchers.each {|w| w.detach }
- @loop.stop
- @handler.close
- @thread.join
- super
- end
- def run
- @loop.run
- rescue
- log.error 'unexpected error', error_class: $!.class, error: $!.message
- log.error_backtrace
- end
-
-
protected
def receive(raw, exporter)
log.on_debug do
log.debug 'received sflow datagram', raw: raw, exporter: exporter
@@ -58,31 +43,8 @@
router.emit(@tag, time, record)
end
rescue
log.warn 'Unexpected error on parsing',
raw: raw, exporter: exporter, error_class: $!.class, error: $!.message
- end
-
-
- private
-
- def listen(callback)
- log.info "listening sflow socket on #{@bind}:#{@port}"
- @sock = SocketUtil.create_udp_socket(@bind)
- @sock.bind @bind, @port
- UdpHandler.new @sock, callback
- end
-
- class UdpHandler < Coolio::IO
- def initialize(io, callback)
- super io
- @io = io
- @callback = callback
- end
-
- def on_readable
- msg, addr = @io.recvfrom_nonblock(4096)
- @callback.call msg, addr[3]
- end
end
end
end