lib/routemaster/responses/future_response.rb in routemaster-drain-2.3.0 vs lib/routemaster/responses/future_response.rb in routemaster-drain-2.4.0

- old
+ new

@@ -1,32 +1,18 @@ -require 'thread/pool' -require 'thread/future' +require 'concurrent/future' +require 'concurrent/executor/cached_thread_pool' require 'singleton' require 'delegate' module Routemaster module Responses - # A pool of threads, used for parallel/future request processing. - class Pool < SimpleDelegator - include Singleton - - def initialize - Thread.pool(5, 20).tap do |p| - # TODO: configurable pool size and trim timeout? - p.auto_trim! - p.idle_trim! 10 # 10 seconds - super p - end - end - end - class FutureResponse extend Forwardable # The `block` is expected to return a {Response} def initialize(&block) - @future = Pool.instance.future(&block) + @future = Concurrent::Future.execute(executor: Pool.current, &block) end # @!attribute status # @return [Integer] # Delegated to the `block`'s return value. @@ -39,8 +25,42 @@ # @return [Hashie::Mash] # Delegated to the `block`'s return value. delegate :value => :@future delegate %i(status headers body) => :value + delegate :respond_to_missing? => :value + + def method_missing(m, *args, &block) + value.public_send(m, *args, &block) + end + + def value + @future.value.tap do + raise @future.reason if @future.rejected? + end + end + + module Pool + LOCK = Mutex.new + + def self.current + LOCK.synchronize do + @pool ||= _build_pool + end + end + + def self.reset + LOCK.synchronize do + return unless @pool + @pool.tap(&:shutdown).wait_for_termination + @pool = nil + end + self + end + + def self._build_pool + Concurrent::CachedThreadPool.new(min_length: 5, max_length: 20, max_queue: 0, fallback_policy: :caller_runs) + end + end end end end