lib/insque.rb in insque-0.7.2 vs lib/insque.rb in insque-0.7.3

- old
+ new

@@ -75,14 +75,14 @@ when :slow keys = [@slow_inbox] else keys = recipient.is_a?(Array) ? recipient : [recipient] end - value = { message: "#{@sender}_#{message}", params: params, broadcasted_at: Time.now.utc } - logger.debug event: :sending, message: value, to: keys + value = { message: "#{@sender}_#{message}", params: params, broadcasted_at: Time.now.utc }.to_json + logger.debug event: :sending, message: value, to: keys.to_json @redis.multi do |r| - keys.each {|k| r.lpush k, value.to_json} + keys.each {|k| r.lpush k, value} end end def self.listen worker_name='', redis=nil redis ||= create_redis_connection @@ -106,12 +106,12 @@ logger.info event: :starting, worker_name: worker_name, inbox: inbox loop do redis.setex(pointer, inbox_ttl, inbox) if pointer message = redis.brpoplpush(inbox, processing, 0) begin + logger.debug event: :receiving, message: message, inbox: inbox parsed_message = JSON.parse message - logger.debug event: :receiving, message: parsed_message, inbox: inbox send(parsed_message['message'], parsed_message) rescue NoMethodError rescue => e logger.error e ensure @@ -129,21 +129,21 @@ delete = [] redis.lrange(processing, 0, -1).each do |m| begin parsed_message = JSON.parse(m) if parsed_message['restarted_at'] && Time.now.to_i - Time.parse(parsed_message['restarted_at']).to_i > processing_ttl - errors << parsed_message + errors << m delete << m elsif Time.now.to_i - Time.parse(parsed_message['broadcasted_at']).to_i > processing_ttl - restart << parsed_message.merge(restarted_at: Time.now.utc) + restart << parsed_message.merge(restarted_at: Time.now.utc).to_json delete << m end rescue => e logger.error e end end result = redis.multi do |r| - restart.each {|m| r.lpush inbox, m.to_json } + restart.each {|m| r.lpush inbox, m } delete.each {|m| r.lrem processing, 0, m } end if result errors.each {|m| logger.debug event: :deleting, message: m } restart.each {|m| logger.debug event: :restarting, message: m }