# Time#iso8601 is provided by the "time" standard library on Rubies where it
# is not built in, so load it explicitly instead of relying on another
# library having done so.
require "time"

require "fluent/plugin/input"
require "redis"

module Fluent
  module Plugin
    # Fluentd input plugin that periodically reads the Redis slowlog and emits
    # one event per slowlog entry it has not emitted before.
    class RedisSlowlogInput < Fluent::Plugin::Input
      Fluent::Plugin.register_input("redis_slowlog", self)

      # Holds the first four fields of a raw slowlog entry, in the order Redis
      # returns them.
      Entry = Struct.new(:id, :timestamp, :exec_time_us, :command)

      # Tag attached to every emitted event.
      config_param :tag, :string

      # Connection settings, passed through to Redis.new unchanged.
      config_param :url, :string, default: nil
      config_param :path, :string, default: nil
      config_param :host, :string, default: "localhost"
      config_param :port, :integer, default: 6379
      # Marked secret so Fluentd masks the value when it dumps configuration.
      config_param :password, :string, default: nil, secret: true

      # Number of slowlog entries requested from Redis on each poll.
      config_param :logsize, :integer, default: 128
      # Seconds to sleep between polls.
      config_param :interval, :integer, default: 10

      # Builds the Redis client from the configured connection settings.
      def configure(conf)
        super

        @redis = Redis.new(
          url: url,
          host: host,
          port: port,
          path: path,
          password: password
        )
      end

      # Verifies that Redis answers PING and starts the polling thread.
      #
      # Raises Redis::CannotConnectError when the reply is not "PONG".
      def start
        super

        unless redis.ping == "PONG"
          raise Redis::CannotConnectError, "Could not connect to redis"
        end

        self.watching = true
        self.watcher = Thread.new(&method(:watch))
      end

      # Asks the polling thread to stop and closes the Redis connection.
      def shutdown
        super

        self.watching = false
        redis.quit
      end

      private

      attr_reader :redis, :interval
      attr_accessor :watcher, :watching

      # Body of the polling thread: remembers the newest slowlog id seen so far
      # and, every `interval` seconds, emits whatever is newer than that.
      def watch
        # Check the current id of the slowlog, and start logging from there
        current_log_id = get_slowlogs(1).first&.id || -1

        while watching
          sleep interval

          # Shutdown may have been requested while sleeping; polling now would
          # use the Redis client after shutdown has already closed it.
          break unless watching

          current_log_id = output(current_log_id)
        end
      end

      # Fetches the slowlog and emits the entries newer than `last_id`.
      #
      # Returns the id of the newest entry seen, or `last_id` unchanged when
      # the slowlog is empty or could not be fetched.
      def output(last_id)
        slowlogs = get_slowlogs(logsize)
        return last_id if slowlogs.empty?

        # If the latest entry is smaller than what we last saw, redis was
        # restarted. Restart logging from the beginning.
        last_id = -1 if slowlogs.first.id < last_id

        emit_slowlog(slowlogs, last_id)

        # Return the id of the last slowlog entry we've logged.
        # The first entry is the one that occurred last.
        slowlogs.first.id
      rescue Redis::BaseError => e
        # A Redis failure must not kill the polling thread: report it and try
        # again on the next interval, starting from the same position.
        log.error("Error fetching redis slowlog: #{e.class}: #{e.message}")
        last_id
      end

      # Emits one event per entry whose id is greater than `last_id`, walking
      # the list in reverse so the entry at the end of the list goes out first.
      def emit_slowlog(slowlogs, last_id)
        slowlogs.reverse_each do |entry|
          # Don't emit logs for entries we've already logged
          next if entry.id <= last_id

          seconds = entry.timestamp.to_i
          record = {
            "id" => entry.id,
            "time" => Time.at(seconds).utc.iso8601(3),
            "exec_time" => entry.exec_time_us,
            "command" => entry.command
          }

          router.emit(tag, Fluent::EventTime.new(seconds), record)
        end
      end

      # Returns up to `size` slowlog entries as Entry structs, in the order
      # Redis returned them. Only the first four fields of each raw entry are
      # kept; any additional fields are ignored.
      def get_slowlogs(size)
        redis.slowlog("get", size).map { |raw| Entry.new(*raw.first(4)) }
      end
    end
  end
end