lib/fluent/plugin/out_mqtt.rb in fluent-plugin-mqtt-io-0.4.0 vs lib/fluent/plugin/out_mqtt.rb in fluent-plugin-mqtt-io-0.4.1
- old
+ new
@@ -16,10 +16,15 @@
desc 'Topic rewrite matching pattern.'
config_param :topic_rewrite_pattern, :string, default: nil
desc 'Topic rewrite replacement string.'
config_param :topic_rewrite_replacement, :string, default: nil
+ desc 'Retain option which publishing'
+ config_param :retain, :bool, default: false
+ desc 'QoS option which publishing'
+ config_param :qos, :integer, default: 1
+
config_section :format do
desc 'The format to publish'
config_param :@type, :string, default: 'single_value'
desc 'Add newline'
config_param :add_newline, :bool, default: false
@@ -34,10 +39,15 @@
config_param :time_type, :string, default: 'string'
desc 'Specify time format of send_time (e.g. %FT%T.%N%:z).'
config_param :time_format, :string, default: nil
end
+ config_section :buffer, required: false, multi: false do
+ desc 'Prefer asynchronous buffering'
+ config_param :async, :bool, default: false
+ end
+
# This method is called before starting.
# 'conf' is a Hash that includes configuration parameters.
# If the configuration is invalid, raise Fluent::ConfigError.
def configure(conf)
super
@@ -62,10 +72,14 @@
def prefer_buffered_processing
@has_buffer_section
end
+ def prefer_delayed_commit
+ @has_buffer_section && @buffer_config.async
+ end
+
# This method is called when starting.
# Open sockets or files here.
def start
super
start_proxy
@@ -73,20 +87,31 @@
# This method is called when shutting down.
# Shutdown the thread and close sockets or files here.
def shutdown
shutdown_proxy
+ kill_thread
super
end
+ def kill_thread
+ @dummy_thread.kill if !@dummy_thread.nil?
+ end
+
def after_connection
- @dummy_thread = thread_create(:out_mqtt_dummy) do
+ #@dummy_thread = thread_create(:out_mqtt_dummy) do
+ @dummy_thread = Thread.new do
Thread.stop
end
@dummy_thread
end
+ def after_disconnection
+ kill_thread
+ super
+ end
+
def current_plugin_name
:out_mqtt
end
def add_send_time(record)
@@ -97,27 +122,15 @@
record
end
end
def publish_event_stream(tag, es)
- if es.class == Fluent::OneEventStream
- es = inject_values_to_event_stream(tag, es)
- es.each do |time, record|
- log.debug "MqttOutput#publish_event_stream: #{rewrite_tag(tag)}, #{time}, #{add_send_time(record)}"
- rescue_disconnection do
- @client.publish(rewrite_tag(tag), @formatter.format(tag, time, add_send_time(record)))
- end
- end
- else
- es = inject_values_to_event_stream(tag, es)
- array = []
- es.each do |time, record|
- log.debug "MqttOutput#publish_event_stream: #{rewrite_tag(tag)}, #{time}, #{add_send_time(record)}"
- array << add_send_time(record)
- end
+ log.debug "publish_event_stream: #{es.class}"
+ es = inject_values_to_event_stream(tag, es)
+ es.each do |time, record|
rescue_disconnection do
- @client.publish(rewrite_tag(tag), @formatter.format(tag, Fluent::EventTime.now, array))
+ publish(tag, time, record)
end
end
log.flush
end
@@ -132,16 +145,35 @@
def formatted_to_msgpack_binary
true
end
+ def publish(tag, time, record)
+ log.debug "MqttOutput::#{caller_locations(1,1)[0].label}: #{rewrite_tag(rewrite_tag(tag))}, #{time}, #{add_send_time(record)}"
+ @client.publish(
+ rewrite_tag(tag),
+ @formatter.format(tag, time, add_send_time(record)),
+ @retain,
+ @qos
+ )
+ end
+
def write(chunk)
return if chunk.empty?
chunk.each do |tag, time, record|
rescue_disconnection do
- log.debug "MqttOutput#write: #{rewrite_tag(rewrite_tag(tag))}, #{time}, #{add_send_time(record)}"
- @client.publish(rewrite_tag(tag), @formatter.format(tag, time, add_send_time(record)))
+ publish(tag, time, record)
end
+ end
+ end
+
+ def try_write(chunk)
+ return if chunk.empty?
+ rescue_disconnection do
+ chunk.each do |tag, time, record|
+ publish(tag, time, record)
+ end
+ commit_write(chunk.unique_id)
end
end
end
end