Sha256: fc03147c88f08a6064c172e77d9bbd446edaecc3c04b20cfd890a10300704494
Size: 1.79 KB
Versions: 1
Compression:
Stored size: 1.79 KB
Contents
require 'concurrent'

module GBDispatch
  class Runner
    class << self
      attr_accessor :pool_size

      # Thread pool for async execution.
      # The maximum pool size defaults to the number of processor cores, with a minimum of 2.
      # To allow a larger pool, set :pool_size before the pool is first used.
      # @return [Concurrent::ThreadPoolExecutor]
      def pool
        unless @pool
          self.pool_size ||= 2
          threads = [self.pool_size, 2, Concurrent.processor_count].max
          @pool = Concurrent::ThreadPoolExecutor.new(
              min_threads: 2,
              max_threads: threads,
              max_queue: 10 * threads,
              fallback_policy: :caller_runs
          )
        end
        @pool
      end

      # Execute the given block on the thread pool and wait for its result.
      # If the block raises, the exception is logged (and reported to Opbeat
      # when Opbeat is loaded), then re-raised to the caller.
      # @param block [Proc]
      # @param options [Hash]
      # @option options [String] :name queue name used for debugging and better logging.
      def execute(block, options=Hash.new)
        future = Concurrent::Future.new(:executor => self.pool) do
          begin
            name = options[:name]
            Thread.current[:name] ||= name if name
            block.call
          rescue Exception => e
            if defined?(Opbeat)
              Opbeat.set_context extra: {queue: name} if name
              Opbeat.capture_exception(e)
            end
            GBDispatch.logger.error "Failed execution of queue #{name} with error #{e.message}" if GBDispatch.logger
            raise e
          end
        end
        future.execute
        # Block until the future has finished, then surface any failure to the caller.
        future.value
        if future.rejected?
          raise future.reason
        end
        future.value
      end
    end
  end
end
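The two behaviours documented above, the pool sizing rule and "run the block elsewhere, wait, then re-raise in the caller", can be sketched with the standard library alone. This is a minimal illustration and not the gem's code: `max_pool_threads` and `execute_sync` are hypothetical names, a plain `Thread` stands in for `Concurrent::Future`, and `Etc.nprocessors` stands in for `Concurrent.processor_count`.

```ruby
require 'etc'

# Hypothetical helper mirroring the sizing rule in Runner.pool:
# the largest of the configured size, 2, and the processor count.
def max_pool_threads(pool_size, processor_count = Etc.nprocessors)
  [pool_size || 2, 2, processor_count].max
end

# Hypothetical stand-in for Runner.execute. A plain Thread replaces the
# Concurrent::Future (assumption: no thread pool, logging, or Opbeat here).
# Thread#value waits for the thread and re-raises any exception that
# terminated it, so the caller sees the block's failure directly.
def execute_sync(block, options = {})
  name = options[:name]
  worker = Thread.new do
    # Avoid the default stderr report; the caller receives the exception.
    Thread.current.report_on_exception = false
    Thread.current[:name] ||= name if name
    block.call
  end
  worker.value
end

puts max_pool_threads(nil, 1)  # falls back to the minimum of 2
puts execute_sync(-> { 21 * 2 }, name: 'demo')

begin
  execute_sync(-> { raise ArgumentError, 'boom' }, name: 'demo')
rescue ArgumentError => e
  puts "re-raised in caller: #{e.message}"
end
```

In the sketch, as in `Runner.execute`, the call is synchronous from the caller's point of view: the work happens on another thread, but the caller waits for the result and handles any exception itself.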
Version data entries
1 entry across 1 version & 1 rubygem
Version | Path |
---|---|
gb_dispatch-0.0.5 | lib/gb_dispatch/runner.rb |