Sha256: 9f4692ff514b04f361ed4930f3b4fb87f6346b3c844c1521cb83ec4358a52d1a
Contents?: true
Size: 1.58 KB
Versions: 2
Compression:
Stored size: 1.58 KB
Contents
module Stepladder class Worker attr_accessor :supplier def initialize(p={}, &block) @supplier = p[:supplier] @filter = p[:filter] || default_filter @task = block || p[:task] end def product if ready_to_work? work.resume end end def ready_to_work? @task ||= default_task if (task_accepts_a_value? && supplier.nil?) raise "This worker's task expects to receive a value from a supplier, but has no supplier." end true end def |(subscribing_worker) subscribing_worker.supplier = self subscribing_worker end private def work @my_little_machine ||= Fiber.new do loop do value = supplier && supplier.product if value.nil? || passes_filter?(value) handoff @task.call(value) end end end end def default_task if task_method_exists? if task_method_accepts_a_value? Proc.new { |value| self.task value } else Proc.new { self.task } end else # no task method, so assuming we have supplier... Proc.new { |value| value } end end def task_accepts_a_value? @task.arity > 0 end def task_method_exists? self.methods.include? :task end def task_method_accepts_a_value? self.method(:task).arity > 0 end def default_filter Proc.new do |value| true end end def passes_filter?(value) @filter.call value end end end def handoff(product) Fiber.yield product end
Version data entries
2 entries across 2 versions & 1 rubygems
Version | Path |
---|---|
stepladder-0.0.2 | lib/stepladder/worker.rb |
stepladder-0.0.1 | lib/stepladder/worker.rb |