lib/message_bus/backends/redis.rb in message_bus-4.3.7 vs lib/message_bus/backends/redis.rb in message_bus-4.3.8
- old
+ new
@@ -1,9 +1,9 @@
# frozen_string_literal: true
-require 'redis'
-require 'digest'
+require "redis"
+require "digest"
module MessageBus
module Backends
# The Redis backend stores published messages in Redis sorted sets (using
# ZADD, where the score is the message ID), one for each channel (where
@@ -47,49 +47,43 @@
# @param [Integer] max_backlog_size the largest permitted size (number of messages) for per-channel backlogs; beyond this capacity, old messages will be dropped.
def initialize(redis_config = {}, max_backlog_size = 1000)
@redis_config = redis_config.dup
@clear_every = redis_config.delete(:clear_every) || 1
@logger = @redis_config[:logger]
- unless @redis_config[:enable_redis_logger]
- @redis_config[:logger] = nil
- end
+ @redis_config[:logger] = nil unless @redis_config[:enable_redis_logger]
@max_backlog_size = max_backlog_size
@max_global_backlog_size = 2000
@max_in_memory_publish_backlog = 1000
@in_memory_backlog = []
@lock = Mutex.new
@flush_backlog_thread = nil
@pub_redis = nil
@subscribed = false
# after 7 days inactive backlogs will be removed
- @max_backlog_age = 604800
+ @max_backlog_age = 604_800
end
# Reconnects to Redis; used after a process fork, typically triggered by a forking webserver
# @see Base#after_fork
def after_fork
@pub_redis&.disconnect!
end
# (see Base#reset!)
def reset!
- pub_redis.keys("__mb_*").each do |k|
- pub_redis.del k
- end
+ pub_redis.keys("__mb_*").each { |k| pub_redis.del k }
end
# (see Base#destroy)
def destroy
@pub_redis&.disconnect!
end
# Deletes all backlogs and their data. Does not delete ID pointers, so new publications will get IDs that continue from the last publication before the expiry. Use with extreme caution.
# @see Base#expire_all_backlogs!
def expire_all_backlogs!
- pub_redis.keys("__mb_*backlog_n").each do |k|
- pub_redis.del k
- end
+ pub_redis.keys("__mb_*backlog_n").each { |k| pub_redis.del k }
end
# Note, the script takes care of all expiry of keys, however
# we do not expire the global backlog key cause we have no simple way to determine what it should be on publish
# we do not provide a mechanism to set a global max backlog age, only a per-channel which we can override on publish
@@ -155,27 +149,29 @@
msg.encode_without_ids,
max_backlog_age,
max_backlog_size,
max_global_backlog_size,
channel,
- clear_every
+ clear_every,
],
keys: [
global_id_key,
backlog_id_key,
backlog_key,
global_backlog_key,
- redis_channel_name
- ]
+ redis_channel_name,
+ ],
)
rescue ::Redis::CommandError => e
if queue_in_memory && e.message =~ /READONLY/
@lock.synchronize do
@in_memory_backlog << [channel, data]
if @in_memory_backlog.length > @max_in_memory_publish_backlog
@in_memory_backlog.delete_at(0)
- @logger.warn("Dropping old message cause max_in_memory_publish_backlog is full: #{e.message}\n#{e.backtrace.join('\n')}")
+ @logger.warn(
+ "Dropping old message cause max_in_memory_publish_backlog is full: #{e.message}\n#{e.backtrace.join('\n')}",
+ )
end
end
if @flush_backlog_thread == nil
@lock.synchronize do
@@ -207,13 +203,11 @@
def backlog(channel, last_id = 0)
redis = pub_redis
backlog_key = backlog_key(channel)
items = redis.zrangebyscore backlog_key, last_id.to_i + 1, "+inf"
- items.map do |i|
- MessageBus::Message.decode(i)
- end
+ items.map { |i| MessageBus::Message.decode(i) }
end
# (see Base#global_backlog)
def global_backlog(last_id = 0)
items = pub_redis.zrangebyscore global_backlog_key, last_id.to_i + 1, "+inf"
@@ -252,17 +246,13 @@
if last_id
# we need to translate this to a global id, at least give it a shot
# we are subscribing on global and global is always going to be bigger than local
# so worst case is a replay of a few messages
message = get_message(channel, last_id)
- if message
- last_id = message.global_id
- end
+ last_id = message.global_id if message
end
- global_subscribe(last_id) do |m|
- yield m if m.channel == channel
- end
+ global_subscribe(last_id) { |m| yield m if m.channel == channel }
end
# (see Base#global_unsubscribe)
def global_unsubscribe
begin
@@ -278,40 +268,35 @@
def global_subscribe(last_id = nil, &blk)
raise ArgumentError unless block_given?
highest_id = last_id
- clear_backlog = lambda do
- retries = 4
- begin
- highest_id = process_global_backlog(highest_id, retries > 0, &blk)
- rescue BackLogOutOfOrder => e
- highest_id = e.highest_id
- retries -= 1
- sleep(rand(50) / 1000.0)
- retry
+ clear_backlog =
+ lambda do
+ retries = 4
+ begin
+ highest_id = process_global_backlog(highest_id, retries > 0, &blk)
+ rescue BackLogOutOfOrder => e
+ highest_id = e.highest_id
+ retries -= 1
+ sleep(rand(50) / 1000.0)
+ retry
+ end
end
- end
begin
global_redis = new_redis_connection
- if highest_id
- clear_backlog.call(&blk)
- end
+ clear_backlog.call(&blk) if highest_id
global_redis.subscribe(redis_channel_name) do |on|
on.subscribe do
- if highest_id
- clear_backlog.call(&blk)
- end
+ clear_backlog.call(&blk) if highest_id
@subscribed = true
end
- on.unsubscribe do
- @subscribed = false
- end
+ on.unsubscribe { @subscribed = false }
on.message do |_c, m|
if m == UNSUB_MESSAGE
@subscribed = false
global_redis.unsubscribe
@@ -344,33 +329,34 @@
end
private
def new_redis_connection
- config = @redis_config.filter do |k, v|
- # This is not ideal, required for Redis gem version 5
- # redis-client no longer accepts arbitrary params
- # anything unknown will error out.
- # https://github.com/redis-rb/redis-client/blob/4c8e05acfb3477c1651138a4924616e79e6116f2/lib/redis_client/config.rb#L21-L39
- #
- #
- # We should be doing the opposite and allowlisting params
- # or splitting the object up. Starting with the smallest change that is backwards compatible
- ![
- :backend,
- :logger,
- :long_polling_enabled,
- :long_polling_interval,
- :backend_options,
- :base_route,
- :client_message_filters,
- :site_id_lookup,
- :group_id_lookup,
- :user_id_lookup,
- :transport_codec
- ].include?(k)
- end
+ config =
+ @redis_config.filter do |k, v|
+ # This is not ideal, required for Redis gem version 5
+ # redis-client no longer accepts arbitrary params
+ # anything unknown will error out.
+ # https://github.com/redis-rb/redis-client/blob/4c8e05acfb3477c1651138a4924616e79e6116f2/lib/redis_client/config.rb#L21-L39
+ #
+ #
+ # We should be doing the opposite and allowlisting params
+ # or splitting the object up. Starting with the smallest change that is backwards compatible
+ !%i[
+ backend
+ logger
+ long_polling_enabled
+ long_polling_interval
+ backend_options
+ base_route
+ client_message_filters
+ site_id_lookup
+ group_ids_lookup
+ user_id_lookup
+ transport_codec
+ ].include?(k)
+ end
::Redis.new(config)
end
# redis connection used for publishing messages
def pub_redis
@@ -397,13 +383,11 @@
def global_backlog_key
"__mb_global_backlog_n"
end
def process_global_backlog(highest_id, raise_error)
- if highest_id > pub_redis.get(global_id_key).to_i
- highest_id = 0
- end
+ highest_id = 0 if highest_id > pub_redis.get(global_id_key).to_i
global_backlog(highest_id).each do |old|
if highest_id + 1 == old.global_id
yield old
highest_id = old.global_id
@@ -442,23 +426,25 @@
publish(*@in_memory_backlog[0], queue_in_memory: false)
rescue ::Redis::CommandError => e
if e.message =~ /^READONLY/
try_again = true
else
- @logger.warn("Dropping undeliverable message: #{e.message}\n#{e.backtrace.join('\n')}")
+ @logger.warn(
+ "Dropping undeliverable message: #{e.message}\n#{e.backtrace.join('\n')}",
+ )
end
rescue => e
- @logger.warn("Dropping undeliverable message: #{e.message}\n#{e.backtrace.join('\n')}")
+ @logger.warn(
+ "Dropping undeliverable message: #{e.message}\n#{e.backtrace.join('\n')}",
+ )
end
@in_memory_backlog.delete_at(0) unless try_again
end
end
ensure
- @lock.synchronize do
- @flush_backlog_thread = nil
- end
+ @lock.synchronize { @flush_backlog_thread = nil }
end
def cached_eval(redis, script, script_sha1, params)
begin
redis.evalsha script_sha1, params
@@ -477,13 +463,13 @@
begin
# disconnect to force a reconnect when attempting to set the key
# in case we are not connected to the correct server
# which can happen when sharing ips
pub_redis.disconnect!
- pub_redis.set(key, '1')
+ pub_redis.set(key, "1")
false
rescue ::Redis::CommandError => e
- return true if e.message =~ /^READONLY/
+ true if e.message =~ /^READONLY/
end
end
MessageBus::BACKENDS[:redis] = self
end