lib/thread/pipe.rb in thread-0.1.7 vs lib/thread/pipe.rb in thread-0.2.0
- old
+ new
@@ -18,11 +18,11 @@
class Task
attr_accessor :input, :output
# Create a Task which will call the passed function and get input
# from the optional parameter and put output in the optional parameter.
- def initialize (func, input = Queue.new, output = Queue.new)
+ def initialize(func, input = Queue.new, output = Queue.new)
@input = input
@output = output
@handling = false
@thread = Thread.new {
@@ -52,29 +52,29 @@
# Create a pipe using the optionally passed objects as input and
# output queue.
#
# The objects must respond to #enq and #deq, and block on #deq.
- def initialize (input = Queue.new, output = Queue.new)
+ def initialize(input = Queue.new, output = Queue.new)
@tasks = []
@input = input
@output = output
ObjectSpace.define_finalizer self, self.class.finalizer(@tasks)
end
# @private
- def self.finalizer (tasks)
+ def self.finalizer(tasks)
proc {
tasks.each(&:kill)
}
end
# Add a task to the pipe, it must respond to #call and #arity,
# and #arity must return 1.
- def | (func)
+ def |(func)
if func.arity != 1
raise ArgumentError, 'wrong arity'
end
Task.new(func, (@tasks.empty? ? @input : Queue.new), @output).tap {|t|
@@ -89,11 +89,11 @@
def empty?
@input.empty? && @output.empty? && @tasks.all?(&:empty?)
end
# Insert data in the pipe.
- def enq (data)
+ def enq(data)
return if @tasks.empty?
@input.enq data
self
@@ -101,19 +101,19 @@
alias push enq
alias << enq
# Get an element from the output queue.
- def deq (non_block = false)
+ def deq(non_block = false)
@output.deq(non_block)
end
alias pop deq
alias ~ deq
end
class Thread
# Helper to create a pipe.
- def self.| (func)
+ def self.|(func)
Pipe.new | func
end
end