lib/fluent/plugin/in_http_cwm.rb in fluent-plugin-http-cwm-0.2.0 vs lib/fluent/plugin/in_http_cwm.rb in fluent-plugin-http-cwm-0.3.0

- old
+ new

@@ -51,11 +51,11 @@ config_param :port, :integer, default: 6379 desc 'The db to use.' config_param :db, :integer, default: 0 - desc 'The grace period for last update.' + desc 'The grace period for last action update.' config_param :grace_period, :time, default: '300s' desc 'The flush interval to send metrics.' config_param :flush_interval, :time, default: '300s' @@ -69,10 +69,13 @@ def initialize super @redis = nil @deployment_api_metrics = default_api_metrics_hash + + @last_action_queue = Queue.new + @last_action_entry = [] end def configure(conf) super @@ -80,22 +83,27 @@ end def start super - # set up timer for flush interval - timer_execute(:metrics_flush_timer, @redis_config.flush_interval) do + # start interval timer to flush api metrics + timer_execute(:api_metrics_flush_timer, @redis_config.flush_interval) do flush_api_metrics end + # start interval timer to flush last action entry + timer_execute(:last_action_flush_timer, '1s') do + flush_last_action + end + log.info("Starting HTTP server [#{@host}:#{@port}]...") http_server_create_http_server(:http_server, addr: @host, port: @port, logger: log) do |server| server.post("/#{tag}") do |req| data = parse_data(req.body) route(data) if update_deployment_metrics(data) - # return HTTP 200 OK response to MinIO + # return HTTP 200 OK response with emtpy body [200, { 'Content-Type' => 'text/plain' }, nil] end end end @@ -139,46 +147,68 @@ time = Fluent::Engine.now record = { 'message' => data } router.emit(@tag, time, record) end - def days_to_seconds(days) - days * 24 * 60 * 60 + def flush_last_action + if @last_action_entry.empty? + @last_action_entry = @last_action_queue.deq.split('|') + log.debug("Dequed last action entry. #{@last_action_entry}") + else + deploymentid, last_action = @last_action_entry + @last_action_entry = [] if update_deployment_last_action(deploymentid, last_action) + end end - def update_deployment_last_action(deploymentid) - log.debug('Updating deployment last action') + def datetime_diff_in_secs(dt_begin, dt_end) + seconds = ((dt_end - dt_begin) * 24 * 60 * 60) + seconds.to_i + end + def update_deployment_last_action(deploymentid, last_action) key = "#{@redis_config.last_update_prefix}:#{deploymentid}" - curdt = DateTime.now + log.debug("Checking existing last action entry [key: #{key}]") + lastval = @redis.get(key) - begin - log.debug("checking existing last action entry [key: #{key}]") - lastval = @redis.get(key) + is_grace_period_expired = false + if lastval + curdt = DateTime.now + lastdt = DateTime.parse(lastval, FMT_DATETIME) + dt_diff_secs = datetime_diff_in_secs(lastdt, curdt) + log.debug("Current Data/Time: #{curdt}") + log.debug("Previous Date/Time: #{lastdt}") + log.debug("Date/Time diff (s): #{dt_diff_secs}") - is_grace_period_expired = false - if lastval - lastdt = DateTime.parse(lastval, FMT_DATETIME) - dt_diff_secs = days_to_seconds((curdt - lastdt).to_f) - if dt_diff_secs > @redis_config.grace_period - is_grace_period_expired = true - log.debug("grace period [#{@redis_config.grace_period}] expired [#{lastdt} => #{curdt}]") - end - else - log.debug('last action entry not found. going to be set for the first time.') + if dt_diff_secs >= @redis_config.grace_period + is_grace_period_expired = true + log.debug("Grace period expired for last action update. [#{@redis_config.grace_period}]") end + else + log.debug('Last action entry does not exist. It will be set for the first time.') + end - if lastdt.nil? || is_grace_period_expired - last_action = curdt.strftime(FMT_DATETIME) - @redis.set(key, last_action) - log.debug("Updated last action entry [#{key} => #{last_action}]") - end - rescue StandardError => e - log.error("Unable to update last action! ERROR: '#{e}'.") + if lastdt.nil? || is_grace_period_expired + log.debug('Updating deployment last action') + last_action = DateTime.parse(last_action, FMT_DATETIME) + @redis.set(key, last_action) + log.debug("Updated last action entry [#{key} => #{last_action}]") + true + else + false end + rescue StandardError => e + log.error("Unable to update last action! ERROR: '#{e}'.") + false end + def enque_last_action_entry(deploymentid) + last_action = DateTime.now + entry = "#{deploymentid}|#{last_action}" + @last_action_queue.enq(entry) + log.debug("Enqued last action entry. [#{entry}]") + end + def validate_and_get_value(data_hash, key) value = data_hash[key] log.debug("missing '#{key}': #{data_hash.to_json}") unless value value end @@ -189,10 +219,10 @@ log.debug('Updating deployment metrics') deploymentid = validate_and_get_value(data, 'deploymentid') return false unless deploymentid - update_deployment_last_action(deploymentid) + enque_last_action_entry(deploymentid) api_data = validate_and_get_value(data, 'api') return false unless api_data api_name = validate_and_get_value(api_data, 'name')