# -*- coding: utf-8 -*- module Fluent class OnlineuserOutput < BufferedOutput Fluent::Plugin.register_output('online_user', self) config_param :host, :string, :default => 'localhost' config_param :port, :integer, :default => 6379 config_param :db_index, :integer, :default => 0 config_param :redis_retry, :integer, :default => 3 config_param :session_timeout, :integer, :default => 1800 config_param :redis_key_prefix, :string, :default => 'counter:online_user:' config_param :user_identify, :string, :default => 'uid' config_param :segment, :string, :default => nil config_param :tag, :string, :default => "online_user" config_param :silent, :bool, :default => false def initialize super require 'redis' require 'msgpack' require 'rubygems' end def configure(conf) super @host = conf.has_key?('host') ? conf['host'] : 'localhost' @port = conf.has_key?('port') ? conf['port'].to_i : 6379 @db_number = conf.has_key?('db_number') ? conf['db_number'].to_i : nil @user_identify = @user_identify.split '|' if @segment s = @segment.split /\s+/ case s.first when 'tag' @cal_segment = lambda { |tag, record| 'tag:' + tag } when 'capture' if s.length == 3 format = s[2] if format[0] == ?/ and format[-1] == ?/ format = format[1..-2] end regexp = Regexp.new(format) @cal_segment = lambda { |tag, record| if record[s[1]] and record[s[1]].is_a? String if c = record[s[1]].match(regexp) 'captured_' + s[1] + ':' + (c.captures.join '_') end end } else raise Fluent::ConfigError, "wrong segment capture, specify it like 'capture __FIELD_NAME__ __CAPTURE_REGEX__'" end else @cal_segment = lambda { |tag, record| if record[@segment] @segment + ':' + record[@segment] end } end end end def start super begin gem "hiredis" @redis = Redis.new( :host => @host, :port => @port, :driver => :hiredis, :thread_safe => true, :db => @db_index ) rescue LoadError @redis = Redis.new( :host => @host, :port => @port, :thread_safe => true, :db => @db_index ) end @segments = {} start_watch end def shutdown @redis.quit end def to_redis_key(segment_str) @redis_key_prefix + segment_str end def tryOnRedis(method, *args) tries = 0 begin @redis.send(method, *args) if @redis.respond_to? method rescue Redis::CommandError => e tries += 1 # retry 3 times retry if tries <= @redis_retry $log.warn %Q[redis command retry failed : #{method}(#{args.join(', ')})] raise e.message end end def start_watch @watcher = Thread.new(&method(:watch)) end def watch @last_checked = Fluent::Engine.now tick = 60 while true sleep 0.5 if Fluent::Engine.now - @last_checked >= tick now = Fluent::Engine.now @segments.each_key do |segment| user_num = user_count segment if user_num == 0 @segments.delete segment elsif not @silent Fluent::Engine.emit @tag + '.' + segment, Fluent::Engine.now, {"online_user" => user_num} end end @last_checked = now end end end def user_count(segment) to_expire = Fluent::Engine.now - @session_timeout key = to_redis_key segment tryOnRedis 'zremrangebyscore', key, '-inf', to_expire tryOnRedis 'zcard', key end def extract_uid(record) i = @user_identify.detect { |id| v = record[id] (v.is_a?(String) and !v.empty?) or (v.is_a?(Numeric) and v != 0) } i ? record[i] : nil end def format(tag, time, record) [tag, time, record].to_msgpack end def write(chunk) online_user = Hash.new { |hash, key| hash[key] = Hash.new } begin chunk.msgpack_each do |(tag, time, record)| if (uid = extract_uid record) online_user['all'][uid] = time if @cal_segment and @cal_segment != '' and segment = @cal_segment.call(tag, record) online_user[segment][uid] = time end end end ## write to redis online_user.each { |segment, users| @segments[segment] = true key = to_redis_key segment users.each { |uid, ts| tryOnRedis 'zadd', key, ts, uid } } end end end end