lib/fluent/plugin/in_redis_slowlog.rb in gitlab-fluent-plugin-redis-slowlog-0.0.2 vs lib/fluent/plugin/in_redis_slowlog.rb in gitlab-fluent-plugin-redis-slowlog-0.1.0

- old
+ new

@@ -4,25 +4,21 @@ module Fluent module Plugin class RedisSlowlogInput < Fluent::Plugin::Input Fluent::Plugin.register_input("redis_slowlog", self) + Entry = Struct.new(:id, :timestamp, :exec_time_us, :command) + config_param :tag, :string config_param :url, :string, default: nil config_param :path, :string, default: nil config_param :host, :string, default: "localhost" config_param :port, :integer, default: 6379 config_param :password, :string, default: nil config_param :logsize, :integer, default: 128 config_param :interval, :integer, default: 10 - def initialize - super - - @current_log_id = 0 - end - def configure(conf) super @redis = Redis.new( url: url, @@ -53,41 +49,51 @@ end private attr_reader :redis, :interval - attr_accessor :watcher, :watching, :current_log_id + attr_accessor :watcher, :watching def watch + # Check the current id of the slowlog, and start logging from there + current_log_id = get_slowlogs(1).first&.id || -1 + while watching sleep interval - self.current_log_id = output(current_log_id) + current_log_id = output(current_log_id) end end - def output(last_id = 0) - slowlogs = redis.slowlog("get", logsize) + def output(last_id) + slowlogs = get_slowlogs(logsize) return last_id if slowlogs.empty? - emit_slowlog(slowlogs) + # If the latest entry is smaller than what we last saw, redis was restarted + # Restart logging from the beginning. + last_id = -1 if slowlogs.first.id < last_id + emit_slowlog(slowlogs, last_id) + # Return the id of the last slowlog entry we've logged # The first entry is the one that occurred last - slowlogs.first.first + slowlogs.first.id end - def emit_slowlog(slowlogs) + def emit_slowlog(slowlogs, last_id) slowlogs.reverse_each do |log| # Don't emit logs for entries we've already logged - next if log.first <= current_log_id + next if log.id <= last_id - log_hash = { "id" => log[0], - "timestamp" => log[1].to_i, - "exec_time" => log[2], - "command" => log[3] } - router.emit(tag, Time.now.to_i, log_hash) + log_hash = { "id" => log.id, + "exec_time" => log.exec_time_us, + "command" => log.command } + router.emit(tag, Fluent::EventTime.new(log.timestamp.to_i), log_hash) end + end + + def get_slowlogs(size) + redis.slowlog("get", size).map { |slowlog_entry| Entry.new(*slowlog_entry.first(4)) } end end end end