lib/datapipes.rb in datapipes-0.1.4 vs lib/datapipes.rb in datapipes-0.1.5
- old
+ new
@@ -10,17 +10,27 @@
class Datapipes
# Pass datapipes components instances.
# Each component can be composed. See detail in examples.
#
- # tube and pipe are optional.
- # If not given tube, a default tube which takes no effect is used.
- def initialize(source, sink, tube: Tube.new, pipe: Pipe.new)
- @source = source
- @tube = tube
- @sink = sink
- @pipe = pipe
+ # Pass parameters as a hash:
+ #
+ # datapipe = Datapipes.new(source: my_source, sink: my_sink)
+ #
+ # Or pass all:
+ #
+ # datapipe = Datapipes.new(
+ # source: my_source,
+ # sink: my_sink,
+ # tube: my_tube,
+ # pipe: my_pipe
+ # )
+ #
+ # All arguments are optional. But in most case, you specify
+ # source and sink.
+ def initialize(args = {})
+ @source, @sink, @tube, @pipe = *ArgParser.extract(args)
end
# Run sources, data flow via pipe, tubes and sinks work.
# Everything work with just call this method.
#
@@ -28,35 +38,45 @@
# this method returns.
def run_resource
@source.pipe = @pipe
runners = @source.run_all
- consumer = run_comsumer
+ sink = run_sink
runners.each(&:join)
notify_resource_ending
- consumer.join
+ sink.join
end
private
- def run_comsumer
+ def run_sink
Thread.new do
loop do
- data = @pipe.pull
+ data = @pipe.pour_out
break if resource_ended?(data)
@sink.run_all(@tube.run(data))
end
end
end
def notify_resource_ending
- @pipe.recieve Notification.new
+ @pipe.pour_in Notification.new
end
def resource_ended?(data)
data.is_a? Notification
end
Notification = Class.new
+
+ module ArgParser
+ def self.extract(args)
+ source = args[:source] || Source.new
+ sink = args[:sink] || Sink.new
+ tube = args[:tube] || Tube.new
+ pipe = args[:pipe] || Pipe.new
+ [source, sink, tube, pipe]
+ end
+ end
end