lib/xcflushd/priority_auth_renewer.rb in xcflushd-1.1.0 vs lib/xcflushd/priority_auth_renewer.rb in xcflushd-1.2.0
- old
+ new
@@ -147,11 +147,11 @@
# success is false, we could have renewed some auths, so this could
# be more fine grained and ping the subscribers that are not interested
# in the auths that failed. Also, as we do not publish anything when
# there is an error, the subscriber waits until it times out.
# This is good enough for now, but there is room for improvement.
- publish_auth_repeatedly(combination, metric_auth) if success
+ publish_auth(combination, metric_auth) if success
end
end
def auth_channel_msg_2_combination(msg)
StorageKeys.pubsub_auth_msg_2_auth_info(msg)
@@ -171,54 +171,38 @@
StorageKeys.pubsub_auths_resp_channel(combination[:service_id],
combination[:credentials],
combination[:metric])
end
- def publish_auth_repeatedly(combination, authorization)
- # There is a race condition here. A renew and publish task is only run
- # when there is not another one renewing the same combination. When there
- # is another, the incoming request does not trigger a new task, but waits
- # for the publish below. The request could miss the published message
- # if events happened in this order:
- # 1) The request publishes the combination it needs in the requests
- # channel.
- # 2) A new task is not executed, because there is another renewing
- # the same combination.
- # 3) That task publishes the result.
- # 4) The request subscribes to receive the result, but now it is
- # too late.
- # I cannot think of an easy way to solve this. There is some time
- # between the moment the request performs the publish and the
- # subscribe actions. To mitigate the problem we can publish several
- # times during some ms. We will see if this is good enough.
- # Trade-off: Waiting long times between publishing attempts reduces the
- # probability of triggering the problem described. However, it also makes
- # incoming requests slow because tasks accumulate in the thread pool
- # waiting.
- publish_failures = 0
- PUBLISH_WAIT_TIMES.each do |wait_time|
+ def authorization_message(authorization)
+ if authorization.authorized?
+ '1'.freeze
+ else
+ authorization.reason ? "0:#{authorization.reason}" : '0'.freeze
+ end
+ end
+
+ def publish_message(channel, msg)
+ wait = PUBLISH_WAIT_TIMES.each
+
+ begin
+ redis_pub.publish(channel, msg)
+ rescue => e
begin
- publish_auth(combination, authorization)
- rescue
- publish_failures += 1
+ sleep wait.next
+ rescue StopIteration
+ raise e
end
- sleep(wait_time)
+ retry
end
-
- if publish_failures > 0
- logger.warn('There was an error while publishing a response in the '\
- "priority channel. Combination: #{combination}".freeze)
- end
end
def publish_auth(combination, authorization)
- msg = if authorization.authorized?
- '1'.freeze
- else
- authorization.reason ? "0:#{authorization.reason}" : '0'.freeze
- end
-
- redis_pub.publish(channel_for_combination(combination), msg)
+ publish_message channel_for_combination(combination),
+ authorization_message(authorization)
+ rescue => e
+ logger.warn "cannot publish in priority channel " \
+ "for combination #{combination}: #{e}"
end
def currently_authorizing?(channel_msg)
# A simple solution would be something like:
# if !current_auths[channel_msg]