lib/insque.rb in insque-0.7.2 vs lib/insque.rb in insque-0.7.3
- old
+ new
@@ -75,14 +75,14 @@
when :slow
keys = [@slow_inbox]
else
keys = recipient.is_a?(Array) ? recipient : [recipient]
end
- value = { message: "#{@sender}_#{message}", params: params, broadcasted_at: Time.now.utc }
- logger.debug event: :sending, message: value, to: keys
+ value = { message: "#{@sender}_#{message}", params: params, broadcasted_at: Time.now.utc }.to_json
+ logger.debug event: :sending, message: value, to: keys.to_json
@redis.multi do |r|
- keys.each {|k| r.lpush k, value.to_json}
+ keys.each {|k| r.lpush k, value}
end
end
def self.listen worker_name='', redis=nil
redis ||= create_redis_connection
@@ -106,12 +106,12 @@
logger.info event: :starting, worker_name: worker_name, inbox: inbox
loop do
redis.setex(pointer, inbox_ttl, inbox) if pointer
message = redis.brpoplpush(inbox, processing, 0)
begin
+ logger.debug event: :receiving, message: message, inbox: inbox
parsed_message = JSON.parse message
- logger.debug event: :receiving, message: parsed_message, inbox: inbox
send(parsed_message['message'], parsed_message)
rescue NoMethodError
rescue => e
logger.error e
ensure
@@ -129,21 +129,21 @@
delete = []
redis.lrange(processing, 0, -1).each do |m|
begin
parsed_message = JSON.parse(m)
if parsed_message['restarted_at'] && Time.now.to_i - Time.parse(parsed_message['restarted_at']).to_i > processing_ttl
- errors << parsed_message
+ errors << m
delete << m
elsif Time.now.to_i - Time.parse(parsed_message['broadcasted_at']).to_i > processing_ttl
- restart << parsed_message.merge(restarted_at: Time.now.utc)
+ restart << parsed_message.merge(restarted_at: Time.now.utc).to_json
delete << m
end
rescue => e
logger.error e
end
end
result = redis.multi do |r|
- restart.each {|m| r.lpush inbox, m.to_json }
+ restart.each {|m| r.lpush inbox, m }
delete.each {|m| r.lrem processing, 0, m }
end
if result
errors.each {|m| logger.debug event: :deleting, message: m }
restart.each {|m| logger.debug event: :restarting, message: m }