lib/fluent/plugin/out_mqtt.rb in fluent-plugin-mqtt-0.0.7 vs lib/fluent/plugin/out_mqtt.rb in fluent-plugin-mqtt-0.0.8
- old
+ new
@@ -11,16 +11,18 @@
config_param :port, :integer, :default => 1883
config_param :bind, :string, :default => '127.0.0.1'
config_param :topic, :string, :default => 'td-agent'
config_param :format, :string, :default => 'none'
+ config_param :client_id, :string, :default => nil
config_param :username, :string, :default => nil
config_param :password, :string, :default => nil
config_param :ssl, :bool, :default => nil
config_param :ca, :string, :default => nil
config_param :key, :string, :default => nil
config_param :cert, :string, :default => nil
+ config_param :retain, :string, :default => true
require 'mqtt'
unless method_defined?(:log)
define_method(:log) { $log }
@@ -36,21 +38,24 @@
end
def configure(conf)
super
@bind ||= conf['bind']
- @topic ||= conf['topic']
- @port ||= conf['port']
+ @topic ||= conf['topic']
+ @port ||= conf['port']
+ @formatter = Plugin.new_formatter(@format)
+ @formatter.configure(conf)
end
def start
#check buffer_size
@buffer.buffer_chunk_limit = available_buffer_chunk_limit
$log.debug "start mqtt #{@bind}"
opts = {host: @bind,
port: @port}
+ opts[:client_id] = @client_id if @client_id
opts[:username] = @username if @username
opts[:password] = @password if @password
opts[:ssl] = @ssl if @ssl
opts[:ca_file] = @ca if @ca
opts[:cert_file] = @cert if @cert
@@ -67,12 +72,12 @@
def format(tag, time, record)
[tag, time, record].to_msgpack
end
def write(chunk)
- $log.debug "write"
chunk.msgpack_each { |tag, time, record|
- @connect.publish(tag, record , retain=true)
+ log.debug "write #{@topic} #{@formatter.format(tag,time,record)}"
+ @connect.publish(@topic, @formatter.format(tag,time,record), retain=@retain)
}
end
private
# Following limits are heuristic. BSON is sometimes bigger than MessagePack and JSON.