Sha256: f55fa53d244028eae95f927522c15e0475660855e55972c31a38d0c8b2033e02

Contents?: true

Size: 1.38 KB

Versions: 1

Compression:

Stored size: 1.38 KB

Contents

module Daemonic
  # Repeatedly calls +worker.produce(pool)+ on a dedicated thread until #stop
  # is called (directly, or through an INT/TERM signal), then stops the pool.
  class Producer

    attr_reader :worker, :concurrency, :options, :queue_size

    # @param worker [#produce] object whose +produce(pool)+ is invoked in a loop
    # @param options [Hash] settings read by this class:
    #   +:concurrency+ (defaults to 4),
    #   +:queue_size+ (defaults to concurrency + 1),
    #   +:logger+ (a ready-made logger), and, when no logger is supplied,
    #   +:log+ (log destination, defaults to STDOUT) and
    #   +:log_level+ (defaults to Logger::INFO).
    def initialize(worker, options)
      @worker      = worker
      @options     = options
      @concurrency = options.fetch(:concurrency) { 4 }
      @queue_size  = options.fetch(:queue_size) { @concurrency + 1 }
      @logger      = options[:logger]
      @running     = true
    end

    # Registers the shutdown logging hook and the INT/TERM handlers, builds a
    # Pool for this producer and blocks until the producing thread finishes.
    #
    # @return [Thread] the finished producer thread (result of Thread#join)
    def run
      logger.info "Starting producer with #{concurrency} consumer threads."

      log_shutdown_at_exit
      trap_stop_signals

      pool = Pool.new(self)

      producer = Thread.new do
        while @running
          worker.produce(pool)
          # Give other threads a chance to run between produce calls.
          Thread.pass
        end
        logger.info { "Producer has been shut down. Stopping the thread pool" }
        pool.stop
      end

      # NOTE(review): if worker.produce raises, the thread dies before
      # pool.stop is reached and join re-raises here -- confirm whether the
      # pool should also be stopped on that path.
      producer.join
    end

    # Asks the produce loop to finish after the current +produce+ call.
    # Only flips a flag, so it is cheap enough to call from a signal handler.
    def stop
      @running = false
    end

    # Returns the logger passed in via +options[:logger]+, or lazily builds
    # (and memoizes) a default one. A +:logger+ key that is present but nil
    # is treated the same as a missing key, so this never returns nil.
    #
    # @return [Logger, Object] the configured logger
    def logger
      @logger ||= @options[:logger] || build_default_logger
    end

    private

    # Logs (and prints to stdout) why the process is exiting.
    def log_shutdown_at_exit
      at_exit do
        # $! is the exception terminating the process, or nil on a clean exit.
        last_error = $!
        if last_error
          # backtrace is nil for an exception that was never raised.
          trace = Array(last_error.backtrace).join("\n")
          msg = "Shutting down: #{last_error.inspect}\n#{trace}"
          logger.fatal msg
          $stdout.puts msg
        else
          logger.fatal "Shutting down"
          $stdout.puts "Shutting down"
        end
      end
    end

    # Makes INT and TERM request a graceful stop instead of killing the process.
    def trap_stop_signals
      %w[INT TERM].each do |signal|
        Signal.trap(signal) { stop }
      end
    end

    # Builds the fallback logger from the +:log+ and +:log_level+ options.
    def build_default_logger
      Logger.new(@options[:log] || STDOUT).tap do |default_logger|
        default_logger.level = @options[:log_level] || Logger::INFO
      end
    end

  end
end

Version data entries

1 entry across 1 version & 1 rubygem

Version Path
daemonic-0.1.3 lib/daemonic/producer.rb