Sha256: c435eb8cd989fe92d5d2dad73226829b47f80a5e2117ab7234a2c6454ef1043d

Contents?: true

Size: 913 Bytes

Versions: 4

Compression:

Stored size: 913 Bytes

Contents

# frozen_string_literal: true

require 'aws-sdk-sqs'
require 'liam/common'
require 'liam/message_processor'

module Liam
  # Polls an SQS queue and hands every received message to
  # MessageProcessor.process.
  #
  # `env_credentials` and `client_options` are not defined in this class;
  # presumably they are provided by the included Common module (verify in
  # liam/common).
  class Consumer
    include Common

    # Batch size used when the options hash carries neither 'max_message'
    # nor :max_message.
    DEFAULT_MAX_MESSAGES = 10

    # @param options [Hash] consumer settings; the only key read by this class
    #   is 'max_message' / :max_message (see #poller_options)
    def initialize(options: {})
      @options = options
    end

    # Builds a consumer from the given arguments and starts polling.
    #
    # Keyword arguments are forwarded explicitly: with a bare `*args` splat,
    # Ruby 3 turns `options: {...}` into a positional Hash, and `new` then
    # raises ArgumentError because #initialize only accepts keywords.
    #
    # @param args [Array] positional arguments forwarded to .new
    # @param kwargs [Hash] keyword arguments forwarded to .new (e.g. options:)
    # @return [Object] whatever the poller's #poll call returns
    def self.message(*args, **kwargs)
      new(*args, **kwargs).send(:execute)
    end

    private

    attr_reader :options

    # Runs the poller and processes every yielded batch.
    def execute
      poller.poll(poller_options) do |received|
        # QueuePoller yields a single message (not an Array) when
        # max_number_of_messages is 1. Array() is deliberately avoided here:
        # a message is a Struct, and Array(struct) would explode it into its
        # field values.
        messages = received.is_a?(Array) ? received : [received]

        puts "[aws-liam] Received #{messages.size} messages"
        puts messages.join("\n")
        messages.each { |message| MessageProcessor.process(message) }
      end
    end

    # @return [Aws::SQS::QueuePoller] a new poller bound to #sqs_queue, using
    #   a client built from client_options
    def poller
      Aws::SQS::QueuePoller.new(sqs_queue, client: Aws::SQS::Client.new(client_options))
    end

    # @return [Object, nil] the value stored under aws -> sqs -> queue in
    #   env_credentials (passed to QueuePoller as the queue identifier)
    def sqs_queue
      env_credentials.dig('aws', 'sqs', 'queue')
    end

    # @return [Hash] options passed to QueuePoller#poll; the String key wins
    #   over the Symbol key, falling back to DEFAULT_MAX_MESSAGES
    def poller_options
      { max_number_of_messages: options['max_message'] || options[:max_message] || DEFAULT_MAX_MESSAGES }
    end
  end
end

Version data entries

4 entries across 4 versions & 1 rubygem

Version Path
aws-liam-0.0.6 lib/liam/consumer.rb
aws-liam-0.0.5 lib/liam/consumer.rb
aws-liam-0.0.4 lib/liam/consumer.rb
aws-liam-0.0.3 lib/liam/consumer.rb