lib/fluent/plugin/out_query_combiner.rb in fluent-plugin-querycombiner-0.0.1 vs lib/fluent/plugin/out_query_combiner.rb in fluent-plugin-querycombiner-0.0.2
- old
+ new
@@ -12,30 +12,46 @@
config_param :redis_key_prefix, :string, :default => 'query_combiner:'
config_param :query_identify, :string, :default => 'session-id'
config_param :query_ttl, :integer, :default => 1800
config_param :buffer_size, :integer, :default => 1000
+ config_param :time_format, :string, :default => '$time'
+
config_param :flush_interval, :integer, :default => 60
config_param :remove_interval, :integer, :default => 10
config_param :tag, :string, :default => "query_combiner"
def initialize
super
require 'redis'
require 'msgpack'
require 'json'
require 'rubygems'
+ require 'time'
end
def configure(conf)
super
@host = conf.has_key?('host') ? conf['host'] : 'localhost'
@port = conf.has_key?('port') ? conf['port'].to_i : 6379
@db_number = conf.has_key?('db_number') ? conf['db_number'].to_i : nil
@query_identify = @query_identify.split(',').map { |qid| qid.strip }
+ # functions for time format
+ def create_time_formatter(expr)
+ begin
+ f = eval('lambda {|__arg_time__| ' + expr.gsub("$time", "__arg_time__") + '}')
+ return f
+ rescue SyntaxError
+ raise Fluent::ConfigError, "SyntaxError at time_format `#{expr}`"
+ end
+ end
+ @_time_formatter = create_time_formatter(@time_format)
+
+ @_time_keys = {}
+
# Create functions for each conditions
@_cond_funcs = {}
@_replace_keys = {
'catch' => {},
'dump' => {},
@@ -86,10 +102,20 @@
if %w{catch dump}.include? element.name
@_replace_keys[element.name] = parse_replace_expr(element.name, var, expr)
else
raise Fluent::ConfigError, "`replace` configuration in #{element.name}: only allowed in `catch` and `dump`"
end
+
+ elsif var == 'time'
+ if %w{catch dump}.include? element.name
+ @_time_keys[element.name] = expr
+ else
+ raise Fluent::ConfigError, "`time` configuration in #{element.name}: only allowed in `catch` and `dump`"
+ end
+
+ else
+ raise Fluent::ConfigError, "Unknown configuration `#{var}` in #{element.name}"
end
}
}
if not (@_cond_funcs.has_key?('catch') and @_cond_funcs.has_key?('dump'))
@@ -162,10 +188,15 @@
# replace record keys
@_replace_keys['catch'].each_pair { |before, after|
record[after] = record[before]
record.delete(before)
}
+ # add time key if configured
+ if @_time_keys.has_key? 'catch'
+ record[@_time_keys['catch']] = @_time_formatter.call(time)
+ end
+
# save record
tryOnRedis 'set', @redis_key_prefix + qid, JSON.dump(record)
# update qid's timestamp
tryOnRedis 'zadd', @redis_key_prefix, time, qid
tryOnRedis 'expire', @redis_key_prefix + qid, @query_ttl
@@ -177,18 +208,23 @@
tryOnRedis 'zadd', @redis_key_prefix, time, qid
tryOnRedis 'expire', @redis_key_prefix + qid, @query_ttl
end
end
- def do_dump(qid, record)
+ def do_dump(qid, record, time)
if (tryOnRedis 'exists', @redis_key_prefix + qid)
# replace record keys
@_replace_keys['dump'].each_pair { |before, after|
record[after] = record[before]
record.delete(before)
}
+ # add time key if configured
+ if @_time_keys.has_key? 'dump'
+ record[@_time_keys['dump']] = @_time_formatter.call(time)
+ end
+
# emit
catched_record = JSON.load(tryOnRedis('get', @redis_key_prefix + qid))
combined_record = catched_record.merge(record)
Fluent::Engine.emit @tag, Fluent::Engine.now, combined_record
@@ -227,10 +263,10 @@
when "catch"
do_catch(qid, record, time)
when "prolong"
do_prolong(qid, time)
when "dump"
- do_dump(qid, record)
+ do_dump(qid, record, time)
when "release"
do_release(qid)
end
break # very important!
end