lib/fluent/plugin/in_sflow.rb in fluent-plugin-sflow-0.1.0 vs lib/fluent/plugin/in_sflow.rb in fluent-plugin-sflow-0.2.0

- old
+ new

@@ -1,45 +1,88 @@ -require 'fluent/plugin/input' +require 'cool.io' +require 'fluent/plugin/parser_sflow' -require 'bindata' -require 'eventmachine' -require 'yaml' -dir = 'sflow/lib/sflow' -['models/ipv4header', 'models/tcpheader', 'models/udpheader', 'models/protocol', 'models/binary_models','parsers/parsers'].each do |req| - require File.join(dir, req) -end - -#$:.unshift File.expand_path(File.join(File.dirname(__FILE__), '..', '..', 'sflow', 'lib')) -#require 'sflow' - -module Fluent::Plugin +module Fluent class SflowInput < Input - Fluent::Plugin.register_input("sflow", self) + Plugin.register_input('sflow', self) - helpers :server - config_param :bind, :string, default: '0.0.0.0' config_param :port, :integer, default: 6343 config_param :tag, :string + config_param :max_bytes, :integer, default: 2048 + def configure(conf) super - - # dummy data - $switch_hash = {} + @parser = Fluent::TextParser::SflowParser.new end def start super + @loop = Coolio::Loop.new + @handler = listen(method(:receive)) + @loop.attach @handler - server_create(:in_sflow_server, @port, bind: @bind, proto: :udp, max_bytes: 2048) do |data, sock| - sflow = SflowParser.parse_packet(data) - router.emit(@tag, Fluent::EventTime.now, sflow) - end + @thread = Thread.new(&method(:run)) end def shutdown + @loop.watchers.each {|w| w.detach } + @loop.stop + @handler.close + @thread.join super + end + + def run + @loop.run + rescue + log.error 'unexpected error', error_class: $!.class, error: $!.message + log.error_backtrace + end + + + protected + + def receive(raw, exporter) + log.on_debug do + log.debug 'received sflow datagram', raw: raw, exporter: exporter + end + + @parser.parse(raw, exporter) do |time, record| + if !time || !record + log.warn 'Failed to parse', raw: raw, exporter: exporter + return + end + + router.emit(@tag, time, record) + end + rescue + log.warn 'Unexpected error on parsing', + raw: raw, exporter: exporter, error_class: $!.class, error: $!.message + end + + + private + + def listen(callback) + log.info "listening sflow socket on #{@bind}:#{@port}" + @sock = SocketUtil.create_udp_socket(@bind) + @sock.bind @bind, @port + UdpHandler.new @sock, callback + end + + class UdpHandler < Coolio::IO + def initialize(io, callback) + super io + @io = io + @callback = callback + end + + def on_readable + msg, addr = @io.recvfrom_nonblock(4096) + @callback.call msg, addr[3] + end end end end