Sha256: 3730da2a62df02c84e5a2819a481d717a7f3b7d25d8e91ced6fc0d676d3b3172
Contents?: true
Size: 1.01 KB
Versions: 2
Compression:
Stored size: 1.01 KB
Contents
require "glass_octopus/unit_of_work"

module GlassOctopus
  # Pulls messages from a connection, wraps each one in a UnitOfWork and hands
  # it to an executor for processing.
  #
  # @api private
  class Consumer
    attr_reader :connection, :processor, :executor, :logger

    # @param connection [#fetch_message, #close] source of incoming messages
    # @param processor [Object] forwarded untouched to every UnitOfWork
    # @param executor [#post, #shutdown, #wait_for_termination] runs the work
    # @param logger [#debug, #info, #warn] destination for log output
    def initialize(connection, processor, executor, logger)
      @connection = connection
      @processor  = processor
      @executor   = executor
      @logger     = logger
    end

    # Asks the connection for messages and submits one unit of work per
    # message yielded.
    #
    # @return [Object] whatever +connection.fetch_message+ returns
    def run
      connection.fetch_message do |message|
        work = UnitOfWork.new(message, processor, logger)
        submit(work)
      end
    end

    # Closes the connection, tells the executor to shut down and then waits
    # for it to terminate.
    #
    # @param timeout [Numeric] passed to +executor.wait_for_termination+
    #   (presumably seconds -- confirm against the executor in use)
    # @return [Object] the result of +executor.wait_for_termination+
    def shutdown(timeout = 10)
      connection.close
      executor.shutdown
      logger.info("Waiting for workers to terminate...")
      executor.wait_for_termination(timeout)
    end

    # Posts a unit of work to the executor and logs whether it was accepted.
    #
    # A falsy return value from +executor.post+ and a raised
    # +Concurrent::RejectedExecutionError+ are both treated as a rejection.
    #
    # @param work [#perform, #message] the unit of work to run
    # @return [Object] the result of the logger call that was made
    def submit(work)
      # The block parameter is named differently from +work+ on purpose so it
      # does not shadow the method argument used by the log calls below.
      if executor.post(work) { |unit| unit.perform }
        logger.debug { "Accepted message: #{work.message.to_h}" }
      else
        log_rejected(work)
      end
    rescue Concurrent::RejectedExecutionError
      log_rejected(work)
    end

    private

    # Emits the single warning used for every rejection path.
    def log_rejected(work)
      logger.warn { "Rejected message: #{work.message.to_h}" }
    end
  end
end
Version data entries
2 entries across 2 versions & 1 rubygems
Version | Path |
---|---|
glass_octopus-1.1.0 | lib/glass_octopus/consumer.rb |
glass_octopus-1.0.0 | lib/glass_octopus/consumer.rb |