lib/thread.rb in rubysl-thread-2.0.3 vs lib/thread.rb in rubysl-thread-2.1

- old
+ new

@@ -1,341 +1,682 @@ -require "rubysl/thread" +# +# thread.rb - thread support classes +# $Date: 2006-12-31 07:02:22 -0800 (Sun, 31 Dec 2006) $ +# by Yukihiro Matsumoto <matz@netlab.co.jp> +# +# Copyright (C) 2001 Yukihiro Matsumoto +# Copyright (C) 2000 Network Applied Communication Laboratory, Inc. +# Copyright (C) 2000 Information-technology Promotion Agency, Japan +# + +if $DEBUG + Thread.abort_on_exception = true +end + +# +# ConditionVariable objects augment class Mutex. Using condition variables, +# it is possible to suspend while in the middle of a critical section until a +# resource becomes available. +# +# Example: +# +# require 'thread' +# +# mutex = Mutex.new +# resource = ConditionVariable.new +# +# a = Thread.new { +# mutex.synchronize { +# # Thread 'a' now needs the resource +# resource.wait(mutex) +# # 'a' can now have the resource +# } +# } +# +# b = Thread.new { +# mutex.synchronize { +# # Thread 'b' has finished using the resource +# resource.signal +# } +# } +# +class ConditionVariable + # + # Creates a new ConditionVariable + # + def initialize + @waiters = [] + end + + # + # Releases the lock held in +mutex+ and waits; reacquires the lock on wakeup. + # + def wait(mutex, timeout=nil) + Rubinius.lock(self) + + begin + wchan = Rubinius::Channel.new + + begin + mutex.unlock + @waiters.push wchan + Rubinius.unlock(self) + signaled = wchan.receive_timeout timeout + ensure + mutex.lock + Rubinius.lock(self) + + unless signaled or @waiters.delete(wchan) + # we timed out, but got signaled afterwards (e.g. while waiting to + # acquire @lock), so pass that signal on to the next waiter + @waiters.shift << true unless @waiters.empty? + end + end + + if timeout + !!signaled + else + self + end + ensure + Rubinius.unlock(self) + end + end + + # + # Wakes up the first thread in line waiting for this lock. + # + def signal + Rubinius.lock(self) + begin + @waiters.shift << true unless @waiters.empty? + ensure + Rubinius.unlock(self) + end + self + end + + # + # Wakes up all threads waiting for this lock. + # + def broadcast + Rubinius.lock(self) + begin + @waiters.shift << true until @waiters.empty? + ensure + Rubinius.unlock(self) + end + self + end +end + +# +# This class provides a way to synchronize communication between threads. +# +# Example: +# +# require 'thread' +# +# queue = Queue.new +# +# producer = Thread.new do +# 5.times do |i| +# sleep rand(i) # simulate expense +# queue << i +# puts "#{i} produced" +# end +# end +# +# consumer = Thread.new do +# 5.times do |i| +# value = queue.pop +# sleep rand(i/2) # simulate expense +# puts "consumed #{value}" +# end +# end +# +# consumer.join +# +class Queue + # + # Creates a new queue. + # + def initialize + @que = [] + @que.taint # enable tainted comunication + self.taint + @waiting = [] + @waiting.taint + @mutex = Mutex.new + @resource = ConditionVariable.new + end + + # + # Pushes +obj+ to the queue. + # + def push(obj) + @mutex.synchronize do + @que.push obj + @resource.signal + end + end + + # + # Alias of push + # + alias << push + + # + # Alias of push + # + alias enq push + + # + # Retrieves data from the queue. If the queue is empty, the calling thread is + # suspended until data is pushed onto the queue. If +non_block+ is true, the + # thread isn't suspended, and an exception is raised. + # + def pop(non_block=false) + while true + @mutex.synchronize do + #FIXME: some code in net or somewhere violates encapsulation + #and demands that a waiting queue exist for Queue, as a result + #we have to do a linear search here to remove the current Thread. + @waiting.delete(Thread.current) + if @que.empty? + raise ThreadError, "queue empty" if non_block + @waiting.push Thread.current + @resource.wait(@mutex) + else + retval = @que.shift + @resource.signal + return retval + end + end + end + end + + # + # Alias of pop + # + alias shift pop + + # + # Alias of pop + # + alias deq pop + + # + # Returns +true+ if the queue is empty. + # + def empty? + @que.empty? + end + + # + # Removes all objects from the queue. + # + def clear + @que.clear + end + + # + # Returns the length of the queue. + # + def length + @que.length + end + + # + # Alias of length. + # + alias size length + + # + # Returns the number of threads waiting on the queue. + # + def num_waiting + @waiting.size + end +end + +# +# This class represents queues of specified size capacity. The push operation +# may be blocked if the capacity is full. +# +# See Queue for an example of how a SizedQueue works. +# +class SizedQueue < Queue + # + # Creates a fixed-length queue with a maximum size of +max+. + # + def initialize(max) + raise ArgumentError, "queue size must be positive" unless max > 0 + @max = max + @queue_wait = [] + @queue_wait.taint # enable tainted comunication + @size_mutex = Mutex.new + @sem = ConditionVariable.new + super() + end + + # + # Returns the maximum size of the queue. + # + def max + @max + end + + # + # Sets the maximum size of the queue. + # + def max=(max) + @size_mutex.synchronize do + @max = max + @sem.broadcast + end + max + end + + # + # Pushes +obj+ to the queue. If there is no space left in the queue, waits + # until space becomes available. + # + def push(obj) + while true + @size_mutex.synchronize do + @queue_wait.delete(Thread.current) + if @que.size >= @max + @queue_wait.push Thread.current + @sem.wait(@size_mutex) + else + return super(obj) + end + end + end + end + + # + # Alias of push + # + alias << push + + # + # Alias of push + # + alias enq push + + # + # Retrieves data from the queue and runs a waiting thread, if any. + # + def pop(*args) + retval = super + + @size_mutex.synchronize do + if @que.size < @max + @sem.broadcast + end + end + + return retval + end + + # + # Alias of pop + # + alias shift pop + + # + # Alias of pop + # + alias deq pop + + # + # Returns the number of threads waiting on the queue. + # + def num_waiting + @waiting.size + @queue_wait.size + end +end + +# Documentation comments: +# - How do you make RDoc inherit documentation from superclass?