Sha256: 7e7776e4f8928bcc87fedc23512df5197f93dba579451048ea3c8b79966187fd
Contents?: true
Size: 1.66 KB
Versions: 4
Compression:
Stored size: 1.66 KB
Contents
require "ostruct"
require "shifty/taggable"

module Shifty
  # One stage in a chain of workers. A worker wraps a task (a block or any
  # callable passed as +:task+) and, each time #shift is called, pulls a
  # value from its supplier (if it has one) and hands back the result.
  #
  # The task runs inside a Fiber that is created lazily on the first #shift
  # and then resumed on every subsequent call.
  class Worker
    attr_reader :supply, :tags
    include Shifty::Taggable

    # @param p [Hash] options; the keys read are :supply, :task, :context,
    #   :criteria and :tags
    # @param block [Proc] the task; when given it takes precedence over
    #   p[:task]
    def initialize(p = {}, &block)
      # Assigned directly, so the #supply= source check is not applied here.
      @supply = p[:supply]
      @task = block || p[:task]
      @context = p[:context] || OpenStruct.new
      # These writers are not defined in this class; presumably they are
      # provided by Shifty::Taggable -- confirm there.
      self.criteria = p[:criteria]
      self.tags = p[:tags]
    end

    # Produces the next value from this worker.
    #
    # @return [Object] whatever the workflow fiber yields
    # @raise [RuntimeError] when the task takes a value but no supply is set
    def shift
      ensure_ready_to_work!
      workflow.resume
    end

    # Truthy when a task is present and either a supply is set or the task
    # does not take a value. Returns the falsy task itself when there is none.
    def ready_to_work?
      return @task unless @task

      supply || !task_accepts_a_value?
    end

    # Makes this worker the supplier of +subscribing_worker+.
    #
    # @param subscribing_worker [#supply=] the downstream worker
    # @return [Object] +subscribing_worker+, so calls can be chained
    def supplies(subscribing_worker)
      subscribing_worker.tap { |downstream| downstream.supply = self }
    end
    alias_method :|, :supplies

    # Sets the upstream supplier.
    #
    # @param supplier [#shift] the worker to pull values from
    # @raise [WorkerError] when this worker is not suppliable
    def supply=(supplier)
      unless suppliable?
        raise WorkerError, "Worker is a source, and cannot accept a supply"
      end

      @supply = supplier
    end

    # Truthy when a task is present and its arity is greater than zero.
    def suppliable?
      @task && @task.arity.positive?
    end

    private

    # Falls back to the pass-through task when none was given, then raises
    # if the worker still cannot run.
    def ensure_ready_to_work!
      @task ||= default_task
      return if ready_to_work?

      raise "This worker's task expects to receive a value from a supplier, but has no supply."
    end

    # The memoized Fiber that drives the worker. Each pass pulls a value from
    # the supply (nil when there is no supply) and yields either the task's
    # result or, when the criteria do not pass, the value unchanged.
    def workflow
      @my_little_machine ||= Fiber.new do
        loop do
          incoming = supply&.shift
          # criteria_passes? is not defined in this class; presumably it
          # comes from Shifty::Taggable -- confirm there.
          outgoing = criteria_passes? ? @task.call(incoming, supply, @context) : incoming
          Fiber.yield outgoing
        end
      end
    end

    # Pass-through task used when the worker was built without one.
    def default_task
      proc { |input| input }
    end

    # True when the task's arity is greater than zero.
    def task_accepts_a_value?
      @task.arity.positive?
    end

    # True when this object lists a method named :task.
    def task_method_exists?
      methods.include?(:task)
    end

    # True when the :task method's arity is greater than zero.
    def task_method_accepts_a_value?
      method(:task).arity.positive?
    end
  end

  # Raised when a worker is asked to accept a supply it cannot take.
  class WorkerError < StandardError
  end
end
Version data entries
4 entries across 4 versions & 1 rubygems
Version | Path |
---|---|
shifty-0.5.0 | lib/shifty/worker.rb |
shifty-0.4.2 | lib/shifty/worker.rb |
shifty-0.4.1 | lib/shifty/worker.rb |
shifty-0.4.0 | lib/shifty/worker.rb |