lib/fluent/plugin/in_http_cwm.rb in fluent-plugin-http-cwm-0.1.0 vs lib/fluent/plugin/in_http_cwm.rb in fluent-plugin-http-cwm-0.2.0

- old
+ new

@@ -48,10 +48,13 @@ config_param :host, :string, default: 'localhost' desc 'The port of Redis server.' config_param :port, :integer, default: 6379 + desc 'The db to use.' + config_param :db, :integer, default: 0 + desc 'The grace period for last update.' config_param :grace_period, :time, default: '300s' desc 'The flush interval to send metrics.' config_param :flush_interval, :time, default: '300s' @@ -61,41 +64,17 @@ desc 'The prefix for metrics key.' config_param :metrics_prefix, :string, default: 'deploymentid:minio-metrics' end - def default_api_metrics_hash - Hash.new do |h, k| - h[k] = { - 'bytes_in' => 0, 'bytes_out' => 0, - 'num_requests_in' => 0, 'num_requests_out' => 0, 'num_requests_misc' => 0 - } - end - end - def initialize super @redis = nil @deployment_api_metrics = default_api_metrics_hash end - def set_up_redis - log.info("Connecting with Redis [#{@redis_config.host}:#{@redis_config.port}]") - @redis = Redis.new(host: @redis_config.host, port: @redis_config.port) - ready = false - until ready - sleep(1) - begin - @redis.ping - ready = true - rescue StandardError => e - log.error("Unable to connect to Redis server! ERROR: '#{e}'. Retrying...") - end - end - end - def configure(conf) super set_up_redis end @@ -110,20 +89,47 @@ log.info("Starting HTTP server [#{@host}:#{@port}]...") http_server_create_http_server(:http_server, addr: @host, port: @port, logger: log) do |server| server.post("/#{tag}") do |req| data = parse_data(req.body) - if data - route(data) if update_deployment_metrics(data) - end + route(data) if update_deployment_metrics(data) # return HTTP 200 OK response to MinIO [200, { 'Content-Type' => 'text/plain' }, nil] end end end + private + + def default_api_metrics_hash + Hash.new do |h, k| + h[k] = { + 'bytes_in' => 0, 'bytes_out' => 0, + 'num_requests_in' => 0, 'num_requests_out' => 0, 'num_requests_misc' => 0 + } + end + end + + def set_up_redis + host = @redis_config.host + port = @redis_config.port + db = @redis_config.db + log.info("Connecting with Redis [address: #{host}:#{port}, db: #{db}]") + @redis = Redis.new(host: host, port: port, db: db) + ready = false + until ready + sleep(1) + begin + @redis.ping + ready = true + rescue StandardError => e + log.error("Unable to connect to Redis server! ERROR: '#{e}'. Retrying...") + end + end + end + def parse_data(data) JSON.parse(data) rescue StandardError => e log.debug("ERROR: #{e}") nil @@ -144,16 +150,30 @@ key = "#{@redis_config.last_update_prefix}:#{deploymentid}" curdt = DateTime.now begin + log.debug("checking existing last action entry [key: #{key}]") lastval = @redis.get(key) - lastdt = DateTime.parse(lastval, FMT_DATETIME) if lastval - if lastdt.nil? || days_to_seconds((curdt - lastdt).to_i) >= @redis_config.grace_period - log.debug('Setting last action') - @redis.set(key, curdt.strftime(FMT_DATETIME)) + + is_grace_period_expired = false + if lastval + lastdt = DateTime.parse(lastval, FMT_DATETIME) + dt_diff_secs = days_to_seconds((curdt - lastdt).to_f) + if dt_diff_secs > @redis_config.grace_period + is_grace_period_expired = true + log.debug("grace period [#{@redis_config.grace_period}] expired [#{lastdt} => #{curdt}]") + end + else + log.debug('last action entry not found. going to be set for the first time.') end + + if lastdt.nil? || is_grace_period_expired + last_action = curdt.strftime(FMT_DATETIME) + @redis.set(key, last_action) + log.debug("Updated last action entry [#{key} => #{last_action}]") + end rescue StandardError => e log.error("Unable to update last action! ERROR: '#{e}'.") end end @@ -162,9 +182,11 @@ log.debug("missing '#{key}': #{data_hash.to_json}") unless value value end def update_deployment_metrics(data) + return false unless data + log.debug('Updating deployment metrics') deploymentid = validate_and_get_value(data, 'deploymentid') return false unless deploymentid