lib/thread/pipe.rb in thread-0.1.7 vs lib/thread/pipe.rb in thread-0.2.0

- old
+ new

@@ -18,11 +18,11 @@ class Task attr_accessor :input, :output # Create a Task which will call the passed function and get input # from the optional parameter and put output in the optional parameter. - def initialize (func, input = Queue.new, output = Queue.new) + def initialize(func, input = Queue.new, output = Queue.new) @input = input @output = output @handling = false @thread = Thread.new { @@ -52,29 +52,29 @@ # Create a pipe using the optionally passed objects as input and # output queue. # # The objects must respond to #enq and #deq, and block on #deq. - def initialize (input = Queue.new, output = Queue.new) + def initialize(input = Queue.new, output = Queue.new) @tasks = [] @input = input @output = output ObjectSpace.define_finalizer self, self.class.finalizer(@tasks) end # @private - def self.finalizer (tasks) + def self.finalizer(tasks) proc { tasks.each(&:kill) } end # Add a task to the pipe, it must respond to #call and #arity, # and #arity must return 1. - def | (func) + def |(func) if func.arity != 1 raise ArgumentError, 'wrong arity' end Task.new(func, (@tasks.empty? ? @input : Queue.new), @output).tap {|t| @@ -89,11 +89,11 @@ def empty? @input.empty? && @output.empty? && @tasks.all?(&:empty?) end # Insert data in the pipe. - def enq (data) + def enq(data) return if @tasks.empty? @input.enq data self @@ -101,19 +101,19 @@ alias push enq alias << enq # Get an element from the output queue. - def deq (non_block = false) + def deq(non_block = false) @output.deq(non_block) end alias pop deq alias ~ deq end class Thread # Helper to create a pipe. - def self.| (func) + def self.|(func) Pipe.new | func end end