lib/redstream/delayer.rb in redstream-0.4.0 vs lib/redstream/delayer.rb in redstream-0.4.1
- old
+ new
@@ -46,11 +46,11 @@
# loops/blocks forever.
def run_once
@consumer.run_once do |messages|
messages.each do |message|
- seconds_to_sleep = message.message_id.to_f / 1_000 + @delay.to_f - Time.now.to_f
+ seconds_to_sleep = (message.message_id.to_f / 1_000) + @delay.to_f - Time.now.to_f
if seconds_to_sleep > 0
if @batch.size > 0
id = @batch.last.message_id
@@ -83,14 +83,14 @@
@logger.debug "Delayed #{@batch.size} messages for #{@delay.to_f} seconds on stream #{@stream_name}"
Redstream.connection_pool.with do |redis|
redis.pipelined do
@batch.each do |message|
- redis.xadd Redstream.stream_key_name(@stream_name), payload: message.fields["payload"]
+ redis.xadd(Redstream.stream_key_name(@stream_name), { payload: message.fields["payload"] })
end
end
- redis.xdel Redstream.stream_key_name("#{@stream_name}.delay"), @batch.map(&:message_id)
+ redis.xdel(Redstream.stream_key_name("#{@stream_name}.delay"), @batch.map(&:message_id))
end
@batch = []
end
end