Sha256: 26f695f52d75d44b6965390db670d3c9e8d0a18c67dc1a1d99ecc15f3d5d5556
Contents?: true
Size: 1.84 KB
Versions: 2
Compression:
Stored size: 1.84 KB
Contents
module Fluent class TaggedUdpOutput < Output # First, register the plugin. NAME is the name of this plugin # and identifies the plugin in the configuration file. Fluent::Plugin.register_output('tagged_udp', self) config_param :host, :string, :default => '127.0.0.1' config_param :port, :integer, :default => 1883 config_param :tag_sep, :string, :default => "\t" config_param :time_key, :string, :default => 'time' config_param :time_format, :string, :default => nil require 'socket' # This method is called before starting. # 'conf' is a Hash that includes configuration parameters. # If the configuration is invalid, raise Fluent::ConfigError. def configure(conf) super # You can also refer raw parameter via conf[name]. @host ||= conf['host'] @port ||= conf['port'] @tag_sep ||= conf['tag_sep'] @socket = UDPSocket.new end def format_time(time) case @time_format when nil then # default format is integer value time when "iso8601" then # iso8601 format Time.at(time).iso8601 else # specified strftime format Time.at(time).strftime(@time_format) end end def timestamp_hash(time) if @time_key.nil? {} else {@time_key => format_time(time)} end end def emit(tag, es, chain) begin es.each {|time,record| $log.debug "#{tag}, #{format_time(time)}, #{record}" @socket.send( # tag is inserted into the head of the message "#{tag}#{@tag_sep}#{record.merge(timestamp_hash(time)).to_json}", Socket::MSG_EOR, @host, @port ) } $log.flush chain.next rescue StandardError => e $log.debug "#{e.class}: #{e.message}" end end end end
Version data entries
2 entries across 2 versions & 1 rubygems
Version | Path |
---|---|
fluent-plugin-tagged_udp-0.0.5 | lib/fluent/plugin/out_tagged_udp.rb |
fluent-plugin-tagged_udp-0.0.4 | lib/fluent/plugin/out_tagged_udp.rb |