lib/thread/pool.rb in thread-0.0.2 vs lib/thread/pool.rb in thread-0.0.3

- old
+ new

@@ -8,21 +8,31 @@ # 0. You just DO WHAT THE FUCK YOU WANT TO. #++ require 'thread' +# A pool is a container of a limited amount of threads to which you can add +# tasks to run. +# +# This is usually more performant and less memory intensive than creating a +# new thread for every task. class Thread::Pool + # A task incapsulates a block being ran by the pool and the arguments to pass + # to it. class Task Timeout = Class.new(Exception) Asked = Class.new(Exception) attr_reader :pool, :timeout, :exception, :thread, :started_at + # Create a task in the given pool which will pass the arguments to the + # block. def initialize (pool, *args, &block) - @pool = pool - @arguments = args - @block = block + @pool = pool + @arguments = args + @block = block + @running = false @finished = false @timedout = false @terminated = false end @@ -30,18 +40,19 @@ def running?; @running; end def finished?; @finished; end def timeout?; @timedout; end def terminated?; @terminated; end + # Execute the task in the given thread. def execute (thread) return if terminated? || running? || finished? @thread = thread @running = true @started_at = Time.now - pool.wake_up_timeout + pool.__send__.wake_up_timeout begin @block.call(*@arguments) rescue Exception => reason if reason.is_a? Timeout @@ -56,24 +67,27 @@ @running = false @finished = true @thread = nil end + # Terminate the exception with an optionally given exception. def terminate! (exception = Asked) return if terminated? || finished? || timeout? @terminated = true return unless running? @thread.raise exception end + # Force the task to timeout. def timeout! terminate! Timeout end + # Timeout the task after the given time. def timeout_after (time) @timeout = time pool.timeout_for self, time @@ -81,10 +95,17 @@ end end attr_reader :min, :max, :spawned + # Create the pool with minimum and maximum threads. + # + # The pool will start with the minimum amount of threads created and will + # spawn new threads until the max is reached in case of need. + # + # A default block can be passed, which will be used to {#process} the passed + # data. def initialize (min, max = nil, &block) @min = min @max = max || min @block = block @@ -106,29 +127,49 @@ spawn_thread } } end + # Check if the pool has been shut down. def shutdown?; !!@shutdown; end - def auto_trim?; @auto_trim; end - def auto_trim!; @auto_trim = true; end - def no_auto_trim!; @auto_trim = false; end + # Check if auto trimming is enabled. + def auto_trim? + @auto_trim + end + # Enable auto trimming, unneeded threads will be deleted until the minimum + # is reached. + def auto_trim! + @auto_trim = true + end + + # Disable auto trimming. + def no_auto_trim! + @auto_trim = false + end + + # Resize the pool with the passed arguments. def resize (min, max = nil) @min = min @max = max || min trim! end + # Get the amount of tasks that still have to be run. def backlog @mutex.synchronize { @todo.length } end + # Add a task to the pool which will execute the block with the given + # argument. + # + # If no block is passed the default block will be used if present, an + # ArgumentError will be raised otherwise. def process (*args, &block) unless block || @block raise ArgumentError, 'you must pass a block' end @@ -149,10 +190,12 @@ task end alias << process + # Trim the unused threads, if forced threads will be trimmed even if there + # are tasks waiting. def trim (force = false) @mutex.synchronize { if (force || @waiting > 0) && @spawned - @trim_requests > @min @trim_requests -= 1 @cond.signal @@ -160,14 +203,16 @@ } self end + # Force #{trim}. def trim! trim true end + # Shut down the pool instantly without finishing to execute tasks. def shutdown! @mutex.synchronize { @shutdown = :now @cond.broadcast } @@ -175,10 +220,11 @@ wake_up_timeout self end + # Shut down the pool, it will block until all tasks have finished running. def shutdown @mutex.synchronize { @shutdown = :nicely @cond.broadcast } @@ -194,16 +240,18 @@ end self end + # Join on all threads in the pool. def join @workers.first.join until @workers.empty? self end + # Define a timeout for a task. def timeout_for (task, timeout) unless @timeout spawn_timeout_thread end @@ -212,27 +260,28 @@ wake_up_timeout } end + # Shutdown the pool after a given amount of time. def shutdown_after (timeout) Thread.new { sleep timeout shutdown } self end +private def wake_up_timeout if defined? @pipes @pipes.last.write_nonblock 'x' rescue nil end end -private def spawn_thread @spawned += 1 thread = Thread.new { loop do @@ -279,10 +328,10 @@ def spawn_timeout_thread @pipes = IO.pipe @timeout = Thread.new { loop do now = Time.now - timeout = @timeouts.map {|task, timeout| + timeout = @timeouts.map {|task, time| next unless task.started_at now - task.started_at + task.timeout }.compact.min unless @timeouts.empty?