lib/fluent/plugin/in_mqtt.rb in fluent-plugin-mqtt-io-0.3.0 vs lib/fluent/plugin/in_mqtt.rb in fluent-plugin-mqtt-io-0.3.1

- old
+ new

@@ -10,52 +10,52 @@ helpers :thread, :compat_parameters, :parser MQTT_PORT = 1883 desc 'The address to connect to.' - config_param :host, :string, :default => '127.0.0.1' + config_param :host, :string, default: '127.0.0.1' desc 'The port to connect to.' - config_param :port, :integer, :default => MQTT_PORT + config_param :port, :integer, default: MQTT_PORT desc 'The topic to subscribe.' - config_param :topic, :string, :default => '#' + config_param :topic, :string, default: '#' desc 'The format to receive.' - config_param :format, :string, :default => 'json' + config_param :format, :string, default: 'json' desc 'Specify keep alive interval.' - config_param :keep_alive, :integer, :default => 15 + config_param :keep_alive, :integer, default: 15 desc 'Specify initial interval for reconnection.' - config_param :initial_interval, :integer, :default => 1 + config_param :initial_interval, :integer, default: 1 desc 'Specify increasing ratio of reconnection interval.' - config_param :retry_inc_ratio, :integer, :default => 2 + config_param :retry_inc_ratio, :integer, default: 2 # bulk_trans is deprecated # multiple entries must be inputted as an Array - #config_param :bulk_trans, :bool, :default => true - #config_param :bulk_trans_sep, :string, :default => "\t" + #config_param :bulk_trans, :bool, default: true + #config_param :bulk_trans_sep, :string, default: "\t" config_section :security, required: false, multi: false do ### User based authentication desc 'The username for authentication' - config_param :username, :string, :default => nil + config_param :username, :string, default: nil desc 'The password for authentication' - config_param :password, :string, :default => nil + config_param :password, :string, default: nil desc 'Use TLS or not.' - config_param :use_tls, :bool, :default => nil + config_param :use_tls, :bool, default: nil config_section :tls, required: false, multi: true do desc 'Specify TLS ca file.' - config_param :ca_file, :string, :default => nil + config_param :ca_file, :string, default: nil desc 'Specify TLS key file.' - config_param :key_file, :string, :default => nil + config_param :key_file, :string, default: nil desc 'Specify TLS cert file.' - config_param :cert_file, :string, :default => nil + config_param :cert_file, :string, default: nil end end config_section :monitor, required: false, multi: false do desc 'Record received time into message or not.' - config_param :recv_time, :bool, :default => false + config_param :recv_time, :bool, default: false desc 'Specify the attribute name of received time.' - config_param :recv_time_key, :string, :default => "recv_time" + config_param :recv_time_key, :string, default: "recv_time" end def configure(conf) super configure_parser(conf) @@ -75,18 +75,18 @@ def increment_retry_interval @retry_interval = @retry_interval * @retry_inc_ratio end def sleep_retry_interval(e, message) - $log.error "#{message},#{e.class},#{e.message}" - $log.error "Retry in #{@retry_interval} sec" + log.error "#{message},#{e.class},#{e.message}" + log.error "Retry in #{@retry_interval} sec" sleep @retry_interval increment_retry_interval end def start - $log.debug "start mqtt #{@host}" + log.debug "start mqtt #{@host}" opts = { host: @host, port: @port, keep_alive: @keep_alive } @@ -138,23 +138,23 @@ end def add_recv_time(record) if @recv_time # recv_time is recorded in ms - record.merge({@recv_time_key => Fluent::EventTime.now}) + record.merge({"#{@recv_time_key}": Fluent::EventTime.now}) else record end end def parse(message) @parser.parse(message) do |time, record| if time.nil? - $log.debug "Since time_key field is nil, Fluent::EventTime.now is used." + log.debug "Since time_key field is nil, Fluent::EventTime.now is used." time = Fluent::EventTime.now end - $log.debug "#{topic}, #{time}, #{add_recv_time(record)}" + log.debug "#{topic}, #{time}, #{add_recv_time(record)}" return [time, add_recv_time(record)] end end def emit(topic, message) @@ -169,11 +169,11 @@ router.emit_stream(tag, mes) else router.emit(tag, time, record) end rescue Exception => e - $log.error :error => e.to_s - $log.debug_backtrace(e.backtrace) + log.error error: e.to_s + log.debug_backtrace(e.backtrace) end end def shutdown @get_thread.kill