lib/fluent/plugin/in_mqtt.rb in fluent-plugin-mqtt-0.0.3 vs lib/fluent/plugin/in_mqtt.rb in fluent-plugin-mqtt-0.0.4
- old
+ new
@@ -9,53 +9,76 @@
config_set_default :include_time_key, true
config_param :port, :integer, :default => 1883
config_param :bind, :string, :default => '127.0.0.1'
config_param :topic, :string, :default => '#'
+ config_param :format, :string, :default => 'none'
+ config_param :username, :string, :default => nil
+ config_param :password, :string, :default => nil
+ config_param :ssl, :bool, :default => nil
+ config_param :ca, :string, :default => nil
+ config_param :key, :string, :default => nil
+ config_param :cert, :string, :default => nil
require 'mqtt'
def configure(conf)
super
@bind ||= conf['bind']
@topic ||= conf['topic']
@port ||= conf['port']
+
+ configure_parser(conf)
end
+ def configure_parser(conf)
+ @parser = Plugin.new_parser(@format)
+ @parser.configure(conf)
+ end
+
+ # Return [time (if not available return now), message]
+ def parse(message)
+ return @parser.parse(message)[1], @parser.parse(message)[0] || Fluent::Engine.now
+ end
+
def start
$log.debug "start mqtt #{@bind}"
- @connect = MQTT::Client.connect({remote_host: @bind, remote_port: @port})
+ opts = {host: @bind,
+ port: @port,
+ username: @username,
+ password: @password}
+ opts[:ssl] = @ssl if @ssl
+ opts[:ca_file] = @ca if @ca
+ opts[:crt_file] = @crt if @crt
+ opts[:key_file] = @key if @key
+ @connect = MQTT::Client.connect(opts)
@connect.subscribe(@topic)
@thread = Thread.new do
@connect.get do |topic,message|
topic.gsub!("/","\.")
$log.debug "#{topic}: #{message}"
- emit topic, json_parse(message)
+ begin
+ parsed_message = self.parse(message)
+ rescue Exception => e
+ $log.error e
+ end
+ emit topic, parsed_message[0], parsed_message[1]
end
end
end
- def emit topic, message , time = Fluent::Engine.now
+ def emit topic, message, time = Fluent::Engine.now
if message.class == Array
message.each do |data|
$log.debug "#{topic}: #{data}"
Fluent::Engine.emit(topic , time , data)
end
else
Fluent::Engine.emit(topic , time , message)
end
end
- def json_parse message
- begin
- y = Yajl::Parser.new
- y.parse(message)
- rescue
- $log.error "JSON parse error", :error => $!.to_s, :error_class => $!.class.to_s
- $log.warn_backtrace $!.backtrace
- end
- end
def shutdown
@thread.kill
@connect.disconnect
end
end