lib/fluent/plugin/in_redis_slowlog.rb in gitlab-fluent-plugin-redis-slowlog-0.1.1 vs lib/fluent/plugin/in_redis_slowlog.rb in gitlab-fluent-plugin-redis-slowlog-0.1.2
- old
+ new
@@ -2,10 +2,12 @@
require "redis"
module Fluent
module Plugin
class RedisSlowlogInput < Fluent::Plugin::Input
+ helpers :thread
+
Fluent::Plugin.register_input("redis_slowlog", self)
Entry = Struct.new(:id, :timestamp, :exec_time_us, :command)
config_param :tag, :string
@@ -14,20 +16,22 @@
config_param :host, :string, default: "localhost"
config_param :port, :integer, default: 6379
config_param :password, :string, default: nil
config_param :logsize, :integer, default: 128
config_param :interval, :integer, default: 10
+ config_param :timeout, :integer, default: 2
def configure(conf)
super
@redis = Redis.new(
url: url,
host: host,
port: port,
path: path,
- password: password
+ password: password,
+ timeout: timeout
)
end
def start
super
@@ -36,11 +40,11 @@
raise Redis::CannotConnectError,
"Could not connect to redis"
end
self.watching = true
- self.watcher = Thread.new(&method(:watch))
+ self.watcher = thread_create(:redis_slowlog_watcher, &method(:watch))
end
def shutdown
super
@@ -55,13 +59,20 @@
def watch
# Check the current id of the slowlog, and start logging from there
current_log_id = get_slowlogs(1).first&.id || -1
- while watching
- sleep interval
+ begin
+ while watching
+ sleep interval
- current_log_id = output(current_log_id)
+ current_log_id = output(current_log_id)
+ end
+ rescue Redis::BaseError => e
+ msg = "Error fetching slowlogs: #{e.inspect}"
+ log.error(msg)
+ router.emit("#{tag}.error", Fluent::EventTime.new(Time.now.to_i), { "message" => msg })
+ retry
end
end
def output(last_id)
slowlogs = get_slowlogs(logsize)