Sha256: 7eb4275e959f5fb248bf0d860f9d893f13eda478fcfe98cfcbba43cf75b1907d
Contents?: true
Size: 1.88 KB
Versions: 2
Compression:
Stored size: 1.88 KB
Contents
require 'cool.io' require 'fluent/plugin/parser_sflow' module Fluent class SflowInput < Input Plugin.register_input('sflow', self) config_param :bind, :string, default: '0.0.0.0' config_param :port, :integer, default: 6343 config_param :tag, :string config_param :max_bytes, :integer, default: 2048 def configure(conf) super @parser = Fluent::TextParser::SflowParser.new end def start super @loop = Coolio::Loop.new @handler = listen(method(:receive)) @loop.attach @handler @thread = Thread.new(&method(:run)) end def shutdown @loop.watchers.each {|w| w.detach } @loop.stop @handler.close @thread.join super end def run @loop.run rescue log.error 'unexpected error', error_class: $!.class, error: $!.message log.error_backtrace end protected def receive(raw, exporter) log.on_debug do log.debug 'received sflow datagram', raw: raw, exporter: exporter end @parser.parse(raw, exporter) do |time, record| if !time || !record log.warn 'Failed to parse', raw: raw, exporter: exporter return end router.emit(@tag, time, record) end rescue log.warn 'Unexpected error on parsing', raw: raw, exporter: exporter, error_class: $!.class, error: $!.message end private def listen(callback) log.info "listening sflow socket on #{@bind}:#{@port}" @sock = SocketUtil.create_udp_socket(@bind) @sock.bind @bind, @port UdpHandler.new @sock, callback end class UdpHandler < Coolio::IO def initialize(io, callback) super io @io = io @callback = callback end def on_readable msg, addr = @io.recvfrom_nonblock(4096) @callback.call msg, addr[3] end end end end
Version data entries
2 entries across 2 versions & 1 rubygems
Version | Path |
---|---|
fluent-plugin-sflow-0.2.1 | lib/fluent/plugin/in_sflow.rb |
fluent-plugin-sflow-0.2.0 | lib/fluent/plugin/in_sflow.rb |