require "fluent/plugin/input"
require "redis"

module Fluent
  module Plugin
    # Fluentd input plugin that polls the Redis SLOWLOG every `interval`
    # seconds from a background thread and emits one event per slowlog entry
    # that has not been emitted before.
    #
    # Each emitted record has the keys "id", "timestamp", "exec_time" and
    # "command", taken from the first four fields of a slowlog entry.
    class RedisSlowlogInput < Fluent::Plugin::Input
      Fluent::Plugin.register_input("redis_slowlog", self)

      # Tag attached to every emitted event.
      config_param :tag, :string
      # Connection settings, handed to Redis.new unchanged.
      config_param :url, :string, default: nil
      config_param :path, :string, default: nil
      config_param :host, :string, default: "localhost"
      config_param :port, :integer, default: 6379
      config_param :password, :string, default: nil
      # Number of entries requested per `SLOWLOG GET` call.
      config_param :logsize, :integer, default: 128
      # Seconds to sleep between two polls.
      config_param :interval, :integer, default: 10

      def initialize
        super
        # Redis numbers slowlog entries starting at 0, so the "nothing emitted
        # yet" marker has to be below 0 or the very first entry is skipped.
        @current_log_id = -1
      end

      # Builds the Redis client from the configured connection parameters.
      def configure(conf)
        super

        @redis = Redis.new(
          url: url,
          host: host,
          port: port,
          path: path,
          password: password
        )
      end

      # Verifies the connection with PING and starts the polling thread.
      #
      # Raises Redis::CannotConnectError when the server does not answer "PONG".
      def start
        super

        unless redis.ping == "PONG"
          raise Redis::CannotConnectError, "Could not connect to redis"
        end

        self.watching = true
        self.watcher = Thread.new(&method(:watch))
      end

      # Asks the polling loop to stop and closes the Redis connection.
      def shutdown
        super

        self.watching = false
        redis.quit
      end

      private

      attr_reader :redis, :interval
      attr_accessor :watcher, :watching, :current_log_id

      # Polling loop executed in the watcher thread until `watching` is cleared.
      def watch
        while watching
          sleep interval
          # shutdown may have been requested while we were asleep; do not touch
          # the connection again in that case.
          break unless watching

          begin
            self.current_log_id = output(current_log_id)
          rescue StandardError => e
            # An uncaught error would end this thread and silently stop all
            # further polling, so report it and try again on the next tick.
            log.warn("redis_slowlog: failed to fetch slowlog: #{e.class}: #{e.message}")
          end
        end
      end

      # Fetches up to `logsize` slowlog entries and emits those newer than
      # `last_id`.
      #
      # Returns the id of the newest entry seen, or `last_id` unchanged when
      # the slowlog is empty.
      def output(last_id = current_log_id)
        slowlogs = redis.slowlog("get", logsize)
        return last_id if slowlogs.empty?

        # Entries come newest first, so the first one carries the highest id.
        newest_id = slowlogs.first.first

        # Ids only move backwards when the server restarted and its counter was
        # reset; everything in this batch is new in that case.
        last_id = -1 if newest_id < last_id

        emit_slowlog(slowlogs, last_id)
        newest_id
      end

      # Emits, oldest first, every entry whose id is greater than `last_id`.
      def emit_slowlog(slowlogs, last_id = current_log_id)
        slowlogs.reverse_each do |entry|
          # Don't emit logs for entries we've already logged
          next if entry.first <= last_id

          record = {
            "id" => entry[0],
            "timestamp" => Time.at(entry[1]),
            "exec_time" => entry[2],
            "command" => entry[3]
          }
          router.emit(tag, Time.now.to_i, record)
        end
      end
    end
  end
end