lib/fluent/plugin/in_http_cwm.rb in fluent-plugin-http-cwm-0.1.0 vs lib/fluent/plugin/in_http_cwm.rb in fluent-plugin-http-cwm-0.2.0
- old
+ new
@@ -48,10 +48,13 @@
config_param :host, :string, default: 'localhost'
desc 'The port of Redis server.'
config_param :port, :integer, default: 6379
+ desc 'The db to use.'
+ config_param :db, :integer, default: 0
+
desc 'The grace period for last update.'
config_param :grace_period, :time, default: '300s'
desc 'The flush interval to send metrics.'
config_param :flush_interval, :time, default: '300s'
@@ -61,41 +64,17 @@
desc 'The prefix for metrics key.'
config_param :metrics_prefix, :string, default: 'deploymentid:minio-metrics'
end
- def default_api_metrics_hash
- Hash.new do |h, k|
- h[k] = {
- 'bytes_in' => 0, 'bytes_out' => 0,
- 'num_requests_in' => 0, 'num_requests_out' => 0, 'num_requests_misc' => 0
- }
- end
- end
-
def initialize
super
@redis = nil
@deployment_api_metrics = default_api_metrics_hash
end
- def set_up_redis
- log.info("Connecting with Redis [#{@redis_config.host}:#{@redis_config.port}]")
- @redis = Redis.new(host: @redis_config.host, port: @redis_config.port)
- ready = false
- until ready
- sleep(1)
- begin
- @redis.ping
- ready = true
- rescue StandardError => e
- log.error("Unable to connect to Redis server! ERROR: '#{e}'. Retrying...")
- end
- end
- end
-
def configure(conf)
super
set_up_redis
end
@@ -110,20 +89,47 @@
log.info("Starting HTTP server [#{@host}:#{@port}]...")
http_server_create_http_server(:http_server, addr: @host, port: @port, logger: log) do |server|
server.post("/#{tag}") do |req|
data = parse_data(req.body)
- if data
- route(data) if update_deployment_metrics(data)
- end
+ route(data) if update_deployment_metrics(data)
# return HTTP 200 OK response to MinIO
[200, { 'Content-Type' => 'text/plain' }, nil]
end
end
end
+ private
+
+ def default_api_metrics_hash
+ Hash.new do |h, k|
+ h[k] = {
+ 'bytes_in' => 0, 'bytes_out' => 0,
+ 'num_requests_in' => 0, 'num_requests_out' => 0, 'num_requests_misc' => 0
+ }
+ end
+ end
+
+ def set_up_redis
+ host = @redis_config.host
+ port = @redis_config.port
+ db = @redis_config.db
+ log.info("Connecting with Redis [address: #{host}:#{port}, db: #{db}]")
+ @redis = Redis.new(host: host, port: port, db: db)
+ ready = false
+ until ready
+ sleep(1)
+ begin
+ @redis.ping
+ ready = true
+ rescue StandardError => e
+ log.error("Unable to connect to Redis server! ERROR: '#{e}'. Retrying...")
+ end
+ end
+ end
+
def parse_data(data)
JSON.parse(data)
rescue StandardError => e
log.debug("ERROR: #{e}")
nil
@@ -144,16 +150,30 @@
key = "#{@redis_config.last_update_prefix}:#{deploymentid}"
curdt = DateTime.now
begin
+ log.debug("checking existing last action entry [key: #{key}]")
lastval = @redis.get(key)
- lastdt = DateTime.parse(lastval, FMT_DATETIME) if lastval
- if lastdt.nil? || days_to_seconds((curdt - lastdt).to_i) >= @redis_config.grace_period
- log.debug('Setting last action')
- @redis.set(key, curdt.strftime(FMT_DATETIME))
+
+ is_grace_period_expired = false
+ if lastval
+ lastdt = DateTime.parse(lastval, FMT_DATETIME)
+ dt_diff_secs = days_to_seconds((curdt - lastdt).to_f)
+ if dt_diff_secs > @redis_config.grace_period
+ is_grace_period_expired = true
+ log.debug("grace period [#{@redis_config.grace_period}] expired [#{lastdt} => #{curdt}]")
+ end
+ else
+ log.debug('last action entry not found. going to be set for the first time.')
end
+
+ if lastdt.nil? || is_grace_period_expired
+ last_action = curdt.strftime(FMT_DATETIME)
+ @redis.set(key, last_action)
+ log.debug("Updated last action entry [#{key} => #{last_action}]")
+ end
rescue StandardError => e
log.error("Unable to update last action! ERROR: '#{e}'.")
end
end
@@ -162,9 +182,11 @@
log.debug("missing '#{key}': #{data_hash.to_json}") unless value
value
end
def update_deployment_metrics(data)
+ return false unless data
+
log.debug('Updating deployment metrics')
deploymentid = validate_and_get_value(data, 'deploymentid')
return false unless deploymentid