Sha256: f55fa53d244028eae95f927522c15e0475660855e55972c31a38d0c8b2033e02
Contents?: true
Size: 1.38 KB
Versions: 1
Compression:
Stored size: 1.38 KB
Contents
module Daemonic
  # Drives a worker's +produce+ method in a loop on a dedicated thread,
  # handing it a Pool built from this producer, until #stop is called
  # (directly or via an INT/TERM signal).
  class Producer
    attr_reader :worker, :concurrency, :options, :queue_size

    # @param worker [#produce] object whose +produce(pool)+ is called repeatedly by #run
    # @param options [Hash] configuration; keys read by this class:
    #   :concurrency (default 4), :queue_size (default concurrency + 1),
    #   :logger, :log (log destination, default STDOUT), :log_level (default Logger::INFO)
    def initialize(worker, options)
      @worker      = worker
      @options     = options
      @concurrency = options.fetch(:concurrency) { 4 }
      @queue_size  = options.fetch(:queue_size) { @concurrency + 1 }
      @logger      = options[:logger]
      @running     = true
    end

    # Registers the exit hook and signal handlers, then runs the produce loop
    # on its own thread and blocks until that thread finishes.
    #
    # NOTE(review): if +worker.produce+ raises, the thread dies before
    # +pool.stop+ is reached and the error resurfaces from +join+. Confirm
    # whether the pool should also be stopped on that path.
    def run
      logger.info "Starting producer with #{concurrency} consumer threads."

      at_exit { report_shutdown($!) }

      # The handlers only flip a flag; the loop below notices it on its next pass.
      Signal.trap("INT") { stop }
      Signal.trap("TERM") { stop }

      pool = Pool.new(self)

      producer = Thread.new do
        while @running
          worker.produce(pool)
          Thread.pass
        end
        logger.info { "Producer has been shut down. Stopping the thread pool" }
        pool.stop
      end

      producer.join
    end

    # Asks the produce loop to finish after its current iteration.
    def stop
      @running = false
    end

    # Returns the logger passed in the options, or lazily builds and memoizes
    # a default one. A +nil+ :logger option is treated the same as a missing one.
    #
    # @return [Logger]
    def logger
      @logger ||= @options[:logger] || build_default_logger
    end

    private

    # Builds a Logger writing to the :log option (or STDOUT) at the :log_level
    # option (or Logger::INFO).
    def build_default_logger
      Logger.new(@options[:log] || STDOUT).tap do |default_logger|
        default_logger.level = @options[:log_level] || Logger::INFO
      end
    end

    # Writes the shutdown message to both the logger and $stdout, including
    # the pending exception (if any) that is terminating the process.
    def report_shutdown(last_error)
      if last_error
        # Array() keeps this safe when the exception carries no backtrace.
        msg = "Shutting down: #{last_error.inspect}\n#{Array(last_error.backtrace).join("\n")}"
        logger.fatal msg
        $stdout.puts msg
      else
        logger.fatal "Shutting down"
        $stdout.puts "Shutting down"
      end
    end
  end
end
Version data entries
1 entries across 1 versions & 1 rubygems
Version | Path |
---|---|
daemonic-0.1.3 | lib/daemonic/producer.rb |