lib/fluent/plugin/out_redis.rb in fluent-plugin-redis-0.3.2 vs lib/fluent/plugin/out_redis.rb in fluent-plugin-redis-0.3.3

- old
+ new

@@ -28,10 +28,11 @@ config_set_default :timekey, 60 end def configure(conf) compat_parameters_convert(conf, :buffer, :inject) + @running_multi_workers = system_config.workers > 1 super if conf.has_key?('namespace') log.warn "namespace option has been removed from fluent-plugin-redis 0.1.3. Please add or remove the namespace '#{conf['namespace']}' manually." end @@ -66,16 +67,24 @@ def formatted_to_msgpack_binary true end + def multi_workers_ready? + true + end + def write(chunk) tag, time = expand_placeholders(chunk.metadata) @redis.pipelined { unless @allow_duplicate_key stream = chunk.to_msgpack_stream @unpacker.feed_each(stream).with_index { |record, index| - identifier = [tag, time].join(".") + identifier = if @running_multi_workers + [tag, time, fluentd_worker_id].join(".") + else + [tag, time].join(".") + end @redis.multi do @redis.mapped_hmset "#{identifier}.#{index}", record[2] @redis.expire "#{identifier}.#{index}", @ttl if @ttl > 0 end }