lib/message_bus/backends/redis.rb in message_bus-4.3.7 vs lib/message_bus/backends/redis.rb in message_bus-4.3.8

- old
+ new

@@ -1,9 +1,9 @@ # frozen_string_literal: true -require 'redis' -require 'digest' +require "redis" +require "digest" module MessageBus module Backends # The Redis backend stores published messages in Redis sorted sets (using # ZADD, where the score is the message ID), one for each channel (where @@ -47,49 +47,43 @@ # @param [Integer] max_backlog_size the largest permitted size (number of messages) for per-channel backlogs; beyond this capacity, old messages will be dropped. def initialize(redis_config = {}, max_backlog_size = 1000) @redis_config = redis_config.dup @clear_every = redis_config.delete(:clear_every) || 1 @logger = @redis_config[:logger] - unless @redis_config[:enable_redis_logger] - @redis_config[:logger] = nil - end + @redis_config[:logger] = nil unless @redis_config[:enable_redis_logger] @max_backlog_size = max_backlog_size @max_global_backlog_size = 2000 @max_in_memory_publish_backlog = 1000 @in_memory_backlog = [] @lock = Mutex.new @flush_backlog_thread = nil @pub_redis = nil @subscribed = false # after 7 days inactive backlogs will be removed - @max_backlog_age = 604800 + @max_backlog_age = 604_800 end # Reconnects to Redis; used after a process fork, typically triggered by a forking webserver # @see Base#after_fork def after_fork @pub_redis&.disconnect! end # (see Base#reset!) def reset! - pub_redis.keys("__mb_*").each do |k| - pub_redis.del k - end + pub_redis.keys("__mb_*").each { |k| pub_redis.del k } end # (see Base#destroy) def destroy @pub_redis&.disconnect! end # Deletes all backlogs and their data. Does not delete ID pointers, so new publications will get IDs that continue from the last publication before the expiry. Use with extreme caution. # @see Base#expire_all_backlogs! def expire_all_backlogs! - pub_redis.keys("__mb_*backlog_n").each do |k| - pub_redis.del k - end + pub_redis.keys("__mb_*backlog_n").each { |k| pub_redis.del k } end # Note, the script takes care of all expiry of keys, however # we do not expire the global backlog key cause we have no simple way to determine what it should be on publish # we do not provide a mechanism to set a global max backlog age, only a per-channel which we can override on publish @@ -155,27 +149,29 @@ msg.encode_without_ids, max_backlog_age, max_backlog_size, max_global_backlog_size, channel, - clear_every + clear_every, ], keys: [ global_id_key, backlog_id_key, backlog_key, global_backlog_key, - redis_channel_name - ] + redis_channel_name, + ], ) rescue ::Redis::CommandError => e if queue_in_memory && e.message =~ /READONLY/ @lock.synchronize do @in_memory_backlog << [channel, data] if @in_memory_backlog.length > @max_in_memory_publish_backlog @in_memory_backlog.delete_at(0) - @logger.warn("Dropping old message cause max_in_memory_publish_backlog is full: #{e.message}\n#{e.backtrace.join('\n')}") + @logger.warn( + "Dropping old message cause max_in_memory_publish_backlog is full: #{e.message}\n#{e.backtrace.join('\n')}", + ) end end if @flush_backlog_thread == nil @lock.synchronize do @@ -207,13 +203,11 @@ def backlog(channel, last_id = 0) redis = pub_redis backlog_key = backlog_key(channel) items = redis.zrangebyscore backlog_key, last_id.to_i + 1, "+inf" - items.map do |i| - MessageBus::Message.decode(i) - end + items.map { |i| MessageBus::Message.decode(i) } end # (see Base#global_backlog) def global_backlog(last_id = 0) items = pub_redis.zrangebyscore global_backlog_key, last_id.to_i + 1, "+inf" @@ -252,17 +246,13 @@ if last_id # we need to translate this to a global id, at least give it a shot # we are subscribing on global and global is always going to be bigger than local # so worst case is a replay of a few messages message = get_message(channel, last_id) - if message - last_id = message.global_id - end + last_id = message.global_id if message end - global_subscribe(last_id) do |m| - yield m if m.channel == channel - end + global_subscribe(last_id) { |m| yield m if m.channel == channel } end # (see Base#global_unsubscribe) def global_unsubscribe begin @@ -278,40 +268,35 @@ def global_subscribe(last_id = nil, &blk) raise ArgumentError unless block_given? highest_id = last_id - clear_backlog = lambda do - retries = 4 - begin - highest_id = process_global_backlog(highest_id, retries > 0, &blk) - rescue BackLogOutOfOrder => e - highest_id = e.highest_id - retries -= 1 - sleep(rand(50) / 1000.0) - retry + clear_backlog = + lambda do + retries = 4 + begin + highest_id = process_global_backlog(highest_id, retries > 0, &blk) + rescue BackLogOutOfOrder => e + highest_id = e.highest_id + retries -= 1 + sleep(rand(50) / 1000.0) + retry + end end - end begin global_redis = new_redis_connection - if highest_id - clear_backlog.call(&blk) - end + clear_backlog.call(&blk) if highest_id global_redis.subscribe(redis_channel_name) do |on| on.subscribe do - if highest_id - clear_backlog.call(&blk) - end + clear_backlog.call(&blk) if highest_id @subscribed = true end - on.unsubscribe do - @subscribed = false - end + on.unsubscribe { @subscribed = false } on.message do |_c, m| if m == UNSUB_MESSAGE @subscribed = false global_redis.unsubscribe @@ -344,33 +329,34 @@ end private def new_redis_connection - config = @redis_config.filter do |k, v| - # This is not ideal, required for Redis gem version 5 - # redis-client no longer accepts arbitrary params - # anything unknown will error out. - # https://github.com/redis-rb/redis-client/blob/4c8e05acfb3477c1651138a4924616e79e6116f2/lib/redis_client/config.rb#L21-L39 - # - # - # We should be doing the opposite and allowlisting params - # or splitting the object up. Starting with the smallest change that is backwards compatible - ![ - :backend, - :logger, - :long_polling_enabled, - :long_polling_interval, - :backend_options, - :base_route, - :client_message_filters, - :site_id_lookup, - :group_id_lookup, - :user_id_lookup, - :transport_codec - ].include?(k) - end + config = + @redis_config.filter do |k, v| + # This is not ideal, required for Redis gem version 5 + # redis-client no longer accepts arbitrary params + # anything unknown will error out. + # https://github.com/redis-rb/redis-client/blob/4c8e05acfb3477c1651138a4924616e79e6116f2/lib/redis_client/config.rb#L21-L39 + # + # + # We should be doing the opposite and allowlisting params + # or splitting the object up. Starting with the smallest change that is backwards compatible + !%i[ + backend + logger + long_polling_enabled + long_polling_interval + backend_options + base_route + client_message_filters + site_id_lookup + group_ids_lookup + user_id_lookup + transport_codec + ].include?(k) + end ::Redis.new(config) end # redis connection used for publishing messages def pub_redis @@ -397,13 +383,11 @@ def global_backlog_key "__mb_global_backlog_n" end def process_global_backlog(highest_id, raise_error) - if highest_id > pub_redis.get(global_id_key).to_i - highest_id = 0 - end + highest_id = 0 if highest_id > pub_redis.get(global_id_key).to_i global_backlog(highest_id).each do |old| if highest_id + 1 == old.global_id yield old highest_id = old.global_id @@ -442,23 +426,25 @@ publish(*@in_memory_backlog[0], queue_in_memory: false) rescue ::Redis::CommandError => e if e.message =~ /^READONLY/ try_again = true else - @logger.warn("Dropping undeliverable message: #{e.message}\n#{e.backtrace.join('\n')}") + @logger.warn( + "Dropping undeliverable message: #{e.message}\n#{e.backtrace.join('\n')}", + ) end rescue => e - @logger.warn("Dropping undeliverable message: #{e.message}\n#{e.backtrace.join('\n')}") + @logger.warn( + "Dropping undeliverable message: #{e.message}\n#{e.backtrace.join('\n')}", + ) end @in_memory_backlog.delete_at(0) unless try_again end end ensure - @lock.synchronize do - @flush_backlog_thread = nil - end + @lock.synchronize { @flush_backlog_thread = nil } end def cached_eval(redis, script, script_sha1, params) begin redis.evalsha script_sha1, params @@ -477,13 +463,13 @@ begin # disconnect to force a reconnect when attempting to set the key # in case we are not connected to the correct server # which can happen when sharing ips pub_redis.disconnect! - pub_redis.set(key, '1') + pub_redis.set(key, "1") false rescue ::Redis::CommandError => e - return true if e.message =~ /^READONLY/ + true if e.message =~ /^READONLY/ end end MessageBus::BACKENDS[:redis] = self end