lib/thread/pipe.rb in thread-0.0.6 vs lib/thread/pipe.rb in thread-0.0.6.1
- old
+ new
@@ -12,29 +12,41 @@
# A pipe lets you execute various tasks on a set of data in parallel,
# each datum inserted in the pipe is passed along through queues to the various
# functions composing the pipe, the final result is inserted in the final queue.
class Thread::Pipe
+ # A task incapsulates a part of the pipe.
class Task
attr_accessor :input, :output
+ # Create a Task which will call the passed function and get input
+ # from the optional parameter and put output in the optional parameter.
def initialize (func, input = Queue.new, output = Queue.new)
- @input = input
- @output = output
+ @input = input
+ @output = output
+ @handling = false
@thread = Thread.new {
while true
+ value = @input.deq
+
+ @handling = true
begin
- value = @input.deq
value = func.call(value)
-
@output.enq value
rescue Exception; end
+ @handling = false
end
}
end
+ # Check if the task has nothing to do.
+ def empty?
+ !@handling && @input.empty? && @output.empty?
+ end
+
+ # Stop the task.
def kill
@thread.raise
end
end
@@ -49,25 +61,17 @@
@output = output
ObjectSpace.define_finalizer(self, self.class.finalize(@tasks))
end
+ # @private
def self.finalize (tasks)
proc {
tasks.each(&:kill)
}
end
- # Insert data in the pipe.
- def << (data)
- return if @tasks.empty?
-
- @input.enq data
-
- self
- end
-
# Add a task to the pipe, it must respond to #call and #arity,
# and #arity must return 1.
def | (func)
if func.arity != 1
raise ArgumentError, 'wrong arity'
@@ -79,16 +83,33 @@
}
self
end
+ # Check if the pipe is empty.
+ def empty?
+ @input.empty? && @output.empty? && @tasks.all?(&:empty?)
+ end
+
+ # Insert data in the pipe.
+ def enq (data)
+ return if @tasks.empty?
+
+ @input.enq data
+
+ self
+ end
+
+ alias push enq
+ alias << enq
+
# Get an element from the output queue.
- def pop (non_block = false)
+ def deq (non_block = false)
@output.deq(non_block)
end
- alias deq pop
- alias ~ pop
+ alias pop deq
+ alias ~ deq
end
class Thread
# Helper to create a pipe.
def self.| (func)