lib/fluent/plugin/in_http_cwm.rb in fluent-plugin-http-cwm-0.2.0 vs lib/fluent/plugin/in_http_cwm.rb in fluent-plugin-http-cwm-0.3.0
- old
+ new
@@ -51,11 +51,11 @@
config_param :port, :integer, default: 6379
desc 'The db to use.'
config_param :db, :integer, default: 0
- desc 'The grace period for last update.'
+ desc 'The grace period for last action update.'
config_param :grace_period, :time, default: '300s'
desc 'The flush interval to send metrics.'
config_param :flush_interval, :time, default: '300s'
@@ -69,10 +69,13 @@
def initialize
super
@redis = nil
@deployment_api_metrics = default_api_metrics_hash
+
+ @last_action_queue = Queue.new
+ @last_action_entry = []
end
def configure(conf)
super
@@ -80,22 +83,27 @@
end
def start
super
- # set up timer for flush interval
- timer_execute(:metrics_flush_timer, @redis_config.flush_interval) do
+ # start interval timer to flush api metrics
+ timer_execute(:api_metrics_flush_timer, @redis_config.flush_interval) do
flush_api_metrics
end
+ # start interval timer to flush last action entry
+ timer_execute(:last_action_flush_timer, '1s') do
+ flush_last_action
+ end
+
log.info("Starting HTTP server [#{@host}:#{@port}]...")
http_server_create_http_server(:http_server, addr: @host, port: @port, logger: log) do |server|
server.post("/#{tag}") do |req|
data = parse_data(req.body)
route(data) if update_deployment_metrics(data)
- # return HTTP 200 OK response to MinIO
+ # return HTTP 200 OK response with emtpy body
[200, { 'Content-Type' => 'text/plain' }, nil]
end
end
end
@@ -139,46 +147,68 @@
time = Fluent::Engine.now
record = { 'message' => data }
router.emit(@tag, time, record)
end
- def days_to_seconds(days)
- days * 24 * 60 * 60
+ def flush_last_action
+ if @last_action_entry.empty?
+ @last_action_entry = @last_action_queue.deq.split('|')
+ log.debug("Dequed last action entry. #{@last_action_entry}")
+ else
+ deploymentid, last_action = @last_action_entry
+ @last_action_entry = [] if update_deployment_last_action(deploymentid, last_action)
+ end
end
- def update_deployment_last_action(deploymentid)
- log.debug('Updating deployment last action')
+ def datetime_diff_in_secs(dt_begin, dt_end)
+ seconds = ((dt_end - dt_begin) * 24 * 60 * 60)
+ seconds.to_i
+ end
+ def update_deployment_last_action(deploymentid, last_action)
key = "#{@redis_config.last_update_prefix}:#{deploymentid}"
- curdt = DateTime.now
+ log.debug("Checking existing last action entry [key: #{key}]")
+ lastval = @redis.get(key)
- begin
- log.debug("checking existing last action entry [key: #{key}]")
- lastval = @redis.get(key)
+ is_grace_period_expired = false
+ if lastval
+ curdt = DateTime.now
+ lastdt = DateTime.parse(lastval, FMT_DATETIME)
+ dt_diff_secs = datetime_diff_in_secs(lastdt, curdt)
+ log.debug("Current Data/Time: #{curdt}")
+ log.debug("Previous Date/Time: #{lastdt}")
+ log.debug("Date/Time diff (s): #{dt_diff_secs}")
- is_grace_period_expired = false
- if lastval
- lastdt = DateTime.parse(lastval, FMT_DATETIME)
- dt_diff_secs = days_to_seconds((curdt - lastdt).to_f)
- if dt_diff_secs > @redis_config.grace_period
- is_grace_period_expired = true
- log.debug("grace period [#{@redis_config.grace_period}] expired [#{lastdt} => #{curdt}]")
- end
- else
- log.debug('last action entry not found. going to be set for the first time.')
+ if dt_diff_secs >= @redis_config.grace_period
+ is_grace_period_expired = true
+ log.debug("Grace period expired for last action update. [#{@redis_config.grace_period}]")
end
+ else
+ log.debug('Last action entry does not exist. It will be set for the first time.')
+ end
- if lastdt.nil? || is_grace_period_expired
- last_action = curdt.strftime(FMT_DATETIME)
- @redis.set(key, last_action)
- log.debug("Updated last action entry [#{key} => #{last_action}]")
- end
- rescue StandardError => e
- log.error("Unable to update last action! ERROR: '#{e}'.")
+ if lastdt.nil? || is_grace_period_expired
+ log.debug('Updating deployment last action')
+ last_action = DateTime.parse(last_action, FMT_DATETIME)
+ @redis.set(key, last_action)
+ log.debug("Updated last action entry [#{key} => #{last_action}]")
+ true
+ else
+ false
end
+ rescue StandardError => e
+ log.error("Unable to update last action! ERROR: '#{e}'.")
+ false
end
+ def enque_last_action_entry(deploymentid)
+ last_action = DateTime.now
+ entry = "#{deploymentid}|#{last_action}"
+ @last_action_queue.enq(entry)
+ log.debug("Enqued last action entry. [#{entry}]")
+ end
+
def validate_and_get_value(data_hash, key)
value = data_hash[key]
log.debug("missing '#{key}': #{data_hash.to_json}") unless value
value
end
@@ -189,10 +219,10 @@
log.debug('Updating deployment metrics')
deploymentid = validate_and_get_value(data, 'deploymentid')
return false unless deploymentid
- update_deployment_last_action(deploymentid)
+ enque_last_action_entry(deploymentid)
api_data = validate_and_get_value(data, 'api')
return false unless api_data
api_name = validate_and_get_value(api_data, 'name')