lib/xcflushd/flusher.rb in xcflushd-1.0.0 vs lib/xcflushd/flusher.rb in xcflushd-1.1.0
- old
+ new
@@ -1,31 +1,23 @@
require 'concurrent'
-require 'xcflushd/threading'
module Xcflushd
class Flusher
WAIT_TIME_REPORT_AUTH = 5 # in seconds
private_constant :WAIT_TIME_REPORT_AUTH
XcflushdError = Class.new(StandardError)
- def initialize(reporter, authorizer, storage, auth_ttl, error_handler, threads)
+ def initialize(reporter, authorizer, storage, auth_ttl, error_handler, logger, threads)
@reporter = reporter
@authorizer = authorizer
@storage = storage
@auth_ttl = auth_ttl
@error_handler = error_handler
-
- min_threads, max_threads = if threads
- [threads.min, threads.max]
- else
- Threading.default_threads_value
- end
-
- @thread_pool = Concurrent::ThreadPoolExecutor.new(
- min_threads: min_threads, max_threads: max_threads)
+ @logger = logger
+ @thread_pool = Concurrent::FixedThreadPool.new(threads)
end
def shutdown
@thread_pool.shutdown
end
@@ -38,29 +30,38 @@
@thread_pool.kill
end
# TODO: decide if we want to renew the authorizations every time.
def flush
- reports_to_flush = reports
- report(reports_to_flush)
+ reports_to_flush = run_and_log_time('Getting the reports from Redis') do
+ reports
+ end
+ run_and_log_time('Reporting to 3scale') { report(reports_to_flush) }
+
# Ideally, we would like to ensure that once we start checking
# authorizations, they have taken into account the reports that we just
# performed. However, in 3scale, reports are asynchronous and the current
# API does not provide a way to know whether a report has already been
# processed.
# For now, let's just wait a few seconds. This will greatly mitigate the
# problem.
- sleep(WAIT_TIME_REPORT_AUTH)
+ run_and_log_time('Giving reports some time to be processed') do
+ sleep(WAIT_TIME_REPORT_AUTH)
+ end
- renew(authorizations(reports_to_flush))
+ auths = run_and_log_time('Getting the auths from 3scale') do
+ authorizations(reports_to_flush)
+ end
+
+ run_and_log_time('Renewing the auths in Redis') { renew(auths) }
end
private
attr_reader :reporter, :authorizer, :storage, :auth_ttl,
- :error_handler, :thread_pool
+ :error_handler, :logger, :thread_pool
def reports
storage.reports_to_flush
end
@@ -140,7 +141,13 @@
end
[report, task]
end.to_h
end
+ def run_and_log_time(action, &blk)
+ t = Time.now
+ res = blk.call
+ logger.debug("#{action} took #{(Time.now - t).round(3)} seconds")
+ res
+ end
end
end