lib/thread/pool.rb in thread-0.1.7 vs lib/thread/pool.rb in thread-0.2.0

- old
+ new

@@ -20,42 +20,53 @@ # to it. class Task Timeout = Class.new(Exception) Asked = Class.new(Exception) - attr_reader :pool, :timeout, :exception, :thread, :started_at + attr_reader :pool, :timeout, :exception, :thread, :started_at, :result # Create a task in the given pool which will pass the arguments to the # block. - def initialize (pool, *args, &block) + def initialize(pool, *args, &block) @pool = pool @arguments = args @block = block @running = false @finished = false @timedout = false @terminated = false end - def running?; @running; end - def finished?; @finished; end - def timeout?; @timedout; end - def terminated?; @terminated; end + def running? + @running + end - # Execute the task in the given thread. - def execute (thread) + def finished? + @finished + end + + def timeout? + @timedout + end + + def terminated? + @terminated + end + + # Execute the task. + def execute return if terminated? || running? || finished? - @thread = thread + @thread = Thread.current @running = true @started_at = Time.now pool.__send__ :wake_up_timeout begin - @block.call(*@arguments) + @result = @block.call(*@arguments) rescue Exception => reason if reason.is_a? Timeout @timedout = true elsif reason.is_a? Asked return @@ -69,16 +80,16 @@ @finished = true @thread = nil end # Raise an exception in the thread used by the task. - def raise (exception) + def raise(exception) @thread.raise(exception) end # Terminate the exception with an optionally given exception. - def terminate! (exception = Asked) + def terminate!(exception = Asked) return if terminated? || finished? || timeout? @terminated = true return unless running? @@ -90,14 +101,14 @@ def timeout! terminate! Timeout end # Timeout the task after the given time. - def timeout_after (time) + def timeout_after(time) @timeout = time - pool.timeout_for self, time + pool.__send__ :timeout_for, self, time self end end @@ -108,11 +119,11 @@ # The pool will start with the minimum amount of threads created and will # spawn new threads until the max is reached in case of need. # # A default block can be passed, which will be used to {#process} the passed # data. - def initialize (min, max = nil, &block) + def initialize(min, max = nil, &block) @min = min @max = max || min @block = block @cond = ConditionVariable.new @@ -138,26 +149,32 @@ } } end # Check if the pool has been shut down. - def shutdown?; !!@shutdown; end + def shutdown? + !!@shutdown + end # Check if auto trimming is enabled. def auto_trim? @auto_trim end # Enable auto trimming, unneeded threads will be deleted until the minimum # is reached. def auto_trim! @auto_trim = true + + self end # Disable auto trimming. def no_auto_trim! @auto_trim = false + + self end # Check if idle trimming is enabled. def idle_trim? !@idle_trim.nil? @@ -165,19 +182,23 @@ # Enable idle trimming. Unneeded threads will be deleted after the given number of seconds of inactivity. # The minimum number of threads is respeced. def idle_trim!(timeout) @idle_trim = timeout + + self end # Turn of idle trimming. def no_idle_trim! @idle_trim = nil + + self end # Resize the pool with the passed arguments. - def resize (min, max = nil) + def resize(min, max = nil) @min = min @max = max || min trim! end @@ -198,13 +219,13 @@ # Wait until all tasks are consumed. The caller will be blocked until then. def wait(what = :idle) case what when :done - loop do + until done? @done_mutex.synchronize { - return self if _done? + break if _done? @done.wait @done_mutex } end @@ -231,23 +252,15 @@ # Add a task to the pool which will execute the block with the given # argument. # # If no block is passed the default block will be used if present, an # ArgumentError will be raised otherwise. - def process (*args, &block) + def process(*args, &block) unless block || @block raise ArgumentError, 'you must pass a block' end - wait.process!(*args, &block) - end - - def process! (*args, &block) - unless block || @block - raise ArgumentError, 'you must pass a block' - end - task = Task.new(self, *args, &(block || @block)) @mutex.synchronize { raise 'unable to add work while shutting down' if shutdown? @@ -261,15 +274,15 @@ } task end - alias << process! + alias << process # Trim the unused threads, if forced threads will be trimmed even if there # are tasks waiting. - def trim (force = false) + def trim(force = false) @mutex.synchronize { if (force || @waiting > 0) && @spawned - @trim_requests > @min @trim_requests += 1 @cond.signal end @@ -300,66 +313,54 @@ @mutex.synchronize { @shutdown = :nicely @cond.broadcast } - join + until @workers.empty? + if worker = @workers.first + worker.join + end + end if @timeout @shutdown = :now wake_up_timeout @timeout.join end - - self end - # Join on all threads in the pool. - def join - until @workers.empty? - if worker = @workers.first - worker.join - end - end - - self - end - - # Define a timeout for a task. - def timeout_for (task, timeout) - unless @timeout - spawn_timeout_thread - end - - @mutex.synchronize { - @timeouts[task] = timeout - - wake_up_timeout - } - end - # Shutdown the pool after a given amount of time. - def shutdown_after (timeout) + def shutdown_after(timeout) Thread.new { sleep timeout shutdown } - - self end class << self # If true, tasks will allow raised exceptions to pass through. # # Similar to Thread.abort_on_exception attr_accessor :abort_on_exception end private + def timeout_for(task, timeout) + unless @timeout + spawn_timeout_thread + end + + @mutex.synchronize { + @timeouts[task] = timeout + + wake_up_timeout + } + end + def wake_up_timeout if defined? @pipes @pipes.last.write_nonblock 'x' rescue nil end end @@ -399,11 +400,11 @@ end @todo.shift } or break - task.execute(thread) + task.execute break if @shutdown == :now trim if auto_trim? && @spawned > @min end @@ -469,9 +470,9 @@ end end class Thread # Helper to create a pool. - def self.pool (*args, &block) + def self.pool(*args, &block) Thread::Pool.new(*args, &block) end end