class Datapipes # # Build your own source logic in `run` method. # Use `produce` method to emitt data to pipe. # # def run # 10.times {|i| produce(i) } # end # class Source include Composable attr_accessor :pipe def run_all @accumulated ||= [self] set_pipe @accumulated.map {|s| Thread.new { s.run } } end private def produce(data) @pipe.recieve(data) end def set_pipe @accumulated.each {|s| s.pipe = @pipe } end end end