lib/xcflushd/priority_auth_renewer.rb in xcflushd-1.0.0 vs lib/xcflushd/priority_auth_renewer.rb in xcflushd-1.1.0

- old
+ new

@@ -1,7 +1,5 @@ -require 'xcflushd/threading' - module Xcflushd # Apart from flushing all the cached reports and renewing the authorizations # periodically, we need to provide a mechanism to renew a specific auth at # any time. The information needed is the combination of service, application # credentials and metric. @@ -20,13 +18,12 @@ # combination, a channel of this type is created. The client is # subscribed to this channel, and xcflushd will publish the authorization # status once it gets it from 3scale. class PriorityAuthRenewer - # Number of times that a response is published - TIMES_TO_PUBLISH = 5 - private_constant :TIMES_TO_PUBLISH + PUBLISH_WAIT_TIMES = [0.002, 0.005, 0.005, 0.01, 0.015].freeze + private_constant :PUBLISH_WAIT_TIMES # We need two separate Redis clients: one for subscribing to a channel and # the other one to publish to different channels. It is specified in the # Redis website: http://redis.io/topics/pubsub def initialize(authorizer, storage, redis_pub, redis_sub, @@ -45,19 +42,11 @@ # renewing. # This map is updated from different threads. We use Concurrent::Map to # ensure thread-safety. @current_auths = Concurrent::Map.new - min_threads, max_threads = if threads - [threads.min, threads.max] - else - Threading.default_threads_value - end - - @thread_pool = Concurrent::ThreadPoolExecutor.new( - min_threads: min_threads, - max_threads: max_threads) + @thread_pool = Concurrent::FixedThreadPool.new(threads) end def shutdown @thread_pool.shutdown end @@ -199,19 +188,21 @@ # too late. # I cannot think of an easy way to solve this. There is some time # between the moment the requests performs the publish and the # subscribe actions. To mitigate the problem we can publish several # times during some ms. We will see if this is good enough. - # Trade-off: publishing too much increases the Redis load. Waiting too - # much makes the incoming request slow. + # Trade-off: Waiting long times between publishing attempts reduces the + # probability of triggering the problem described. However, it also makes + # incoming requests slow because tasks accumulate in the thread pool + # waiting. publish_failures = 0 - TIMES_TO_PUBLISH.times do |t| + PUBLISH_WAIT_TIMES.each do |wait_time| begin publish_auth(combination, authorization) rescue publish_failures += 1 end - sleep((1.0/50)*((t+1)**2)) + sleep(wait_time) end if publish_failures > 0 logger.warn('There was an error while publishing a response in the '\ "priority channel. Combination: #{combination}".freeze)