lib/fluent/plugin/out_query_combiner.rb in fluent-plugin-querycombiner-0.0.1 vs lib/fluent/plugin/out_query_combiner.rb in fluent-plugin-querycombiner-0.0.2

- old
+ new

@@ -12,30 +12,46 @@ config_param :redis_key_prefix, :string, :default => 'query_combiner:' config_param :query_identify, :string, :default => 'session-id' config_param :query_ttl, :integer, :default => 1800 config_param :buffer_size, :integer, :default => 1000 + config_param :time_format, :string, :default => '$time' + config_param :flush_interval, :integer, :default => 60 config_param :remove_interval, :integer, :default => 10 config_param :tag, :string, :default => "query_combiner" def initialize super require 'redis' require 'msgpack' require 'json' require 'rubygems' + require 'time' end def configure(conf) super @host = conf.has_key?('host') ? conf['host'] : 'localhost' @port = conf.has_key?('port') ? conf['port'].to_i : 6379 @db_number = conf.has_key?('db_number') ? conf['db_number'].to_i : nil @query_identify = @query_identify.split(',').map { |qid| qid.strip } + # functions for time format + def create_time_formatter(expr) + begin + f = eval('lambda {|__arg_time__| ' + expr.gsub("$time", "__arg_time__") + '}') + return f + rescue SyntaxError + raise Fluent::ConfigError, "SyntaxError at time_format `#{expr}`" + end + end + @_time_formatter = create_time_formatter(@time_format) + + @_time_keys = {} + # Create functions for each conditions @_cond_funcs = {} @_replace_keys = { 'catch' => {}, 'dump' => {}, @@ -86,10 +102,20 @@ if %w{catch dump}.include? element.name @_replace_keys[element.name] = parse_replace_expr(element.name, var, expr) else raise Fluent::ConfigError, "`replace` configuration in #{element.name}: only allowed in `catch` and `dump`" end + + elsif var == 'time' + if %w{catch dump}.include? element.name + @_time_keys[element.name] = expr + else + raise Fluent::ConfigError, "`time` configuration in #{element.name}: only allowed in `catch` and `dump`" + end + + else + raise Fluent::ConfigError, "Unknown configuration `#{var}` in #{element.name}" end } } if not (@_cond_funcs.has_key?('catch') and @_cond_funcs.has_key?('dump')) @@ -162,10 +188,15 @@ # replace record keys @_replace_keys['catch'].each_pair { |before, after| record[after] = record[before] record.delete(before) } + # add time key if configured + if @_time_keys.has_key? 'catch' + record[@_time_keys['catch']] = @_time_formatter.call(time) + end + # save record tryOnRedis 'set', @redis_key_prefix + qid, JSON.dump(record) # update qid's timestamp tryOnRedis 'zadd', @redis_key_prefix, time, qid tryOnRedis 'expire', @redis_key_prefix + qid, @query_ttl @@ -177,18 +208,23 @@ tryOnRedis 'zadd', @redis_key_prefix, time, qid tryOnRedis 'expire', @redis_key_prefix + qid, @query_ttl end end - def do_dump(qid, record) + def do_dump(qid, record, time) if (tryOnRedis 'exists', @redis_key_prefix + qid) # replace record keys @_replace_keys['dump'].each_pair { |before, after| record[after] = record[before] record.delete(before) } + # add time key if configured + if @_time_keys.has_key? 'dump' + record[@_time_keys['dump']] = @_time_formatter.call(time) + end + # emit catched_record = JSON.load(tryOnRedis('get', @redis_key_prefix + qid)) combined_record = catched_record.merge(record) Fluent::Engine.emit @tag, Fluent::Engine.now, combined_record @@ -227,10 +263,10 @@ when "catch" do_catch(qid, record, time) when "prolong" do_prolong(qid, time) when "dump" - do_dump(qid, record) + do_dump(qid, record, time) when "release" do_release(qid) end break # very important! end