Sha256: 369c2b01dcbf5b78a8d4057daf8196a788d4b30e12c958397d36c098a14b6ccc

Contents?: true

Size: 1.51 KB

Versions: 2

Compression:

Stored size: 1.51 KB

Contents

module ActiveHook
  module Server
    # The Queue object processes any messages that are queued into our Redis server.
    # It will perform a 'blocking pop' on our message list until one is added.
    #
    class Queue
      def initialize
        @done = false
      end

      # Starts our queue process. This will run until instructed to stop.
      #
      def start
        until @done
          json = retrieve_message
          MessageRunner.new(json) if json
        end
      end

      # Shutsdown our queue process.
      #
      def shutdown
        @done = true
      end

      private

      # Performs a 'blocking pop' on our redis queue list.
      #
      def retrieve_message
        json = Server.redis.with do |c|
          c.brpop(Server.config.queue_namespace)
        end
        json.last if json
      end
    end

    class MessageRunner
      def initialize(json)
        @message = Message.new(JSON.parse(json))
        @post = Send.new(message: @message)
        start
      end

      def start
        @post.start
        Server.redis.with do |conn|
          @post.success? ? message_success(conn) : message_failed(conn)
        end
      end

      private

      def message_success(conn)
        conn.incr("#{Server.config.queue_namespace}:success")
      end

      def message_failed(conn)
        conn.incr("#{Server.config.queue_namespace}:failed")
        return unless @message.retry?
        conn.zadd(Server.config.retry_namespace, @message.retry_at, @message.to_json)
      end
    end
  end
end

Version data entries

2 entries across 2 versions & 1 rubygems

Version Path
activehook-server-0.1.7 lib/activehook/server/queue.rb
activehook-server-0.1.6 lib/activehook/server/queue.rb