lib/fluent/plugin/out_mqtt.rb in fluent-plugin-mqtt-0.0.7 vs lib/fluent/plugin/out_mqtt.rb in fluent-plugin-mqtt-0.0.8

- old
+ new

@@ -11,16 +11,18 @@ config_param :port, :integer, :default => 1883 config_param :bind, :string, :default => '127.0.0.1' config_param :topic, :string, :default => 'td-agent' config_param :format, :string, :default => 'none' + config_param :client_id, :string, :default => nil config_param :username, :string, :default => nil config_param :password, :string, :default => nil config_param :ssl, :bool, :default => nil config_param :ca, :string, :default => nil config_param :key, :string, :default => nil config_param :cert, :string, :default => nil + config_param :retain, :string, :default => true require 'mqtt' unless method_defined?(:log) define_method(:log) { $log } @@ -36,21 +38,24 @@ end def configure(conf) super @bind ||= conf['bind'] - @topic ||= conf['topic'] - @port ||= conf['port'] + @topic ||= conf['topic'] + @port ||= conf['port'] + @formatter = Plugin.new_formatter(@format) + @formatter.configure(conf) end def start #check buffer_size @buffer.buffer_chunk_limit = available_buffer_chunk_limit $log.debug "start mqtt #{@bind}" opts = {host: @bind, port: @port} + opts[:client_id] = @client_id if @client_id opts[:username] = @username if @username opts[:password] = @password if @password opts[:ssl] = @ssl if @ssl opts[:ca_file] = @ca if @ca opts[:cert_file] = @cert if @cert @@ -67,12 +72,12 @@ def format(tag, time, record) [tag, time, record].to_msgpack end def write(chunk) - $log.debug "write" chunk.msgpack_each { |tag, time, record| - @connect.publish(tag, record , retain=true) + log.debug "write #{@topic} #{@formatter.format(tag,time,record)}" + @connect.publish(@topic, @formatter.format(tag,time,record), retain=@retain) } end private # Following limits are heuristic. BSON is sometimes bigger than MessagePack and JSON.