lib/routemaster/responses/future_response.rb in routemaster-drain-2.3.0 vs lib/routemaster/responses/future_response.rb in routemaster-drain-2.4.0
- old
+ new
@@ -1,32 +1,18 @@
-require 'thread/pool'
-require 'thread/future'
+require 'concurrent/future'
+require 'concurrent/executor/cached_thread_pool'
require 'singleton'
require 'delegate'
module Routemaster
module Responses
- # A pool of threads, used for parallel/future request processing.
- class Pool < SimpleDelegator
- include Singleton
-
- def initialize
- Thread.pool(5, 20).tap do |p|
- # TODO: configurable pool size and trim timeout?
- p.auto_trim!
- p.idle_trim! 10 # 10 seconds
- super p
- end
- end
- end
-
class FutureResponse
extend Forwardable
# The `block` is expected to return a {Response}
def initialize(&block)
- @future = Pool.instance.future(&block)
+ @future = Concurrent::Future.execute(executor: Pool.current, &block)
end
# @!attribute status
# @return [Integer]
# Delegated to the `block`'s return value.
@@ -39,8 +25,42 @@
# @return [Hashie::Mash]
# Delegated to the `block`'s return value.
delegate :value => :@future
delegate %i(status headers body) => :value
+ delegate :respond_to_missing? => :value
+
+ def method_missing(m, *args, &block)
+ value.public_send(m, *args, &block)
+ end
+
+ def value
+ @future.value.tap do
+ raise @future.reason if @future.rejected?
+ end
+ end
+
+ module Pool
+ LOCK = Mutex.new
+
+ def self.current
+ LOCK.synchronize do
+ @pool ||= _build_pool
+ end
+ end
+
+ def self.reset
+ LOCK.synchronize do
+ return unless @pool
+ @pool.tap(&:shutdown).wait_for_termination
+ @pool = nil
+ end
+ self
+ end
+
+ def self._build_pool
+ Concurrent::CachedThreadPool.new(min_length: 5, max_length: 20, max_queue: 0, fallback_policy: :caller_runs)
+ end
+ end
end
end
end