lib/fluent/plugin/in_mqtt.rb in fluent-plugin-mqtt-io-0.2.3 vs lib/fluent/plugin/in_mqtt.rb in fluent-plugin-mqtt-io-0.3.0
- old
+ new
@@ -1,45 +1,75 @@
-module Fluent
+require 'fluent/plugin/input'
+require 'fluent/event'
+require 'fluent/time'
+require 'mqtt'
+
+module Fluent::Plugin
class MqttInput < Input
- Plugin.register_input('mqtt', self)
+ Fluent::Plugin.register_input('mqtt', self)
- config_param :port, :integer, :default => 1883
+ helpers :thread, :compat_parameters, :parser
+
+ MQTT_PORT = 1883
+
+ desc 'The address to connect to.'
config_param :host, :string, :default => '127.0.0.1'
+ desc 'The port to connect to.'
+ config_param :port, :integer, :default => MQTT_PORT
+ desc 'The topic to subscribe.'
config_param :topic, :string, :default => '#'
+ desc 'The format to receive.'
config_param :format, :string, :default => 'json'
- config_param :bulk_trans, :bool, :default => true
- config_param :bulk_trans_sep, :string, :default => "\t"
- config_param :username, :string, :default => nil
- config_param :password, :string, :default => nil
+ desc 'Specify keep alive interval.'
config_param :keep_alive, :integer, :default => 15
- config_param :ssl, :bool, :default => nil
- config_param :ca_file, :string, :default => nil
- config_param :key_file, :string, :default => nil
- config_param :cert_file, :string, :default => nil
- config_param :recv_time, :bool, :default => false
- config_param :recv_time_key, :string, :default => "recv_time"
+ desc 'Specify initial interval for reconnection.'
config_param :initial_interval, :integer, :default => 1
+ desc 'Specify increasing ratio of reconnection interval.'
config_param :retry_inc_ratio, :integer, :default => 2
- require 'mqtt'
+ # bulk_trans is deprecated
+ # multiple entries must be inputted as an Array
+ #config_param :bulk_trans, :bool, :default => true
+ #config_param :bulk_trans_sep, :string, :default => "\t"
- # Define `router` method of v0.12 to support v0.10 or earlier
- unless method_defined?(:router)
- define_method("router") { Fluent::Engine }
+ config_section :security, required: false, multi: false do
+ ### User based authentication
+ desc 'The username for authentication'
+ config_param :username, :string, :default => nil
+ desc 'The password for authentication'
+ config_param :password, :string, :default => nil
+ desc 'Use TLS or not.'
+ config_param :use_tls, :bool, :default => nil
+ config_section :tls, required: false, multi: true do
+ desc 'Specify TLS ca file.'
+ config_param :ca_file, :string, :default => nil
+ desc 'Specify TLS key file.'
+ config_param :key_file, :string, :default => nil
+ desc 'Specify TLS cert file.'
+ config_param :cert_file, :string, :default => nil
+ end
end
+ config_section :monitor, required: false, multi: false do
+ desc 'Record received time into message or not.'
+ config_param :recv_time, :bool, :default => false
+ desc 'Specify the attribute name of received time.'
+ config_param :recv_time_key, :string, :default => "recv_time"
+ end
+
def configure(conf)
super
configure_parser(conf)
init_retry_interval
end
def configure_parser(conf)
- @parser = Plugin.new_parser(conf['format'])
- @parser.configure(conf)
+ compat_parameters_convert(conf, :parser)
+ parser_config = conf.elements('parse').first
+ @parser = parser_create(conf: parser_config)
end
-
+
def init_retry_interval
@retry_interval = @initial_interval
end
def increment_retry_interval
@@ -56,82 +86,90 @@
def start
$log.debug "start mqtt #{@host}"
opts = {
host: @host,
port: @port,
- username: @username,
- password: @password,
keep_alive: @keep_alive
}
- opts[:ssl] = @ssl if @ssl
- opts[:ca_file] = @ca_file if @ca_file
- opts[:cert_file] = @cert_file if @cert_file
- opts[:key_file] = @key_file if @key_file
+ opts[:username] = @security.username if @security.respond_to?(:username)
+ opts[:password] = @security.password if @security.respond_to?(:password)
+ if @security.respond_to?(:use_tls) && @security.use_tls
+ opts[:ssl] = @security.use_tls
+ opts[:ca_file] = @security.tls.ca_file
+ opts[:cert_file] = @security.tls.cert_file
+ opts[:key_file] = @security.tls.key_file
+ end
# In order to handle Exception raised from reading Thread
# in MQTT::Client caused by network disconnection (during read_byte),
# @connect_thread generates connection.
@client = MQTT::Client.new(opts)
- @connect_thread = Thread.new do
- while (true)
- begin
- @client.disconnect if @client.connected?
- @client.connect
- @client.subscribe(@topic)
- @get_thread.kill if !@get_thread.nil? && @get_thread.alive?
- @get_thread = Thread.new do
- @client.get do |topic, message|
- emit(topic, message)
- end
+ @get_thread = nil
+ @connect_thread = Thread.new(&method(:connect_loop))
+ end
+
+ def connect_loop
+ while (true)
+ begin
+ @get_thread.kill if !@get_thread.nil? && @get_thread.alive?
+ @client.disconnect if @client.connected?
+ @client.connect
+ @client.subscribe(@topic)
+ @get_thread = Thread.new do
+ @client.get do |topic, message|
+ emit(topic, message)
end
- init_retry_interval
- sleep
- rescue MQTT::ProtocolException => e
- sleep_retry_interval(e, "Protocol error occurs.")
- next
- rescue Timeout::Error => e
- sleep_retry_interval(e, "Timeout error occurs.")
- next
- rescue SystemCallError => e
- sleep_retry_interval(e, "System call error occurs.")
- next
- rescue StandardError=> e
- sleep_retry_interval(e, "The other error occurs.")
- next
end
+ init_retry_interval
+ sleep
+ rescue MQTT::ProtocolException => e
+ sleep_retry_interval(e, "Protocol error occurs.")
+ next
+ rescue Timeout::Error => e
+ sleep_retry_interval(e, "Timeout error occurs.")
+ next
+ rescue SystemCallError => e
+ sleep_retry_interval(e, "System call error occurs.")
+ next
+ rescue StandardError=> e
+ sleep_retry_interval(e, "The other error occurs.")
+ next
end
end
end
def add_recv_time(record)
if @recv_time
# recv_time is recorded in ms
- record.merge({@recv_time_key => Time.now.instance_eval { self.to_i * 1000 + (usec/1000) }})
+ record.merge({@recv_time_key => Fluent::EventTime.now})
else
record
end
end
def parse(message)
@parser.parse(message) do |time, record|
if time.nil?
- $log.debug "Since time_key field is nil, Fluent::Engine.now is used."
- time = Fluent::Engine.now
+ $log.debug "Since time_key field is nil, Fluent::EventTime.now is used."
+ time = Fluent::EventTime.now
end
$log.debug "#{topic}, #{time}, #{add_recv_time(record)}"
return [time, add_recv_time(record)]
end
end
def emit(topic, message)
begin
- topic.gsub!("/","\.")
- if @bulk_trans
- message.split(@bulk_trans_sep).each do |m|
- router.emit(topic, *parse(m))
+ tag = topic.gsub("/","\.")
+ time, record = parse(message)
+ if record.is_a?(Array)
+ mes = Fluent::MultiEventStream.new
+ record.each do |single_record|
+ mes.add(@parser.parse_time(single_record), single_record)
end
+ router.emit_stream(tag, mes)
else
- router.emit(topic, *parse(message))
+ router.emit(tag, time, record)
end
rescue Exception => e
$log.error :error => e.to_s
$log.debug_backtrace(e.backtrace)
end