lib/thread/pipe.rb in thread-0.0.6 vs lib/thread/pipe.rb in thread-0.0.6.1

- old
+ new

@@ -12,29 +12,41 @@ # A pipe lets you execute various tasks on a set of data in parallel, # each datum inserted in the pipe is passed along through queues to the various # functions composing the pipe, the final result is inserted in the final queue. class Thread::Pipe + # A task incapsulates a part of the pipe. class Task attr_accessor :input, :output + # Create a Task which will call the passed function and get input + # from the optional parameter and put output in the optional parameter. def initialize (func, input = Queue.new, output = Queue.new) - @input = input - @output = output + @input = input + @output = output + @handling = false @thread = Thread.new { while true + value = @input.deq + + @handling = true begin - value = @input.deq value = func.call(value) - @output.enq value rescue Exception; end + @handling = false end } end + # Check if the task has nothing to do. + def empty? + !@handling && @input.empty? && @output.empty? + end + + # Stop the task. def kill @thread.raise end end @@ -49,25 +61,17 @@ @output = output ObjectSpace.define_finalizer(self, self.class.finalize(@tasks)) end + # @private def self.finalize (tasks) proc { tasks.each(&:kill) } end - # Insert data in the pipe. - def << (data) - return if @tasks.empty? - - @input.enq data - - self - end - # Add a task to the pipe, it must respond to #call and #arity, # and #arity must return 1. def | (func) if func.arity != 1 raise ArgumentError, 'wrong arity' @@ -79,16 +83,33 @@ } self end + # Check if the pipe is empty. + def empty? + @input.empty? && @output.empty? && @tasks.all?(&:empty?) + end + + # Insert data in the pipe. + def enq (data) + return if @tasks.empty? + + @input.enq data + + self + end + + alias push enq + alias << enq + # Get an element from the output queue. - def pop (non_block = false) + def deq (non_block = false) @output.deq(non_block) end - alias deq pop - alias ~ pop + alias pop deq + alias ~ deq end class Thread # Helper to create a pipe. def self.| (func)