lib/xcflushd/flusher.rb in xcflushd-1.0.0 vs lib/xcflushd/flusher.rb in xcflushd-1.1.0

- old
+ new

@@ -1,31 +1,23 @@ require 'concurrent' -require 'xcflushd/threading' module Xcflushd class Flusher WAIT_TIME_REPORT_AUTH = 5 # in seconds private_constant :WAIT_TIME_REPORT_AUTH XcflushdError = Class.new(StandardError) - def initialize(reporter, authorizer, storage, auth_ttl, error_handler, threads) + def initialize(reporter, authorizer, storage, auth_ttl, error_handler, logger, threads) @reporter = reporter @authorizer = authorizer @storage = storage @auth_ttl = auth_ttl @error_handler = error_handler - - min_threads, max_threads = if threads - [threads.min, threads.max] - else - Threading.default_threads_value - end - - @thread_pool = Concurrent::ThreadPoolExecutor.new( - min_threads: min_threads, max_threads: max_threads) + @logger = logger + @thread_pool = Concurrent::FixedThreadPool.new(threads) end def shutdown @thread_pool.shutdown end @@ -38,29 +30,38 @@ @thread_pool.kill end # TODO: decide if we want to renew the authorizations every time. def flush - reports_to_flush = reports - report(reports_to_flush) + reports_to_flush = run_and_log_time('Getting the reports from Redis') do + reports + end + run_and_log_time('Reporting to 3scale') { report(reports_to_flush) } + # Ideally, we would like to ensure that once we start checking # authorizations, they have taken into account the reports that we just # performed. However, in 3scale, reports are asynchronous and the current # API does not provide a way to know whether a report has already been # processed. # For now, let's just wait a few seconds. This will greatly mitigate the # problem. - sleep(WAIT_TIME_REPORT_AUTH) + run_and_log_time('Giving reports some time to be processed') do + sleep(WAIT_TIME_REPORT_AUTH) + end - renew(authorizations(reports_to_flush)) + auths = run_and_log_time('Getting the auths from 3scale') do + authorizations(reports_to_flush) + end + + run_and_log_time('Renewing the auths in Redis') { renew(auths) } end private attr_reader :reporter, :authorizer, :storage, :auth_ttl, - :error_handler, :thread_pool + :error_handler, :logger, :thread_pool def reports storage.reports_to_flush end @@ -140,7 +141,13 @@ end [report, task] end.to_h end + def run_and_log_time(action, &blk) + t = Time.now + res = blk.call + logger.debug("#{action} took #{(Time.now - t).round(3)} seconds") + res + end end end