lib/fluent/plugin/out_redis.rb in fluent-plugin-redis-0.3.2 vs lib/fluent/plugin/out_redis.rb in fluent-plugin-redis-0.3.3
- old
+ new
@@ -28,10 +28,11 @@
config_set_default :timekey, 60
end
def configure(conf)
compat_parameters_convert(conf, :buffer, :inject)
+ @running_multi_workers = system_config.workers > 1
super
if conf.has_key?('namespace')
log.warn "namespace option has been removed from fluent-plugin-redis 0.1.3. Please add or remove the namespace '#{conf['namespace']}' manually."
end
@@ -66,16 +67,24 @@
def formatted_to_msgpack_binary
true
end
+ def multi_workers_ready?
+ true
+ end
+
def write(chunk)
tag, time = expand_placeholders(chunk.metadata)
@redis.pipelined {
unless @allow_duplicate_key
stream = chunk.to_msgpack_stream
@unpacker.feed_each(stream).with_index { |record, index|
- identifier = [tag, time].join(".")
+ identifier = if @running_multi_workers
+ [tag, time, fluentd_worker_id].join(".")
+ else
+ [tag, time].join(".")
+ end
@redis.multi do
@redis.mapped_hmset "#{identifier}.#{index}", record[2]
@redis.expire "#{identifier}.#{index}", @ttl if @ttl > 0
end
}