Sha256: 3730da2a62df02c84e5a2819a481d717a7f3b7d25d8e91ced6fc0d676d3b3172

Contents?: true

Size: 1.01 KB

Versions: 2

Compression:

Stored size: 1.01 KB

Contents

require "glass_octopus/unit_of_work"

module GlassOctopus
  # @api private
  class Consumer
    attr_reader :connection, :processor, :executor, :logger

    def initialize(connection, processor, executor, logger)
      @connection = connection
      @processor  = processor
      @executor   = executor
      @logger     = logger
    end

    def run
      connection.fetch_message do |message|
        work = UnitOfWork.new(message, processor, logger)
        submit(work)
      end
    end

    def shutdown(timeout=10)
      connection.close
      executor.shutdown
      logger.info("Waiting for workers to terminate...")
      executor.wait_for_termination(timeout)
    end

    def submit(work)
      if executor.post(work) { |work| work.perform }
        logger.debug { "Accepted message: #{work.message.to_h}" }
      else
        logger.warn { "Rejected message: #{work.message.to_h}" }
      end
    rescue Concurrent::RejectedExecutionError
      logger.warn { "Rejected message: #{work.message.to_h}" }
    end
  end
end

Version data entries

2 entries across 2 versions & 1 rubygems

Version Path
glass_octopus-1.1.0 lib/glass_octopus/consumer.rb
glass_octopus-1.0.0 lib/glass_octopus/consumer.rb