Sha256: 369c2b01dcbf5b78a8d4057daf8196a788d4b30e12c958397d36c098a14b6ccc
Contents?: true
Size: 1.51 KB
Versions: 2
Compression:
Stored size: 1.51 KB
Contents
module ActiveHook
  module Server
    # The Queue object processes any messages that are queued into our Redis
    # server. It performs a 'blocking pop' on our message list, waking up at a
    # fixed interval so that a shutdown request is noticed even while the list
    # is empty.
    #
    class Queue
      # Seconds a single blocking pop may wait before returning empty-handed.
      # Without a finite timeout the pop would block indefinitely and the
      # `until @done` check in #start could not run again until another
      # message happened to arrive.
      POP_TIMEOUT = 1

      def initialize
        @done = false
      end

      # Starts our queue process. This will run until instructed to stop.
      #
      # Each popped payload is handed to a MessageRunner. A payload that is not
      # valid JSON is counted as failed and dropped instead of being allowed to
      # raise out of the loop and stop the worker.
      #
      def start
        until @done
          json = retrieve_message
          # nil means the pop timed out with nothing queued; loop to re-check @done.
          next unless json

          begin
            MessageRunner.new(json)
          rescue JSON::ParserError
            record_malformed
          end
        end
      end

      # Shuts down our queue process. The loop in #start exits once the
      # current pop returns (at most POP_TIMEOUT seconds when idle) and any
      # in-flight message has been handled.
      #
      def shutdown
        @done = true
      end

      private

      # Performs a 'blocking pop' on our redis queue list.
      #
      # Returns the popped payload, or nil when nothing arrived before the
      # timeout elapsed.
      #
      def retrieve_message
        json = Server.redis.with do |c|
          c.brpop(Server.config.queue_namespace, timeout: POP_TIMEOUT)
        end
        # The reply's last element is the payload.
        json.last if json
      end

      # Counts an unparseable payload against the queue's failure counter.
      # It is not scheduled for retry: there is no Message to re-queue.
      #
      def record_malformed
        Server.redis.with do |conn|
          conn.incr("#{Server.config.queue_namespace}:failed")
        end
      end
    end

    # Delivers a single queued message and records the outcome in Redis.
    # Constructing a runner immediately performs the delivery.
    #
    class MessageRunner
      # json - String payload popped from the queue; parsed with JSON.parse,
      #        so a malformed payload raises JSON::ParserError.
      #
      def initialize(json)
        @message = Message.new(JSON.parse(json))
        @post = Send.new(message: @message)
        start
      end

      # Runs the send, then updates the success or failure bookkeeping.
      #
      def start
        @post.start
        Server.redis.with do |conn|
          @post.success? ? message_success(conn) : message_failed(conn)
        end
      end

      private

      # Increments the queue's success counter.
      #
      def message_success(conn)
        conn.incr("#{Server.config.queue_namespace}:success")
      end

      # Increments the queue's failure counter and, when the message reports
      # that it may be retried, adds it to the retry sorted set scored by its
      # retry_at value.
      #
      def message_failed(conn)
        conn.incr("#{Server.config.queue_namespace}:failed")
        return unless @message.retry?
        conn.zadd(Server.config.retry_namespace, @message.retry_at, @message.to_json)
      end
    end
  end
end
Version data entries
2 entries across 2 versions & 1 rubygems
Version | Path |
---|---|
activehook-server-0.1.7 | lib/activehook/server/queue.rb |
activehook-server-0.1.6 | lib/activehook/server/queue.rb |