Sha256: b9e511825f4290ab934fa6f66f6b84119120d3207cd96f06baa9addb00c7ea63

Contents?: true

Size: 1.97 KB

Versions: 3

Compression:

Stored size: 1.97 KB

Contents

require "logger"
require "concurrent"
require "net/http"
require "aws-sdk"

module Sqspoller
  # Polls one SQS queue from a configurable number of background threads and
  # hands every received message to a task delegator.
  class QueueController
    # @return [Array<Thread>] the polling threads created by {#start_thread}
    attr_reader :threads

    # Builds the SQS client and resolves the queue URL for +queue_name+.
    #
    # @param queue_name [String] name of the queue to poll
    # @param polling_threads_count [Integer] number of threads {#start} spawns
    # @param task_delegator [#process] receives
    #   +(controller, message, queue_name)+ for every message
    # @param access_key_id [String] credential passed to the SQS client
    # @param secret_access_key [String] credential passed to the SQS client
    # @param region [String] region passed to the SQS client
    # @param logger_file [String, IO] log destination handed to +Logger.new+
    def initialize(queue_name, polling_threads_count, task_delegator, access_key_id, secret_access_key, region, logger_file)
      @logger = Logger.new(logger_file)
      @queue_name = queue_name
      @polling_threads_count = polling_threads_count
      @sqs = Aws::SQS::Client.new(access_key_id: access_key_id, secret_access_key: secret_access_key, region: region)
      @queue_details = @sqs.get_queue_url(queue_name: queue_name)
      @threads = []
      @task_delegator = task_delegator
    end

    # Spawns +polling_threads_count+ polling threads for the queue.
    #
    # @return [Integer] the number of threads requested
    def start
      queue_url = @queue_details.queue_url
      @logger.info "Going to start polling threads for queue: #{queue_url}"
      @polling_threads_count.times { start_thread(queue_url) }
    end

    # Starts one thread that polls +queue_url+ until the thread is terminated.
    #
    # @param queue_url [String] URL of the queue to poll
    # @return [Array<Thread>] the thread list, including the new thread
    def start_thread(queue_url)
      @threads << Thread.new do
        @logger.info "Poller thread started for queue: #{queue_url}"
        loop { poll_once(queue_url) }
      end
    end

    # Deletes a message from the queue this controller was created for.
    #
    # @param receipt_handle [String] receipt handle of the message to delete
    # @return [Object] whatever the SQS client returns for the delete call
    def delete_message(receipt_handle)
      @sqs.delete_message(
        queue_url: @queue_details.queue_url,
        receipt_handle: receipt_handle
      )
    end

    private

    # Performs one receive call and dispatches every message it returned.
    def poll_once(queue_url)
      @logger.info "Polling queue #{@queue_name} for messages"
      response = receive_messages(queue_url)
      return unless response

      response.messages.each { |message| dispatch(message, queue_url) }
    end

    # Fetches messages; logs and returns nil on failure so the loop goes on.
    # Only StandardError is rescued so that process-level exceptions such as
    # SystemExit are not swallowed by the polling loop.
    def receive_messages(queue_url)
      @sqs.receive_message(queue_url: queue_url)
    rescue StandardError => e
      @logger.error "Error receiving messages from queue #{@queue_name}: #{e.message}"
      nil
    end

    # Hands one message to the task delegator. A failure is logged and does
    # not stop the remaining messages from being processed.
    def dispatch(message, queue_url)
      @logger.info "Received message #{@queue_name} : #{message.message_id}"
      @task_delegator.process(self, message, @queue_name)
    rescue StandardError => e
      @logger.error "Encountered error #{e.message} while submitting message from queue #{queue_url}"
    end
  end
end

Version data entries

3 entries across 3 versions & 1 rubygems

Version Path
fourkites-sqspoller-0.1.12.28 lib/sqspoller/queue_controller.rb
fourkites-sqspoller-0.1.12.27 lib/sqspoller/queue_controller.rb
fourkites-sqspoller-0.1.12.26 lib/sqspoller/queue_controller.rb