lib/shifty/dsl.rb in shifty-0.4.2 vs lib/shifty/dsl.rb in shifty-0.5.0
- old
+ new
@@ -17,40 +17,43 @@
handoff nil
end
end
end
- def relay_worker(&block)
+ def relay_worker(options = {}, &block)
+ options[:tags] ||= []
+ options[:tags] << :relay
ensure_regular_arity(block)
- Worker.new(tags: [:relay]) do |value|
+ Worker.new(options) do |value|
value && block.call(value)
end
end
- def side_worker(mode = :normal, &block)
+ def side_worker(options = {}, &block)
+ options[:tags] ||= []
+ options[:tags] << :side_effect
+ mode = options[:mode] || :normal
ensure_regular_arity(block)
- Worker.new(tags: [:side_effect]) do |value|
+ Worker.new(options) do |value|
value.tap do |v|
used_value = (mode == :hardened) ?
Marshal.load(Marshal.dump(v)) : v
v && block.call(used_value)
end
end
end
- def filter_worker(argument = nil, &block)
- if block && argument.respond_to?(:call)
- throw_with "You cannot supply two callables"
- end
- callable = argument.respond_to?(:call) ? argument : block
- ensure_callable(callable)
+ def filter_worker(options = {}, &block)
+ options[:tags] ||= []
+ options[:tags] << :filter
+ ensure_callable!(block)
- Worker.new(tags: [:filter]) do |value, supply|
- while value && !callable.call(value)
+ Worker.new(options) do |value, supply|
+ while value && !block.call(value)
value = supply.shift
end
value
end
end
@@ -60,18 +63,22 @@
value.nil? ||
!!batch_full.call(value, collection)
end
end
- def batch_worker(options = {gathering: 1}, &block)
+ def batch_worker(options = {}, &block)
+ options[:tags] ||= []
+ options[:tags] << :batch
+ options[:gathering] ||= 1
+
ensure_regular_arity(block) if block
batch_full = block ||
proc { |_, batch| batch.size >= options[:gathering] }
- batch_context = BatchContext.new({batch_full: batch_full})
+ options[:context] = BatchContext.new({batch_full: batch_full})
- Worker.new(tags: [:batch], context: batch_context) do |value, supply, context|
+ Worker.new(options) do |value, supply, context|
if value
context.collection = [value]
until context.batch_complete?(
context.collection.last,
context.collection
@@ -81,14 +88,16 @@
context.collection.compact
end
end
end
- def splitter_worker(&block)
+ def splitter_worker(options = {}, &block)
+ options[:tags] ||= []
+ options[:tags] << :splitter
ensure_regular_arity(block)
- Worker.new(tags: [:splitter]) do |value|
+ Worker.new(options) do |value|
if value.nil?
value
else
parts = [block.call(value)].flatten
while parts.size > 1
@@ -97,13 +106,15 @@
parts.shift
end
end
end
+ # don't like that this is a second exception to accepting options..
def trailing_worker(trail_length = 2)
+ options = {tags: [:trailing]}
trail = []
- Worker.new(tags: [:trailing]) do |value, supply|
+ Worker.new(options) do |value, supply|
if value
trail.unshift value
if trail.size >= trail_length
trail.pop
end
@@ -111,11 +122,11 @@
trail.unshift supply.shift
end
trail
else
- value
+ value # hint: it's nil!
end
end
end
def handoff(something)
@@ -126,10 +137,10 @@
def throw_with(*msg)
raise WorkerInitializationError.new([msg].flatten.join(" "))
end
- def ensure_callable(callable)
+ def ensure_callable!(callable)
unless callable&.respond_to?(:call)
throw_with "You must supply a callable"
end
end