#--
#            DO WHAT THE FUCK YOU WANT TO PUBLIC LICENSE
#                    Version 2, December 2004
#
#            DO WHAT THE FUCK YOU WANT TO PUBLIC LICENSE
#   TERMS AND CONDITIONS FOR COPYING, DISTRIBUTION AND MODIFICATION
#
#  0. You just DO WHAT THE FUCK YOU WANT TO.
#++

require 'thread'

# A pipe lets you execute various tasks on a set of data in parallel: each
# datum inserted in the pipe is passed along through queues to the various
# functions composing the pipe, and the final result is inserted in the
# output queue.
class Thread::Pipe
	# A single stage of the pipe: a worker thread that repeatedly takes a value
	# from its input queue, passes it to the function and puts the result in
	# its output queue.
	class Task
		attr_accessor :input, :output

		# Start a worker thread that applies +func+ to every value taken from
		# +input+ and puts each result in +output+.
		#
		# The queues must respond to #enq and #deq, and block on #deq.
		def initialize (func, input = Queue.new, output = Queue.new)
			@input  = input
			@output = output

			@thread = Thread.new {
				loop do
					begin
						value  = @input.deq
						result = func.call(value)

						# @output is looked up only now, after the function returned,
						# because Pipe#| reassigns it when another task is appended.
						@output.enq result
					rescue Exception
						# Deliberate best effort: whatever goes wrong, the datum being
						# handled is dropped and the worker goes on with the next one.
					end
				end
			}
		end

		# Terminate the worker thread.
		#
		# Thread#kill is used instead of Thread#raise because the worker loop
		# rescues every exception: a raised one would be swallowed and the
		# thread would keep running.
		def kill
			@thread.kill
		end
	end

	# Create a pipe using the optionally passed objects as input and
	# output queue.
	#
	# The objects must respond to #enq and #deq, and block on #deq.
	def initialize (input = Queue.new, output = Queue.new)
		@tasks = []

		@input  = input
		@output = output

		# The finalizer proc is built by a class method so that it closes over
		# the task list only and holds no reference to the pipe itself.
		ObjectSpace.define_finalizer(self, self.class.finalize(@tasks))
	end

	# Build the finalizer: a proc that kills the worker of every task in
	# +tasks+.
	def self.finalize (tasks)
		proc {
			tasks.each(&:kill)
		}
	end

	# Insert data in the pipe.
	#
	# Returns the pipe, so insertions can be chained. When no task has been
	# added yet the data is discarded and nil is returned.
	def << (data)
		return if @tasks.empty?

		@input.enq data

		self
	end

	# Add a task to the pipe, it must respond to #call and #arity,
	# and #arity must return 1.
	#
	# Raises ArgumentError when the arity is not 1. Returns the pipe, so tasks
	# can be chained.
	def | (func)
		raise ArgumentError, 'wrong arity' if func.arity != 1

		# The first task reads from the pipe's input, every later one from a
		# fresh queue; the newest task always writes to the pipe's output.
		task = Task.new(func, (@tasks.empty? ? @input : Queue.new), @output)

		# Redirect the previous last task into the new one.
		@tasks.last.output = task.input unless @tasks.empty?
		@tasks << task

		self
	end

	# Get an element from the output queue.
	#
	# +non_block+ is handed to the output queue's #deq unchanged.
	def pop (non_block = false)
		@output.deq(non_block)
	end

	alias deq pop
	alias ~   pop
end

class Thread
	class << self
		# Shortcut for starting a pipe: creates a new Pipe and adds +func+
		# to it as its first task.
		def | (func)
			pipe = Pipe.new
			pipe.|(func)
		end
	end
end