lib/xcflushd/priority_auth_renewer.rb in xcflushd-1.1.0 vs lib/xcflushd/priority_auth_renewer.rb in xcflushd-1.2.0

- old
+ new

@@ -147,11 +147,11 @@ # success is false, we could have renewed some auths, so this could # be more fine grained and ping the subscribers that are not interested # in the auths that failed. Also, as we do not publish anything when # there is an error, the subscriber waits until it timeouts. # This is good enough for now, but there is room for improvement. - publish_auth_repeatedly(combination, metric_auth) if success + publish_auth(combination, metric_auth) if success end end def auth_channel_msg_2_combination(msg) StorageKeys.pubsub_auth_msg_2_auth_info(msg) @@ -171,54 +171,38 @@ StorageKeys.pubsub_auths_resp_channel(combination[:service_id], combination[:credentials], combination[:metric]) end - def publish_auth_repeatedly(combination, authorization) - # There is a race condition here. A renew and publish task is only run - # when there is not another one renewing the same combination. When there - # is another, the incoming request does not trigger a new task, but waits - # for the publish below. The request could miss the published message - # if events happened in this order: - # 1) The request publishes the combination it needs in the requests - # channel. - # 2) A new task is not executed, because there is another renewing - # the same combination. - # 3) That task publishes the result. - # 4) The request subscribes to receive the result, but now it is - # too late. - # I cannot think of an easy way to solve this. There is some time - # between the moment the requests performs the publish and the - # subscribe actions. To mitigate the problem we can publish several - # times during some ms. We will see if this is good enough. - # Trade-off: Waiting long times between publishing attempts reduces the - # probability of triggering the problem described. However, it also makes - # incoming requests slow because tasks accumulate in the thread pool - # waiting. - publish_failures = 0 - PUBLISH_WAIT_TIMES.each do |wait_time| + def authorization_message(authorization) + if authorization.authorized? + '1'.freeze + else + authorization.reason ? "0:#{authorization.reason}" : '0'.freeze + end + end + + def publish_message(channel, msg) + wait = PUBLISH_WAIT_TIMES.each + + begin + redis_pub.publish(channel, msg) + rescue => e begin - publish_auth(combination, authorization) - rescue - publish_failures += 1 + sleep wait.next + rescue StopIteration + raise e end - sleep(wait_time) + retry end - - if publish_failures > 0 - logger.warn('There was an error while publishing a response in the '\ - "priority channel. Combination: #{combination}".freeze) - end end def publish_auth(combination, authorization) - msg = if authorization.authorized? - '1'.freeze - else - authorization.reason ? "0:#{authorization.reason}" : '0'.freeze - end - - redis_pub.publish(channel_for_combination(combination), msg) + publish_message channel_for_combination(combination), + authorization_message(authorization) + rescue => e + logger.warn "cannot publish in priority channel " \ + "for combination #{combination}: #{e}" end def currently_authorizing?(channel_msg) # A simple solution would be something like: # if !current_auths[channel_msg]