lib/fluent/plugin/out_mqtt.rb in fluent-plugin-mqtt-0.0.8 vs lib/fluent/plugin/out_mqtt.rb in fluent-plugin-mqtt-0.0.9

- old
+ new

@@ -1,16 +1,20 @@ -module Fluent - class OutMqtt < BufferedOutput - Plugin.register_output('mqtt', self) +require 'mqtt' +require 'msgpack' +require 'fluent/plugin/output' - include Fluent::SetTagKeyMixin +module Fluent::Plugin + class OutMqtt < Output + Fluent::Plugin.register_output('mqtt', self) + + helpers :compat_parameters, :inject, :formatter + + DEFAULT_BUFFER_TYPE = "memory" + config_set_default :include_tag_key, false - - include Fluent::SetTimeKeyMixin config_set_default :include_time_key, true - config_param :port, :integer, :default => 1883 config_param :bind, :string, :default => '127.0.0.1' config_param :topic, :string, :default => 'td-agent' config_param :format, :string, :default => 'none' config_param :client_id, :string, :default => nil @@ -20,39 +24,45 @@ config_param :ca, :string, :default => nil config_param :key, :string, :default => nil config_param :cert, :string, :default => nil config_param :retain, :string, :default => true - require 'mqtt' + config_section :buffer do + config_set_default :@type, DEFAULT_BUFFER_TYPE + config_set_default :chunk_keys, ['tag'] + end - unless method_defined?(:log) - define_method(:log) { $log } + config_section :inject do + config_set_default :time_key, "time" + config_set_default :time_type, "string" + config_set_default :time_format, "%Y-%m-%dT%H:%M:%S%z" end def initialize super - require 'msgpack' @clients = {} @connection_options = {} @collection_options = {:capped => false} end def configure(conf) + compat_parameters_convert(conf, :buffer, :inject, :formatter) super @bind ||= conf['bind'] @topic ||= conf['topic'] @port ||= conf['port'] - @formatter = Plugin.new_formatter(@format) - @formatter.configure(conf) + @formatter = formatter_create + if conf.has_key?('buffer_chunk_limit') + #check buffer_size + conf['buffer_chunk_limit'] = available_buffer_chunk_limit(conf) + end end def start - #check buffer_size - @buffer.buffer_chunk_limit = available_buffer_chunk_limit - $log.debug "start mqtt #{@bind}" + log.debug "start mqtt #{@bind}" opts = {host: @bind, port: @port} opts[:client_id] = @client_id if @client_id opts[:username] = @username if @username opts[:password] = @password if @password @@ -68,29 +78,39 @@ @connect.disconnect super end def format(tag, time, record) - [tag, time, record].to_msgpack + [time, record].to_msgpack end + def formatted_to_msgpack_binary + true + end + + def multi_workers_ready? + true + end + def write(chunk) - chunk.msgpack_each { |tag, time, record| + tag = chunk.metadata.tag + chunk.msgpack_each { |time, record| + record = inject_values_to_record(tag, time, record) log.debug "write #{@topic} #{@formatter.format(tag,time,record)}" @connect.publish(@topic, @formatter.format(tag,time,record), retain=@retain) } end private # Following limits are heuristic. BSON is sometimes bigger than MessagePack and JSON. LIMIT_MQTT = 2 * 1024 # 2048kb - def available_buffer_chunk_limit - if @buffer.buffer_chunk_limit > LIMIT_MQTT - log.warn ":buffer_chunk_limit(#{@buffer.buffer_chunk_limit}) is large. Reset :buffer_chunk_limit with #{LIMIT_MQTT}" + def available_buffer_chunk_limit(conf) + if conf['buffer_chunk_limit'] > LIMIT_MQTT + log.warn ":buffer_chunk_limit(#{conf['buffer_chunk_limit']}) is large. Reset :buffer_chunk_limit with #{LIMIT_MQTT}" LIMIT_MQTT else - @buffer.buffer_chunk_limit + conf['buffer_chunk_limit'] end end end end