lib/message_bus/backends/redis.rb in message_bus-2.0.0.beta.4 vs lib/message_bus/backends/redis.rb in message_bus-2.0.0.beta.5

- old
+ new

@@ -8,13 +8,11 @@ # module MessageBus::Redis; end class MessageBus::Redis::ReliablePubSub attr_reader :subscribed - attr_accessor :max_publish_retries, :max_publish_wait, :max_backlog_size, - :max_global_backlog_size, :max_in_memory_publish_backlog, - :max_backlog_age + attr_accessor :max_backlog_size, :max_global_backlog_size, :max_in_memory_publish_backlog, :max_backlog_age UNSUB_MESSAGE = "$$UNSUBSCRIBE" class NoMoreRetries < StandardError; end class BackLogOutOfOrder < StandardError @@ -31,12 +29,10 @@ unless @redis_config[:enable_redis_logger] @redis_config[:logger] = nil end @max_backlog_size = max_backlog_size @max_global_backlog_size = 2000 - @max_publish_retries = 10 - @max_publish_wait = 500 #ms @max_in_memory_publish_backlog = 1000 @in_memory_backlog = [] @lock = Mutex.new @flush_backlog_thread = nil # after 7 days inactive backlogs will be removed @@ -149,15 +145,25 @@ raise end end def ensure_backlog_flushed - while true + flushed = false + + while !flushed try_again = false + if is_readonly? + sleep 1 + next + end + @lock.synchronize do - break if @in_memory_backlog.length == 0 + if @in_memory_backlog.length == 0 + flushed = true + break + end begin publish(*@in_memory_backlog[0],false) rescue Redis::CommandError => e if e.message =~ /^READONLY/ @@ -169,17 +175,10 @@ MessageBus.logger.warn("Dropping undeliverable message #{e}") end @in_memory_backlog.delete_at(0) unless try_again end - - if try_again - sleep 0.005 - # in case we are not connected to the correct server - # which can happen when sharing ips - pub_redis.client.reconnect - end end ensure @lock.synchronize do @flush_backlog_thread = nil end @@ -337,9 +336,25 @@ end rescue => error MessageBus.logger.warn "#{error} subscribe failed, reconnecting in 1 second. Call stack #{error.backtrace}" sleep 1 retry + end + end + + private + + def is_readonly? + key = "__mb_is_readonly".freeze + + begin + # in case we are not connected to the correct server + # which can happen when sharing ips + pub_redis.client.reconnect + pub_redis.client.call([:set, key, '1']) + false + rescue Redis::CommandError => e + return true if e.message =~ /^READONLY/ end end MessageBus::BACKENDS[:redis] = self end