lib/datapipes.rb in datapipes-0.1.4 vs lib/datapipes.rb in datapipes-0.1.5

- old
+ new

@@ -10,17 +10,27 @@ class Datapipes # Pass datapipes components instances. # Each component can be composed. See detail in examples. # - # tube and pipe are optional. - # If not given tube, a default tube which takes no effect is used. - def initialize(source, sink, tube: Tube.new, pipe: Pipe.new) - @source = source - @tube = tube - @sink = sink - @pipe = pipe + # Pass parameters as a hash: + # + # datapipe = Datapipes.new(source: my_source, sink: my_sink) + # + # Or pass all: + # + # datapipe = Datapipes.new( + # source: my_source, + # sink: my_sink, + # tube: my_tube, + # pipe: my_pipe + # ) + # + # All arguments are optional. But in most case, you specify + # source and sink. + def initialize(args = {}) + @source, @sink, @tube, @pipe = *ArgParser.extract(args) end # Run sources, data flow via pipe, tubes and sinks work. # Everything work with just call this method. # @@ -28,35 +38,45 @@ # this method returns. def run_resource @source.pipe = @pipe runners = @source.run_all - consumer = run_comsumer + sink = run_sink runners.each(&:join) notify_resource_ending - consumer.join + sink.join end private - def run_comsumer + def run_sink Thread.new do loop do - data = @pipe.pull + data = @pipe.pour_out break if resource_ended?(data) @sink.run_all(@tube.run(data)) end end end def notify_resource_ending - @pipe.recieve Notification.new + @pipe.pour_in Notification.new end def resource_ended?(data) data.is_a? Notification end Notification = Class.new + + module ArgParser + def self.extract(args) + source = args[:source] || Source.new + sink = args[:sink] || Sink.new + tube = args[:tube] || Tube.new + pipe = args[:pipe] || Pipe.new + [source, sink, tube, pipe] + end + end end