Sha256: 6bef24a95393aa28233900956ef6000561b28f888cd931a77587d9cf4955bb8c

Contents?: true

Size: 1.9 KB

Versions: 1

Compression:

Stored size: 1.9 KB

Contents

module Stepladder
  class Worker
    attr_accessor :supplier

    def initialize(p={}, &block)
      @supplier = p[:supplier]
      @filter   = p[:filter] || default_filter
      @task     = block || p[:task]
      # don't define default task here
      # because we want to allow for
      # an initialized worker to have
      # a task injected, including
      # method-based tasks.
    end

    def product
      ensure_ready_to_work!
      workflow.resume
    end

    def ready_to_work?
      @task && (supplier || !task_accepts_a_value?)
    end

    def |(subscribing_worker)
      subscribing_worker.supplier = self
      subscribing_worker
    end

    private

    def ensure_ready_to_work!
      @task ||= default_task
      # at this point we will ensure a task exists
      # because we know that the worker is being
      # asked for product

      unless ready_to_work?
        raise "This worker's task expects to receive a value from a supplier, but has no supplier."
      end
    end

    def workflow
      @my_little_machine ||= Fiber.new do
        loop do
          value = supplier && supplier.product
          if value.nil? || passes_filter?(value)
            Fiber.yield @task.call(value)
          end
        end
      end
    end

    def default_task
      if task_method_exists?
        if task_method_accepts_a_value?
          Proc.new { |value| self.task value }
        else
          Proc.new { self.task }
        end
      else # no task method, so assuming we have supplier...
        Proc.new { |value| value }
      end
    end

    def task_accepts_a_value?
      @task.arity > 0
    end

    def task_method_exists?
      self.methods.include? :task
    end

    def task_method_accepts_a_value?
      self.method(:task).arity > 0
    end

    def default_filter
      Proc.new do |value|
        true
      end
    end

    def passes_filter?(value)
      @filter.call value
    end

  end
end

Version data entries

1 entries across 1 versions & 1 rubygems

Version Path
stepladder-0.2.0 lib/stepladder/worker.rb