lib/fluent/plugin/out_mqtt.rb in fluent-plugin-mqtt-0.0.8 vs lib/fluent/plugin/out_mqtt.rb in fluent-plugin-mqtt-0.0.9
- old
+ new
@@ -1,16 +1,20 @@
-module Fluent
- class OutMqtt < BufferedOutput
- Plugin.register_output('mqtt', self)
+require 'mqtt'
+require 'msgpack'
+require 'fluent/plugin/output'
- include Fluent::SetTagKeyMixin
+module Fluent::Plugin
+ class OutMqtt < Output
+ Fluent::Plugin.register_output('mqtt', self)
+
+ helpers :compat_parameters, :inject, :formatter
+
+ DEFAULT_BUFFER_TYPE = "memory"
+
config_set_default :include_tag_key, false
-
- include Fluent::SetTimeKeyMixin
config_set_default :include_time_key, true
-
config_param :port, :integer, :default => 1883
config_param :bind, :string, :default => '127.0.0.1'
config_param :topic, :string, :default => 'td-agent'
config_param :format, :string, :default => 'none'
config_param :client_id, :string, :default => nil
@@ -20,39 +24,45 @@
config_param :ca, :string, :default => nil
config_param :key, :string, :default => nil
config_param :cert, :string, :default => nil
config_param :retain, :string, :default => true
- require 'mqtt'
+ config_section :buffer do
+ config_set_default :@type, DEFAULT_BUFFER_TYPE
+ config_set_default :chunk_keys, ['tag']
+ end
- unless method_defined?(:log)
- define_method(:log) { $log }
+ config_section :inject do
+ config_set_default :time_key, "time"
+ config_set_default :time_type, "string"
+ config_set_default :time_format, "%Y-%m-%dT%H:%M:%S%z"
end
def initialize
super
- require 'msgpack'
@clients = {}
@connection_options = {}
@collection_options = {:capped => false}
end
def configure(conf)
+ compat_parameters_convert(conf, :buffer, :inject, :formatter)
super
@bind ||= conf['bind']
@topic ||= conf['topic']
@port ||= conf['port']
- @formatter = Plugin.new_formatter(@format)
- @formatter.configure(conf)
+ @formatter = formatter_create
+ if conf.has_key?('buffer_chunk_limit')
+ #check buffer_size
+ conf['buffer_chunk_limit'] = available_buffer_chunk_limit(conf)
+ end
end
def start
- #check buffer_size
- @buffer.buffer_chunk_limit = available_buffer_chunk_limit
- $log.debug "start mqtt #{@bind}"
+ log.debug "start mqtt #{@bind}"
opts = {host: @bind,
port: @port}
opts[:client_id] = @client_id if @client_id
opts[:username] = @username if @username
opts[:password] = @password if @password
@@ -68,29 +78,39 @@
@connect.disconnect
super
end
def format(tag, time, record)
- [tag, time, record].to_msgpack
+ [time, record].to_msgpack
end
+ def formatted_to_msgpack_binary
+ true
+ end
+
+ def multi_workers_ready?
+ true
+ end
+
def write(chunk)
- chunk.msgpack_each { |tag, time, record|
+ tag = chunk.metadata.tag
+ chunk.msgpack_each { |time, record|
+ record = inject_values_to_record(tag, time, record)
log.debug "write #{@topic} #{@formatter.format(tag,time,record)}"
@connect.publish(@topic, @formatter.format(tag,time,record), retain=@retain)
}
end
private
# Following limits are heuristic. BSON is sometimes bigger than MessagePack and JSON.
LIMIT_MQTT = 2 * 1024 # 2048kb
- def available_buffer_chunk_limit
- if @buffer.buffer_chunk_limit > LIMIT_MQTT
- log.warn ":buffer_chunk_limit(#{@buffer.buffer_chunk_limit}) is large. Reset :buffer_chunk_limit with #{LIMIT_MQTT}"
+ def available_buffer_chunk_limit(conf)
+ if conf['buffer_chunk_limit'] > LIMIT_MQTT
+ log.warn ":buffer_chunk_limit(#{conf['buffer_chunk_limit']}) is large. Reset :buffer_chunk_limit with #{LIMIT_MQTT}"
LIMIT_MQTT
else
- @buffer.buffer_chunk_limit
+ conf['buffer_chunk_limit']
end
end
end
end