lib/xcflushd/priority_auth_renewer.rb in xcflushd-1.0.0 vs lib/xcflushd/priority_auth_renewer.rb in xcflushd-1.1.0
- old
+ new
@@ -1,7 +1,5 @@
-require 'xcflushd/threading'
-
module Xcflushd
# Apart from flushing all the cached reports and renewing the authorizations
# periodically, we need to provide a mechanism to renew a specific auth at
# any time. The information needed is the combination of service, application
# credentials and metric.
@@ -20,13 +18,12 @@
# combination, a channel of this type is created. The client is
# subscribed to this channel, and xcflushd will publish the authorization
# status once it gets it from 3scale.
class PriorityAuthRenewer
- # Number of times that a response is published
- TIMES_TO_PUBLISH = 5
- private_constant :TIMES_TO_PUBLISH
+ PUBLISH_WAIT_TIMES = [0.002, 0.005, 0.005, 0.01, 0.015].freeze
+ private_constant :PUBLISH_WAIT_TIMES
# We need two separate Redis clients: one for subscribing to a channel and
# the other one to publish to different channels. It is specified in the
# Redis website: http://redis.io/topics/pubsub
def initialize(authorizer, storage, redis_pub, redis_sub,
@@ -45,19 +42,11 @@
# renewing.
# This map is updated from different threads. We use Concurrent::Map to
# ensure thread-safety.
@current_auths = Concurrent::Map.new
- min_threads, max_threads = if threads
- [threads.min, threads.max]
- else
- Threading.default_threads_value
- end
-
- @thread_pool = Concurrent::ThreadPoolExecutor.new(
- min_threads: min_threads,
- max_threads: max_threads)
+ @thread_pool = Concurrent::FixedThreadPool.new(threads)
end
def shutdown
@thread_pool.shutdown
end
@@ -199,19 +188,21 @@
# too late.
# I cannot think of an easy way to solve this. There is some time
# between the moment the requests performs the publish and the
# subscribe actions. To mitigate the problem we can publish several
# times during some ms. We will see if this is good enough.
- # Trade-off: publishing too much increases the Redis load. Waiting too
- # much makes the incoming request slow.
+ # Trade-off: Waiting long times between publishing attempts reduces the
+ # probability of triggering the problem described. However, it also makes
+ # incoming requests slow because tasks accumulate in the thread pool
+ # waiting.
publish_failures = 0
- TIMES_TO_PUBLISH.times do |t|
+ PUBLISH_WAIT_TIMES.each do |wait_time|
begin
publish_auth(combination, authorization)
rescue
publish_failures += 1
end
- sleep((1.0/50)*((t+1)**2))
+ sleep(wait_time)
end
if publish_failures > 0
logger.warn('There was an error while publishing a response in the '\
"priority channel. Combination: #{combination}".freeze)