lib/fluent/plugin/in_redis_slowlog.rb in gitlab-fluent-plugin-redis-slowlog-0.0.2 vs lib/fluent/plugin/in_redis_slowlog.rb in gitlab-fluent-plugin-redis-slowlog-0.1.0
- old
+ new
@@ -4,25 +4,21 @@
module Fluent
module Plugin
class RedisSlowlogInput < Fluent::Plugin::Input
Fluent::Plugin.register_input("redis_slowlog", self)
+ Entry = Struct.new(:id, :timestamp, :exec_time_us, :command)
+
config_param :tag, :string
config_param :url, :string, default: nil
config_param :path, :string, default: nil
config_param :host, :string, default: "localhost"
config_param :port, :integer, default: 6379
config_param :password, :string, default: nil
config_param :logsize, :integer, default: 128
config_param :interval, :integer, default: 10
- def initialize
- super
-
- @current_log_id = 0
- end
-
def configure(conf)
super
@redis = Redis.new(
url: url,
@@ -53,41 +49,51 @@
end
private
attr_reader :redis, :interval
- attr_accessor :watcher, :watching, :current_log_id
+ attr_accessor :watcher, :watching
def watch
+ # Check the current id of the slowlog, and start logging from there
+ current_log_id = get_slowlogs(1).first&.id || -1
+
while watching
sleep interval
- self.current_log_id = output(current_log_id)
+ current_log_id = output(current_log_id)
end
end
- def output(last_id = 0)
- slowlogs = redis.slowlog("get", logsize)
+ def output(last_id)
+ slowlogs = get_slowlogs(logsize)
return last_id if slowlogs.empty?
- emit_slowlog(slowlogs)
+ # If the latest entry is smaller than what we last saw, redis was restarted
+ # Restart logging from the beginning.
+ last_id = -1 if slowlogs.first.id < last_id
+ emit_slowlog(slowlogs, last_id)
+
# Return the id of the last slowlog entry we've logged
# The first entry is the one that occurred last
- slowlogs.first.first
+ slowlogs.first.id
end
- def emit_slowlog(slowlogs)
+ def emit_slowlog(slowlogs, last_id)
slowlogs.reverse_each do |log|
# Don't emit logs for entries we've already logged
- next if log.first <= current_log_id
+ next if log.id <= last_id
- log_hash = { "id" => log[0],
- "timestamp" => log[1].to_i,
- "exec_time" => log[2],
- "command" => log[3] }
- router.emit(tag, Time.now.to_i, log_hash)
+ log_hash = { "id" => log.id,
+ "exec_time" => log.exec_time_us,
+ "command" => log.command }
+ router.emit(tag, Fluent::EventTime.new(log.timestamp.to_i), log_hash)
end
+ end
+
+ def get_slowlogs(size)
+ redis.slowlog("get", size).map { |slowlog_entry| Entry.new(*slowlog_entry.first(4)) }
end
end
end
end