Sha256: 1177edd4068aebd035750293f17de9bf5b6bb18404c8e8a24327772830829f9c

Contents?: true

Size: 1.71 KB

Versions: 3

Compression:

Stored size: 1.71 KB

Contents

# Stolen from RubyTapas by Avdi Grimm, episode 145.
module Daemonic
  # A fixed-size pool of worker threads fed through a bounded job queue.
  #
  # The pool is driven by a +producer+ object, which must respond to
  # +queue_size+, +concurrency+, +worker+, +logger+ and +stop+. Every job
  # pushed with #enqueue (or #<<) is handed to +producer.worker.consume+ on
  # one of the pool's threads.
  class Pool

    # Sentinel pushed onto the job queue to tell a worker thread to exit.
    class StopSignal

      def inspect
        "[STOP SIGNAL]"
      end
      alias_method :to_s, :inspect

    end

    # Compared by identity (+equal?+) in #dispatch, so a single shared,
    # immutable instance is all that is needed.
    STOP_SIGNAL = StopSignal.new.freeze

    attr_reader :producer

    # Builds the job queue and immediately starts the worker threads.
    #
    # @param producer [#queue_size, #concurrency, #worker, #logger, #stop]
    #   supplies the queue capacity, the number of threads to spawn, the
    #   object that consumes jobs and the logger used by the pool.
    def initialize(producer)
      @producer = producer
      @stopped  = false
      @jobs     = SizedQueue.new(producer.queue_size)
      @threads  = producer.concurrency.times.map do |worker_num|
        Thread.new { dispatch(worker_num) }
      end
    end

    # Adds a job to the queue. Because the queue is a SizedQueue, this call
    # blocks while the queue is full.
    #
    # @param job [Object] anything the producer's worker can consume
    def enqueue(job)
      logger.debug { "Enqueueing #{job.inspect}" }
      @jobs.push(job)
    end
    alias_method :<<, :enqueue

    # Asks every worker thread to finish and waits for all of them.
    #
    # One STOP_SIGNAL is queued per thread, behind any jobs already waiting,
    # so pending jobs are still consumed before the threads exit.
    #
    # Repeated calls are no-ops: once the workers are gone nothing drains the
    # queue, so pushing another round of stop signals would block forever as
    # soon as the bounded queue filled up.
    #
    # @return [Array<Thread>] the pool's (now finished) worker threads
    def stop
      # NOTE(review): the flag is not guarded by a lock; concurrent first
      # calls to #stop from different threads are not protected against.
      return @threads if @stopped
      @stopped = true

      @threads.size.times { enqueue(STOP_SIGNAL) }
      @threads.each(&:join)
    end

    private

    # Main loop of one worker thread: pops jobs until a STOP_SIGNAL arrives.
    #
    # @param worker_num [Integer] index of the thread, used only in log output
    def dispatch(worker_num)
      logger.debug { "T#{worker_num}: Starting" }
      loop do
        job = @jobs.pop
        if STOP_SIGNAL.equal?(job)
          logger.debug { "T#{worker_num}: Received stop signal, terminating." }
          break
        end
        process(worker_num, job)
      end
      logger.debug { "T#{worker_num}: Stopped" }
    end

    # Hands a single job to the worker, logging (rather than propagating) any
    # failure so that the thread stays alive for the next job.
    def process(worker_num, job)
      logger.debug { "T#{worker_num}: Consuming #{job.inspect}" }
      worker.consume(job)
    rescue Exception => error
      # Deliberately broader than StandardError: SystemExit must be seen here
      # so the producer can be told to shut down, and no other failure in a
      # job should be able to take the worker thread down with it.
      if error.is_a?(SystemExit) # allow app to exit
        logger.warn { "T#{worker_num}: Received SystemExit, shutting down" }
        producer.stop
      else
        logger.warn { "T#{worker_num}: #{error.class} while processing #{job}: #{error}" }
        logger.info { error.backtrace.join("\n") }
      end
    ensure
      # Yield to other threads after every job, whether it succeeded or not.
      Thread.pass
    end

    def worker
      producer.worker
    end

    def logger
      producer.logger
    end

  end
end

Version data entries

3 entries across 3 versions & 1 rubygems

Version Path
daemonic-0.1.3 lib/daemonic/pool.rb
daemonic-0.1.2 lib/daemonic/pool.rb
daemonic-0.1.1 lib/daemonic/pool.rb