Sha256: 7a9be4952d27e659e0fc001ce9cf357260af7c56c200a019a21811ddd235b450

Contents?: true

Size: 1.48 KB

Versions: 4

Compression:

Stored size: 1.48 KB

Contents

module Consumers
  class Base
    # Methods which are related to implement a base consumer.

    # The queue name should be defined here.
    QUEUE_NAME = 'smart_que.default'

    attr_accessor :queue_name

    def queue_name
      @queue_name
    end

    # This method will return the default queue which present
    # in the message queues. Consumer specific queue should be
    # defined and implemented in the consumer sub classes.
    def queue
      @queue ||= channel.queue(queue_name)
    end

    # Establish connection to Message Queues.
    def connection
      ::SmartQue.establish_connection
    end

    # Create channel with the established connection.
    def channel
      @channel ||= connection.create_channel
    end

    def config
      ::SmartQue.config
    end

    # Method which kick start the consumer process thread
    def start
      channel.prefetch(10)
      queue.subscribe(manual_ack: true, exclusive: false) do |delivery_info, metadata, payload|
        begin
          body = JSON.parse(payload).with_indifferent_access
          status = run(body)
        rescue => e
          status = :error
        end

        if status == :ok
          channel.ack(delivery_info.delivery_tag)
        elsif status == :retry
          channel.reject(delivery_info.delivery_tag, true)
        else # :error, nil etc
          channel.reject(delivery_info.delivery_tag, false)
        end
      end

      wait_for_threads
    end

    def wait_for_threads
      sleep
    end
  end
end

Version data entries

4 entries across 4 versions & 1 rubygems

Version Path
smart-que-0.2.3 lib/smart_que/consumers/base.rb
smart-que-0.2.0 lib/smart_que/consumers/base.rb
smart-que-0.1.2 lib/smart_que/consumers/base.rb
smart-que-0.1.1 lib/smart_que/consumers/base.rb