Sha256: 5fc120f600b6d48fd455fd82926c57de57cd1186e728dba6b409204a8ded0d85

Contents?: true

Size: 1.22 KB

Versions: 1

Compression:

Stored size: 1.22 KB

Contents

require 'celluloid' unless defined? Celluloid

module Celluloid
  # Actor that hands batches of work ("epics") to a destination object.
  #
  # An epic is anything that responds to +to_a+. Each element of the resulting
  # array is passed to the destination with
  # <tt>@destination.sync(:perform, item)</tt>. For every epic, WORKERS
  # dequeue chains are started through +async.dequeue+; each chain keeps
  # re-scheduling itself until the current epic has no items left.
  #
  # Epics enqueued while another epic is still being processed are parked in a
  # backlog and started, in arrival order, once every chain of the running
  # epic has finished.
  class Coordinator
    include Celluloid

    # Number of dequeue chains started for each epic.
    WORKERS = 3

    # @param destination [#sync] object that receives
    #   <tt>sync(:perform, item)</tt> for every work item
    def initialize(destination)
      @destination = destination
      @backlog = []
      @futures = [] # not read by any method of this class; kept for compatibility
      @current = [] # items of the epic being processed; empty when idle
      @running = false
      @active = 0 # dequeue chains of the current epic that have not finished yet
    end

    # Starts processing +epic+ immediately when idle, otherwise queues it
    # behind the epic that is currently running.
    #
    # @param epic [#to_a] collection of work items
    def enqueue(epic)
      if @running
        @backlog << epic
      else
        start(epic)
      end
    end

    # Processes one work item of the current epic and schedules the next
    # dequeue; when nothing is left, this chain ends.
    #
    # The result of +shift+ doubles as the "is there work" test, so a +nil+ or
    # +false+ item ends the chain just like an empty list does.
    def dequeue
      if (work = @current.shift)
        Celluloid::Internals::Logger.info "sending work: #{work.inspect}"
        result = @destination.sync(:perform, work)
        Celluloid::Internals::Logger.info "got result: #{result.inspect}"
        async.dequeue
      else
        Celluloid::Internals::Logger.info "no more work"
        retire_chain
      end
    end

    private

    # Makes +epic+ the current batch and starts WORKERS dequeue chains for it.
    def start(epic)
      @running = true
      @current = epic.to_a
      @active = WORKERS
      WORKERS.times do
        async.dequeue
      end
    end

    # Records that one dequeue chain has ended. When the last chain of the
    # epic ends, the next backlog entry is started, or the coordinator goes
    # idle so that the next enqueue starts right away.
    def retire_chain
      # The guard keeps the counter sane if dequeue is invoked directly while
      # no epic is running.
      @active -= 1 if @active > 0
      return unless @active.zero?

      if (pending = @backlog.shift)
        start(pending)
      else
        @running = false
      end
    end
  end

  # Endless round-robin cursor over a collection.
  #
  # Wraps the enumerator returned by <tt>ary.each</tt> and, once the end is
  # reached, rewinds and continues from the first element again.
  class Reiterator
    # @param ary [#each] collection whose +each+ (called without a block)
    #   returns an Enumerator
    def initialize(ary)
      @iterator = ary.each
    end

    # Returns the next element, wrapping around to the first element after the
    # last one has been handed out.
    #
    # @return [Object] the next element of the collection
    # @raise [StopIteration] if the collection is empty
    def next
      @iterator.next
    rescue StopIteration
      # Rewind and try exactly once more. An unbounded +retry+ would spin
      # forever on an empty collection, because the very first +next+ after
      # +rewind+ raises StopIteration again.
      @iterator.rewind
      @iterator.next
    end
  end

  # Actor that spreads method calls over a fixed set of child objects.
  #
  # On construction it builds +number+ instances of +klass+. Every method the
  # router does not define itself is forwarded, with its arguments and block,
  # to the next child in rotation via <tt>child.sync(meth, *args, &block)</tt>.
  class Router
    include Celluloid

    # @param number [Integer] how many children to create; with zero children
    #   there is nothing to forward to, so pass a positive number
    # @param klass [Class] class instantiated once per child, without arguments
    def initialize(number, klass)
      @children = number.times.map do
        klass.new
      end
      @iterator = Reiterator.new(@children)
    end

    # Forwards any unknown method to the next child in rotation.
    #
    # @return [Object] whatever the child's +sync+ call returns
    def method_missing(meth, *args, &block)
      @iterator.next.sync(meth, *args, &block)
    end

    # Keeps +respond_to?+ and +Object#method+ consistent with +method_missing+:
    # a method counts as available when the first child responds to it. All
    # children are instances of the same class, so one child is consulted.
    #
    # @return [Boolean]
    def respond_to_missing?(meth, include_private = false)
      # @children is still nil if this is asked before initialize has run.
      child = @children.first if @children
      (child && child.respond_to?(meth)) || super
    end
  end
end

Version data entries

1 entry across 1 version & 1 rubygem

Version Path
celluloid-coordinator-0.0.1.pre0 lib/celluloid/coordinator.rb