lib/thread/pool.rb in thread-0.1.7 vs lib/thread/pool.rb in thread-0.2.0
- old
+ new
@@ -20,42 +20,53 @@
# to it.
class Task
Timeout = Class.new(Exception)
Asked = Class.new(Exception)
- attr_reader :pool, :timeout, :exception, :thread, :started_at
+ attr_reader :pool, :timeout, :exception, :thread, :started_at, :result
# Create a task in the given pool which will pass the arguments to the
# block.
- def initialize (pool, *args, &block)
+ def initialize(pool, *args, &block)
@pool = pool
@arguments = args
@block = block
@running = false
@finished = false
@timedout = false
@terminated = false
end
- def running?; @running; end
- def finished?; @finished; end
- def timeout?; @timedout; end
- def terminated?; @terminated; end
+ def running?
+ @running
+ end
- # Execute the task in the given thread.
- def execute (thread)
+ def finished?
+ @finished
+ end
+
+ def timeout?
+ @timedout
+ end
+
+ def terminated?
+ @terminated
+ end
+
+ # Execute the task.
+ def execute
return if terminated? || running? || finished?
- @thread = thread
+ @thread = Thread.current
@running = true
@started_at = Time.now
pool.__send__ :wake_up_timeout
begin
- @block.call(*@arguments)
+ @result = @block.call(*@arguments)
rescue Exception => reason
if reason.is_a? Timeout
@timedout = true
elsif reason.is_a? Asked
return
@@ -69,16 +80,16 @@
@finished = true
@thread = nil
end
# Raise an exception in the thread used by the task.
- def raise (exception)
+ def raise(exception)
@thread.raise(exception)
end
# Terminate the exception with an optionally given exception.
- def terminate! (exception = Asked)
+ def terminate!(exception = Asked)
return if terminated? || finished? || timeout?
@terminated = true
return unless running?
@@ -90,14 +101,14 @@
def timeout!
terminate! Timeout
end
# Timeout the task after the given time.
- def timeout_after (time)
+ def timeout_after(time)
@timeout = time
- pool.timeout_for self, time
+ pool.__send__ :timeout_for, self, time
self
end
end
@@ -108,11 +119,11 @@
# The pool will start with the minimum amount of threads created and will
# spawn new threads until the max is reached in case of need.
#
# A default block can be passed, which will be used to {#process} the passed
# data.
- def initialize (min, max = nil, &block)
+ def initialize(min, max = nil, &block)
@min = min
@max = max || min
@block = block
@cond = ConditionVariable.new
@@ -138,26 +149,32 @@
}
}
end
# Check if the pool has been shut down.
- def shutdown?; !!@shutdown; end
+ def shutdown?
+ !!@shutdown
+ end
# Check if auto trimming is enabled.
def auto_trim?
@auto_trim
end
# Enable auto trimming, unneeded threads will be deleted until the minimum
# is reached.
def auto_trim!
@auto_trim = true
+
+ self
end
# Disable auto trimming.
def no_auto_trim!
@auto_trim = false
+
+ self
end
# Check if idle trimming is enabled.
def idle_trim?
!@idle_trim.nil?
@@ -165,19 +182,23 @@
# Enable idle trimming. Unneeded threads will be deleted after the given number of seconds of inactivity.
# The minimum number of threads is respeced.
def idle_trim!(timeout)
@idle_trim = timeout
+
+ self
end
# Turn of idle trimming.
def no_idle_trim!
@idle_trim = nil
+
+ self
end
# Resize the pool with the passed arguments.
- def resize (min, max = nil)
+ def resize(min, max = nil)
@min = min
@max = max || min
trim!
end
@@ -198,13 +219,13 @@
# Wait until all tasks are consumed. The caller will be blocked until then.
def wait(what = :idle)
case what
when :done
- loop do
+ until done?
@done_mutex.synchronize {
- return self if _done?
+ break if _done?
@done.wait @done_mutex
}
end
@@ -231,23 +252,15 @@
# Add a task to the pool which will execute the block with the given
# argument.
#
# If no block is passed the default block will be used if present, an
# ArgumentError will be raised otherwise.
- def process (*args, &block)
+ def process(*args, &block)
unless block || @block
raise ArgumentError, 'you must pass a block'
end
- wait.process!(*args, &block)
- end
-
- def process! (*args, &block)
- unless block || @block
- raise ArgumentError, 'you must pass a block'
- end
-
task = Task.new(self, *args, &(block || @block))
@mutex.synchronize {
raise 'unable to add work while shutting down' if shutdown?
@@ -261,15 +274,15 @@
}
task
end
- alias << process!
+ alias << process
# Trim the unused threads, if forced threads will be trimmed even if there
# are tasks waiting.
- def trim (force = false)
+ def trim(force = false)
@mutex.synchronize {
if (force || @waiting > 0) && @spawned - @trim_requests > @min
@trim_requests += 1
@cond.signal
end
@@ -300,66 +313,54 @@
@mutex.synchronize {
@shutdown = :nicely
@cond.broadcast
}
- join
+ until @workers.empty?
+ if worker = @workers.first
+ worker.join
+ end
+ end
if @timeout
@shutdown = :now
wake_up_timeout
@timeout.join
end
-
- self
end
- # Join on all threads in the pool.
- def join
- until @workers.empty?
- if worker = @workers.first
- worker.join
- end
- end
-
- self
- end
-
- # Define a timeout for a task.
- def timeout_for (task, timeout)
- unless @timeout
- spawn_timeout_thread
- end
-
- @mutex.synchronize {
- @timeouts[task] = timeout
-
- wake_up_timeout
- }
- end
-
# Shutdown the pool after a given amount of time.
- def shutdown_after (timeout)
+ def shutdown_after(timeout)
Thread.new {
sleep timeout
shutdown
}
-
- self
end
class << self
# If true, tasks will allow raised exceptions to pass through.
#
# Similar to Thread.abort_on_exception
attr_accessor :abort_on_exception
end
private
+ def timeout_for(task, timeout)
+ unless @timeout
+ spawn_timeout_thread
+ end
+
+ @mutex.synchronize {
+ @timeouts[task] = timeout
+
+ wake_up_timeout
+ }
+ end
+
def wake_up_timeout
if defined? @pipes
@pipes.last.write_nonblock 'x' rescue nil
end
end
@@ -399,11 +400,11 @@
end
@todo.shift
} or break
- task.execute(thread)
+ task.execute
break if @shutdown == :now
trim if auto_trim? && @spawned > @min
end
@@ -469,9 +470,9 @@
end
end
class Thread
# Helper to create a pool.
- def self.pool (*args, &block)
+ def self.pool(*args, &block)
Thread::Pool.new(*args, &block)
end
end