Sha256: 20e1d16b78c7366f68d27635e26c8b090332e8dcce05f59626cd7b7532255ab7

Contents?: true

Size: 1.43 KB

Versions: 1

Compression:

Stored size: 1.43 KB

Contents

require "singleton"
require "ostruct"

module Callisto

  class Pool

    include Singleton

    attr_accessor :queue, :running, :pending, :workers

    class << self

      def settings=(options)
        defaults = {
          :max_workers => 10,
          :identifier  => proc { |entry| entry.object_id },
          :callback    => proc { |entry| entry.call }
        }
        @settings = OpenStruct.new(defaults.merge(options))
      end

      def settings
        @settings
      end

    end

    def initialize
      self.pending, self.running, self.workers = [], [], []
      self.queue = Queue.new
      1.upto(self.class.settings.max_workers) do
        worker = Thread.new do
          loop do
            task = self.queue.pop
            self.pending.delete self.class.settings.identifier.call(task)
            self.running << self.class.settings.identifier.call(task)
            self.class.settings.callback.call(task)
            self.running.delete self.class.settings.identifier.call(task)
          end
        end
        self.workers << worker
      end
    end

    def wait(id = nil)
      sleep(0.1) while (id ? processes.include?(id) : processes.any?)
    end

    def <<(task)
      identifier = self.class.settings.identifier.call(task)
      if !processes.include?(identifier)
        self.pending << identifier
        self.queue << task
      end
      identifier
    end

    def processes
      pending + running
    end

  end

end

Version data entries

1 entries across 1 versions & 1 rubygems

Version Path
callisto-0.9.1 lib/callisto/pool.rb