lib/redstream/delayer.rb in redstream-0.4.0 vs lib/redstream/delayer.rb in redstream-0.4.1

- old
+ new

@@ -46,11 +46,11 @@ # loops/blocks forever. def run_once @consumer.run_once do |messages| messages.each do |message| - seconds_to_sleep = message.message_id.to_f / 1_000 + @delay.to_f - Time.now.to_f + seconds_to_sleep = (message.message_id.to_f / 1_000) + @delay.to_f - Time.now.to_f if seconds_to_sleep > 0 if @batch.size > 0 id = @batch.last.message_id @@ -83,14 +83,14 @@ @logger.debug "Delayed #{@batch.size} messages for #{@delay.to_f} seconds on stream #{@stream_name}" Redstream.connection_pool.with do |redis| redis.pipelined do @batch.each do |message| - redis.xadd Redstream.stream_key_name(@stream_name), payload: message.fields["payload"] + redis.xadd(Redstream.stream_key_name(@stream_name), { payload: message.fields["payload"] }) end end - redis.xdel Redstream.stream_key_name("#{@stream_name}.delay"), @batch.map(&:message_id) + redis.xdel(Redstream.stream_key_name("#{@stream_name}.delay"), @batch.map(&:message_id)) end @batch = [] end end