lib/fluent/plugin/out_mqtt.rb in fluent-plugin-mqtt-io-0.4.0 vs lib/fluent/plugin/out_mqtt.rb in fluent-plugin-mqtt-io-0.4.1

- old
+ new

@@ -16,10 +16,15 @@ desc 'Topic rewrite matching pattern.' config_param :topic_rewrite_pattern, :string, default: nil desc 'Topic rewrite replacement string.' config_param :topic_rewrite_replacement, :string, default: nil + desc 'Retain option which publishing' + config_param :retain, :bool, default: false + desc 'QoS option which publishing' + config_param :qos, :integer, default: 1 + config_section :format do desc 'The format to publish' config_param :@type, :string, default: 'single_value' desc 'Add newline' config_param :add_newline, :bool, default: false @@ -34,10 +39,15 @@ config_param :time_type, :string, default: 'string' desc 'Specify time format of send_time (e.g. %FT%T.%N%:z).' config_param :time_format, :string, default: nil end + config_section :buffer, required: false, multi: false do + desc 'Prefer asynchronous buffering' + config_param :async, :bool, default: false + end + # This method is called before starting. # 'conf' is a Hash that includes configuration parameters. # If the configuration is invalid, raise Fluent::ConfigError. def configure(conf) super @@ -62,10 +72,14 @@ def prefer_buffered_processing @has_buffer_section end + def prefer_delayed_commit + @has_buffer_section && @buffer_config.async + end + # This method is called when starting. # Open sockets or files here. def start super start_proxy @@ -73,20 +87,31 @@ # This method is called when shutting down. # Shutdown the thread and close sockets or files here. def shutdown shutdown_proxy + kill_thread super end + def kill_thread + @dummy_thread.kill if !@dummy_thread.nil? + end + def after_connection - @dummy_thread = thread_create(:out_mqtt_dummy) do + #@dummy_thread = thread_create(:out_mqtt_dummy) do + @dummy_thread = Thread.new do Thread.stop end @dummy_thread end + def after_disconnection + kill_thread + super + end + def current_plugin_name :out_mqtt end def add_send_time(record) @@ -97,27 +122,15 @@ record end end def publish_event_stream(tag, es) - if es.class == Fluent::OneEventStream - es = inject_values_to_event_stream(tag, es) - es.each do |time, record| - log.debug "MqttOutput#publish_event_stream: #{rewrite_tag(tag)}, #{time}, #{add_send_time(record)}" - rescue_disconnection do - @client.publish(rewrite_tag(tag), @formatter.format(tag, time, add_send_time(record))) - end - end - else - es = inject_values_to_event_stream(tag, es) - array = [] - es.each do |time, record| - log.debug "MqttOutput#publish_event_stream: #{rewrite_tag(tag)}, #{time}, #{add_send_time(record)}" - array << add_send_time(record) - end + log.debug "publish_event_stream: #{es.class}" + es = inject_values_to_event_stream(tag, es) + es.each do |time, record| rescue_disconnection do - @client.publish(rewrite_tag(tag), @formatter.format(tag, Fluent::EventTime.now, array)) + publish(tag, time, record) end end log.flush end @@ -132,16 +145,35 @@ def formatted_to_msgpack_binary true end + def publish(tag, time, record) + log.debug "MqttOutput::#{caller_locations(1,1)[0].label}: #{rewrite_tag(rewrite_tag(tag))}, #{time}, #{add_send_time(record)}" + @client.publish( + rewrite_tag(tag), + @formatter.format(tag, time, add_send_time(record)), + @retain, + @qos + ) + end + def write(chunk) return if chunk.empty? chunk.each do |tag, time, record| rescue_disconnection do - log.debug "MqttOutput#write: #{rewrite_tag(rewrite_tag(tag))}, #{time}, #{add_send_time(record)}" - @client.publish(rewrite_tag(tag), @formatter.format(tag, time, add_send_time(record))) + publish(tag, time, record) end + end + end + + def try_write(chunk) + return if chunk.empty? + rescue_disconnection do + chunk.each do |tag, time, record| + publish(tag, time, record) + end + commit_write(chunk.unique_id) end end end end