lib/thread/pool.rb in thread-0.0.2 vs lib/thread/pool.rb in thread-0.0.3
- old
+ new
@@ -8,21 +8,31 @@
# 0. You just DO WHAT THE FUCK YOU WANT TO.
#++
require 'thread'
+# A pool is a container of a limited amount of threads to which you can add
+# tasks to run.
+#
+# This is usually more performant and less memory intensive than creating a
+# new thread for every task.
class Thread::Pool
+ # A task incapsulates a block being ran by the pool and the arguments to pass
+ # to it.
class Task
Timeout = Class.new(Exception)
Asked = Class.new(Exception)
attr_reader :pool, :timeout, :exception, :thread, :started_at
+ # Create a task in the given pool which will pass the arguments to the
+ # block.
def initialize (pool, *args, &block)
- @pool = pool
- @arguments = args
- @block = block
+ @pool = pool
+ @arguments = args
+ @block = block
+
@running = false
@finished = false
@timedout = false
@terminated = false
end
@@ -30,18 +40,19 @@
def running?; @running; end
def finished?; @finished; end
def timeout?; @timedout; end
def terminated?; @terminated; end
+ # Execute the task in the given thread.
def execute (thread)
return if terminated? || running? || finished?
@thread = thread
@running = true
@started_at = Time.now
- pool.wake_up_timeout
+ pool.__send__.wake_up_timeout
begin
@block.call(*@arguments)
rescue Exception => reason
if reason.is_a? Timeout
@@ -56,24 +67,27 @@
@running = false
@finished = true
@thread = nil
end
+ # Terminate the exception with an optionally given exception.
def terminate! (exception = Asked)
return if terminated? || finished? || timeout?
@terminated = true
return unless running?
@thread.raise exception
end
+ # Force the task to timeout.
def timeout!
terminate! Timeout
end
+ # Timeout the task after the given time.
def timeout_after (time)
@timeout = time
pool.timeout_for self, time
@@ -81,10 +95,17 @@
end
end
attr_reader :min, :max, :spawned
+ # Create the pool with minimum and maximum threads.
+ #
+ # The pool will start with the minimum amount of threads created and will
+ # spawn new threads until the max is reached in case of need.
+ #
+ # A default block can be passed, which will be used to {#process} the passed
+ # data.
def initialize (min, max = nil, &block)
@min = min
@max = max || min
@block = block
@@ -106,29 +127,49 @@
spawn_thread
}
}
end
+ # Check if the pool has been shut down.
def shutdown?; !!@shutdown; end
- def auto_trim?; @auto_trim; end
- def auto_trim!; @auto_trim = true; end
- def no_auto_trim!; @auto_trim = false; end
+ # Check if auto trimming is enabled.
+ def auto_trim?
+ @auto_trim
+ end
+ # Enable auto trimming, unneeded threads will be deleted until the minimum
+ # is reached.
+ def auto_trim!
+ @auto_trim = true
+ end
+
+ # Disable auto trimming.
+ def no_auto_trim!
+ @auto_trim = false
+ end
+
+ # Resize the pool with the passed arguments.
def resize (min, max = nil)
@min = min
@max = max || min
trim!
end
+ # Get the amount of tasks that still have to be run.
def backlog
@mutex.synchronize {
@todo.length
}
end
+ # Add a task to the pool which will execute the block with the given
+ # argument.
+ #
+ # If no block is passed the default block will be used if present, an
+ # ArgumentError will be raised otherwise.
def process (*args, &block)
unless block || @block
raise ArgumentError, 'you must pass a block'
end
@@ -149,10 +190,12 @@
task
end
alias << process
+ # Trim the unused threads, if forced threads will be trimmed even if there
+ # are tasks waiting.
def trim (force = false)
@mutex.synchronize {
if (force || @waiting > 0) && @spawned - @trim_requests > @min
@trim_requests -= 1
@cond.signal
@@ -160,14 +203,16 @@
}
self
end
+ # Force #{trim}.
def trim!
trim true
end
+ # Shut down the pool instantly without finishing to execute tasks.
def shutdown!
@mutex.synchronize {
@shutdown = :now
@cond.broadcast
}
@@ -175,10 +220,11 @@
wake_up_timeout
self
end
+ # Shut down the pool, it will block until all tasks have finished running.
def shutdown
@mutex.synchronize {
@shutdown = :nicely
@cond.broadcast
}
@@ -194,16 +240,18 @@
end
self
end
+ # Join on all threads in the pool.
def join
@workers.first.join until @workers.empty?
self
end
+ # Define a timeout for a task.
def timeout_for (task, timeout)
unless @timeout
spawn_timeout_thread
end
@@ -212,27 +260,28 @@
wake_up_timeout
}
end
+ # Shutdown the pool after a given amount of time.
def shutdown_after (timeout)
Thread.new {
sleep timeout
shutdown
}
self
end
+private
def wake_up_timeout
if defined? @pipes
@pipes.last.write_nonblock 'x' rescue nil
end
end
-private
def spawn_thread
@spawned += 1
thread = Thread.new {
loop do
@@ -279,10 +328,10 @@
def spawn_timeout_thread
@pipes = IO.pipe
@timeout = Thread.new {
loop do
now = Time.now
- timeout = @timeouts.map {|task, timeout|
+ timeout = @timeouts.map {|task, time|
next unless task.started_at
now - task.started_at + task.timeout
}.compact.min unless @timeouts.empty?