lib/message_bus/backends/redis.rb in message_bus-2.0.0.beta.4 vs lib/message_bus/backends/redis.rb in message_bus-2.0.0.beta.5
- old
+ new
@@ -8,13 +8,11 @@
#
module MessageBus::Redis; end
class MessageBus::Redis::ReliablePubSub
attr_reader :subscribed
- attr_accessor :max_publish_retries, :max_publish_wait, :max_backlog_size,
- :max_global_backlog_size, :max_in_memory_publish_backlog,
- :max_backlog_age
+ attr_accessor :max_backlog_size, :max_global_backlog_size, :max_in_memory_publish_backlog, :max_backlog_age
UNSUB_MESSAGE = "$$UNSUBSCRIBE"
class NoMoreRetries < StandardError; end
class BackLogOutOfOrder < StandardError
@@ -31,12 +29,10 @@
unless @redis_config[:enable_redis_logger]
@redis_config[:logger] = nil
end
@max_backlog_size = max_backlog_size
@max_global_backlog_size = 2000
- @max_publish_retries = 10
- @max_publish_wait = 500 #ms
@max_in_memory_publish_backlog = 1000
@in_memory_backlog = []
@lock = Mutex.new
@flush_backlog_thread = nil
# after 7 days inactive backlogs will be removed
@@ -149,15 +145,25 @@
raise
end
end
def ensure_backlog_flushed
- while true
+ flushed = false
+
+ while !flushed
try_again = false
+ if is_readonly?
+ sleep 1
+ next
+ end
+
@lock.synchronize do
- break if @in_memory_backlog.length == 0
+ if @in_memory_backlog.length == 0
+ flushed = true
+ break
+ end
begin
publish(*@in_memory_backlog[0],false)
rescue Redis::CommandError => e
if e.message =~ /^READONLY/
@@ -169,17 +175,10 @@
MessageBus.logger.warn("Dropping undeliverable message #{e}")
end
@in_memory_backlog.delete_at(0) unless try_again
end
-
- if try_again
- sleep 0.005
- # in case we are not connected to the correct server
- # which can happen when sharing ips
- pub_redis.client.reconnect
- end
end
ensure
@lock.synchronize do
@flush_backlog_thread = nil
end
@@ -337,9 +336,25 @@
end
rescue => error
MessageBus.logger.warn "#{error} subscribe failed, reconnecting in 1 second. Call stack #{error.backtrace}"
sleep 1
retry
+ end
+ end
+
+ private
+
+ def is_readonly?
+ key = "__mb_is_readonly".freeze
+
+ begin
+ # in case we are not connected to the correct server
+ # which can happen when sharing ips
+ pub_redis.client.reconnect
+ pub_redis.client.call([:set, key, '1'])
+ false
+ rescue Redis::CommandError => e
+ return true if e.message =~ /^READONLY/
end
end
MessageBus::BACKENDS[:redis] = self
end