# -*- coding: utf-8 -*- module Fluent class QueryCombinerOutput < BufferedOutput Fluent::Plugin.register_output('query_combiner', self) config_param :host, :string, :default => 'localhost' config_param :port, :integer, :default => 6379 config_param :db_index, :integer, :default => 0 config_param :redis_retry, :integer, :default => 3 config_param :redis_key_prefix, :string, :default => 'query_combiner:' config_param :query_identify, :string, :default => 'session-id' config_param :query_ttl, :integer, :default => 1800 config_param :buffer_size, :integer, :default => 100 config_param :flush_interval, :integer, :default => 60 config_param :remove_interval, :integer, :default => 10 config_param :tag, :string, :default => "query_combiner" def initialize super require 'redis' require 'msgpack' require 'json' require 'rubygems' end def configure(conf) super @host = conf.has_key?('host') ? conf['host'] : 'localhost' @port = conf.has_key?('port') ? conf['port'].to_i : 6379 @db_number = conf.has_key?('db_number') ? conf['db_number'].to_i : nil @query_identify = @query_identify.split(',').map { |qid| qid.strip } # Create functions for each conditions @_cond_funcs = {} @_replace_keys = {} def get_arguments(eval_str) eval_str.scan(/[\"\']?[a-zA-Z][\w\d\.\-\_]*[\"\']?/).uniq.select{|x| not (x.start_with?('\'') or x.start_with?('\"')) and \ not %w{and or xor not}.include? x } end def parse_replace_expr(element_name, condition_name, str) result = {} str.split(',').each{|cond| before, after = cond.split('=>').map{|var| var.strip} result[before] = after if not (before.length > 0 and after.length > 0) raise Fluent::ConfigError, "SyntaxError at replace condition `#{element_name}`: #{condition_name}" end } if result.none? raise Fluent::ConfigError, "SyntaxError at replace condition `#{element_name}`: #{condition_name}" end result end def create_func(var, expr) begin f_argv = get_arguments(expr) f = eval('lambda {|' + f_argv.join(',') + '| ' + expr + '}') return [f, f_argv] rescue SyntaxError raise Fluent::ConfigError, "SyntaxError at condition `#{var}`: #{expr}" end end conf.elements.select { |element| %w{catch prolong dump release}.include? element.name }.each { |element| element.each_pair { |var, expr| element.has_key?(var) # to suppress unread configuration warning if var == 'condition' formula, f_argv = create_func(var, expr) @_cond_funcs[element.name] = [f_argv, formula] elsif var == 'replace' if %w{catch dump}.include? element.name @_replace_keys[element.name] = parse_replace_expr(element.name, var, expr) else raise Fluent::ConfigError, "`replace` configuration in #{element.name}: only allowed in `catch` and `dump`" end end } } end def has_all_keys?(record, argv) argv.each {|var| if not record.has_key?(var) return false end } true end def exec_func(record, f_argv, formula) argv = [] f_argv.each {|v| argv.push(record[v]) } return formula.call(*argv) end def start super begin gem "hiredis" @redis = Redis.new( :host => @host, :port => @port, :driver => :hiredis, :thread_safe => true, :db => @db_index ) rescue LoadError @redis = Redis.new( :host => @host, :port => @port, :thread_safe => true, :db => @db_index ) end start_watch end def shutdown @redis.quit end def tryOnRedis(method, *args) tries = 0 begin @redis.send(method, *args) if @redis.respond_to? method rescue Redis::CommandError => e tries += 1 # retry 3 times retry if tries <= @redis_retry $log.warn %Q[redis command retry failed : #{method}(#{args.join(', ')})] raise e.message end end def start_watch @watcher = Thread.new(&method(:watch)) end def format(tag, time, record) [tag, time, record].to_msgpack end def do_catch(qid, record, time) # replace record keys @_replace_keys['catch'].each_pair { |before, after| record[after] = record[before] record.delete(before) } # save record tryOnRedis 'set', @redis_key_prefix + qid, JSON.dump(record) # update qid's timestamp tryOnRedis 'zadd', @redis_key_prefix, time, qid tryOnRedis 'expire', @redis_key_prefix + qid, @query_ttl end def do_prolong(qid, time) if (tryOnRedis 'exists', @redis_key_prefix + qid) # update qid's timestamp tryOnRedis 'zadd', @redis_key_prefix, time, qid tryOnRedis 'expire', @redis_key_prefix + qid, @query_ttl end end def do_dump(qid, record) if (tryOnRedis 'exists', @redis_key_prefix + qid) # replace record keys @_replace_keys['dump'].each_pair { |before, after| record[after] = record[before] record.delete(before) } # emit catched_record = JSON.load(tryOnRedis('get', @redis_key_prefix + qid)) combined_record = catched_record.merge(record) Fluent::Engine.emit @tag, Fluent::Engine.now, combined_record # remove qid do_release(qid) end end def do_release(qid) tryOnRedis 'del', @redis_key_prefix + qid tryOnRedis 'zrem', @redis_key_prefix, qid end def extract_qid(record) qid = [] @query_identify.each { |attr| if record.has_key?(attr) qid.push(record[attr]) else return nil end } qid.join(':') end def write(chunk) begin chunk.msgpack_each do |(tag, time, record)| if (qid = extract_qid record) @_cond_funcs.each_pair { |cond, argv_and_func| argv, func = argv_and_func if exec_func(record, argv, func) case cond when "catch" do_catch(qid, record, time) when "prolong" do_prolong(qid, time) when "dump" do_dump(qid, record) when "release" do_release(qid) end break # very important! end } end end end end def watch @last_checked = Fluent::Engine.now tick = @remove_interval while true sleep 0.5 if Fluent::Engine.now - @last_checked >= tick now = Fluent::Engine.now to_expire = now - @query_ttl # Delete expired qids tryOnRedis 'zremrangebyscore', @redis_key_prefix, '-inf', to_expire # Delete buffer_size over qids tryOnRedis 'zremrangebyrank', @redis_key_prefix, 0, -@buffer_size @last_checked = now end end end end end