lib/fluent/plugin/in_mqtt.rb in fluent-plugin-mqtt-io-0.3.0 vs lib/fluent/plugin/in_mqtt.rb in fluent-plugin-mqtt-io-0.3.1
- old
+ new
@@ -10,52 +10,52 @@
helpers :thread, :compat_parameters, :parser
MQTT_PORT = 1883
desc 'The address to connect to.'
- config_param :host, :string, :default => '127.0.0.1'
+ config_param :host, :string, default: '127.0.0.1'
desc 'The port to connect to.'
- config_param :port, :integer, :default => MQTT_PORT
+ config_param :port, :integer, default: MQTT_PORT
desc 'The topic to subscribe.'
- config_param :topic, :string, :default => '#'
+ config_param :topic, :string, default: '#'
desc 'The format to receive.'
- config_param :format, :string, :default => 'json'
+ config_param :format, :string, default: 'json'
desc 'Specify keep alive interval.'
- config_param :keep_alive, :integer, :default => 15
+ config_param :keep_alive, :integer, default: 15
desc 'Specify initial interval for reconnection.'
- config_param :initial_interval, :integer, :default => 1
+ config_param :initial_interval, :integer, default: 1
desc 'Specify increasing ratio of reconnection interval.'
- config_param :retry_inc_ratio, :integer, :default => 2
+ config_param :retry_inc_ratio, :integer, default: 2
# bulk_trans is deprecated
# multiple entries must be inputted as an Array
- #config_param :bulk_trans, :bool, :default => true
- #config_param :bulk_trans_sep, :string, :default => "\t"
+ #config_param :bulk_trans, :bool, default: true
+ #config_param :bulk_trans_sep, :string, default: "\t"
config_section :security, required: false, multi: false do
### User based authentication
desc 'The username for authentication'
- config_param :username, :string, :default => nil
+ config_param :username, :string, default: nil
desc 'The password for authentication'
- config_param :password, :string, :default => nil
+ config_param :password, :string, default: nil
desc 'Use TLS or not.'
- config_param :use_tls, :bool, :default => nil
+ config_param :use_tls, :bool, default: nil
config_section :tls, required: false, multi: true do
desc 'Specify TLS ca file.'
- config_param :ca_file, :string, :default => nil
+ config_param :ca_file, :string, default: nil
desc 'Specify TLS key file.'
- config_param :key_file, :string, :default => nil
+ config_param :key_file, :string, default: nil
desc 'Specify TLS cert file.'
- config_param :cert_file, :string, :default => nil
+ config_param :cert_file, :string, default: nil
end
end
config_section :monitor, required: false, multi: false do
desc 'Record received time into message or not.'
- config_param :recv_time, :bool, :default => false
+ config_param :recv_time, :bool, default: false
desc 'Specify the attribute name of received time.'
- config_param :recv_time_key, :string, :default => "recv_time"
+ config_param :recv_time_key, :string, default: "recv_time"
end
def configure(conf)
super
configure_parser(conf)
@@ -75,18 +75,18 @@
def increment_retry_interval
@retry_interval = @retry_interval * @retry_inc_ratio
end
def sleep_retry_interval(e, message)
- $log.error "#{message},#{e.class},#{e.message}"
- $log.error "Retry in #{@retry_interval} sec"
+ log.error "#{message},#{e.class},#{e.message}"
+ log.error "Retry in #{@retry_interval} sec"
sleep @retry_interval
increment_retry_interval
end
def start
- $log.debug "start mqtt #{@host}"
+ log.debug "start mqtt #{@host}"
opts = {
host: @host,
port: @port,
keep_alive: @keep_alive
}
@@ -138,23 +138,23 @@
end
def add_recv_time(record)
if @recv_time
# recv_time is recorded in ms
- record.merge({@recv_time_key => Fluent::EventTime.now})
+ record.merge({"#{@recv_time_key}": Fluent::EventTime.now})
else
record
end
end
def parse(message)
@parser.parse(message) do |time, record|
if time.nil?
- $log.debug "Since time_key field is nil, Fluent::EventTime.now is used."
+ log.debug "Since time_key field is nil, Fluent::EventTime.now is used."
time = Fluent::EventTime.now
end
- $log.debug "#{topic}, #{time}, #{add_recv_time(record)}"
+ log.debug "#{topic}, #{time}, #{add_recv_time(record)}"
return [time, add_recv_time(record)]
end
end
def emit(topic, message)
@@ -169,11 +169,11 @@
router.emit_stream(tag, mes)
else
router.emit(tag, time, record)
end
rescue Exception => e
- $log.error :error => e.to_s
- $log.debug_backtrace(e.backtrace)
+ log.error error: e.to_s
+ log.debug_backtrace(e.backtrace)
end
end
def shutdown
@get_thread.kill