Sha256: 9f4692ff514b04f361ed4930f3b4fb87f6346b3c844c1521cb83ec4358a52d1a

Contents?: true

Size: 1.58 KB

Versions: 2

Compression:

Stored size: 1.58 KB

Contents

module Stepladder
  class Worker
    attr_accessor :supplier

    def initialize(p={}, &block)
      @supplier = p[:supplier]
      @filter   = p[:filter] || default_filter
      @task     = block || p[:task]
    end

    def product
      if ready_to_work?
        work.resume
      end
    end

    def ready_to_work?
      @task ||= default_task
      if (task_accepts_a_value? && supplier.nil?)
        raise "This worker's task expects to receive a value from a supplier, but has no supplier."
      end
      true
    end

    def |(subscribing_worker)
      subscribing_worker.supplier = self
      subscribing_worker
    end

    private

    def work
      @my_little_machine ||= Fiber.new do
        loop do
          value = supplier && supplier.product
          if value.nil? || passes_filter?(value)
            handoff @task.call(value)
          end
        end
      end
    end

    def default_task
      if task_method_exists?
        if task_method_accepts_a_value?
          Proc.new { |value| self.task value }
        else
          Proc.new { self.task }
        end
      else # no task method, so assuming we have supplier...
        Proc.new { |value| value }
      end
    end

    def task_accepts_a_value?
      @task.arity > 0
    end

    def task_method_exists?
      self.methods.include? :task
    end

    def task_method_accepts_a_value?
      self.method(:task).arity > 0
    end

    def default_filter
      Proc.new do |value|
        true
      end
    end

    def passes_filter?(value)
      @filter.call value
    end

  end
end

def handoff(product)
  Fiber.yield product
end

Version data entries

2 entries across 2 versions & 1 rubygems

Version Path
stepladder-0.0.2 lib/stepladder/worker.rb
stepladder-0.0.1 lib/stepladder/worker.rb