lib/shifty/dsl.rb in shifty-0.4.2 vs lib/shifty/dsl.rb in shifty-0.5.0

- old
+ new

@@ -17,40 +17,43 @@ handoff nil end end end - def relay_worker(&block) + def relay_worker(options = {}, &block) + options[:tags] ||= [] + options[:tags] << :relay ensure_regular_arity(block) - Worker.new(tags: [:relay]) do |value| + Worker.new(options) do |value| value && block.call(value) end end - def side_worker(mode = :normal, &block) + def side_worker(options = {}, &block) + options[:tags] ||= [] + options[:tags] << :side_effect + mode = options[:mode] || :normal ensure_regular_arity(block) - Worker.new(tags: [:side_effect]) do |value| + Worker.new(options) do |value| value.tap do |v| used_value = (mode == :hardened) ? Marshal.load(Marshal.dump(v)) : v v && block.call(used_value) end end end - def filter_worker(argument = nil, &block) - if block && argument.respond_to?(:call) - throw_with "You cannot supply two callables" - end - callable = argument.respond_to?(:call) ? argument : block - ensure_callable(callable) + def filter_worker(options = {}, &block) + options[:tags] ||= [] + options[:tags] << :filter + ensure_callable!(block) - Worker.new(tags: [:filter]) do |value, supply| - while value && !callable.call(value) + Worker.new(options) do |value, supply| + while value && !block.call(value) value = supply.shift end value end end @@ -60,18 +63,22 @@ value.nil? || !!batch_full.call(value, collection) end end - def batch_worker(options = {gathering: 1}, &block) + def batch_worker(options = {}, &block) + options[:tags] ||= [] + options[:tags] << :batch + options[:gathering] ||= 1 + ensure_regular_arity(block) if block batch_full = block || proc { |_, batch| batch.size >= options[:gathering] } - batch_context = BatchContext.new({batch_full: batch_full}) + options[:context] = BatchContext.new({batch_full: batch_full}) - Worker.new(tags: [:batch], context: batch_context) do |value, supply, context| + Worker.new(options) do |value, supply, context| if value context.collection = [value] until context.batch_complete?( context.collection.last, context.collection @@ -81,14 +88,16 @@ context.collection.compact end end end - def splitter_worker(&block) + def splitter_worker(options = {}, &block) + options[:tags] ||= [] + options[:tags] << :splitter ensure_regular_arity(block) - Worker.new(tags: [:splitter]) do |value| + Worker.new(options) do |value| if value.nil? value else parts = [block.call(value)].flatten while parts.size > 1 @@ -97,13 +106,15 @@ parts.shift end end end + # don't like that this is a second exception to accepting options.. def trailing_worker(trail_length = 2) + options = {tags: [:trailing]} trail = [] - Worker.new(tags: [:trailing]) do |value, supply| + Worker.new(options) do |value, supply| if value trail.unshift value if trail.size >= trail_length trail.pop end @@ -111,11 +122,11 @@ trail.unshift supply.shift end trail else - value + value # hint: it's nil! end end end def handoff(something) @@ -126,10 +137,10 @@ def throw_with(*msg) raise WorkerInitializationError.new([msg].flatten.join(" ")) end - def ensure_callable(callable) + def ensure_callable!(callable) unless callable&.respond_to?(:call) throw_with "You must supply a callable" end end