Sha256: c9ebbb500cc50bf16636b3ddff3f2b66e92b9330910f1e6a00f960897a9a7723
Contents?: true
Size: 1.77 KB
Versions: 1
Compression:
Stored size: 1.77 KB
Contents
require 'concurrent' module GBDispatch class Runner class << self attr_accessor :pool_size # Thread pool for async execution. # Pool size is set by default to core numbers or at least 2. # You can increase size of pool by setting :pool_size variable before using pool. # @return [Concurrent::ThreadPoolExecutor] def pool unless @pool self.pool_size ||=2 threads = [self.pool_size, 2, Concurrent.processor_count].max @pool = Concurrent::ThreadPoolExecutor.new( min_threads: 2, max_threads: threads, max_queue: 10*threads, fallback_policy: :caller_runs ) end @pool end # Execute given block. # If there is an exception thrown, it log it and crash actor. # For more information about error handling, check Celluloid documentation. # @param block [Proc] # @param options [Hash] # @option options [String] :name queue name used for debugging and better logging. def execute(block, options=Hash.new) future = Concurrent::Future.new(:executor => self.pool) do begin name = options[:name] Thread.current[:name] ||= name if name result = block.call result rescue Exception => e if defined?(Opbeat) Opbeat.set_context extra: {queue: name} if name Opbeat.capture_exception(e) end GBDispatch.logger.error "Failed execution of queue #{name} with error #{e.message}" raise e end end future.execute future.value if future.rejected? raise future.reason end future.value end end end end
Version data entries
1 entries across 1 versions & 1 rubygems
Version | Path |
---|---|
gb_dispatch-0.0.6 | lib/gb_dispatch/runner.rb |