lib/gb_dispatch/runner.rb in gb_dispatch-0.0.6 vs lib/gb_dispatch/runner.rb in gb_dispatch-0.1.0

- old
+ new

@@ -27,30 +27,69 @@ # For more information about error handling, check Celluloid documentation. # @param block [Proc] # @param options [Hash] # @option options [String] :name queue name used for debugging and better logging. def execute(block, options=Hash.new) - future = Concurrent::Future.new(:executor => self.pool) do - begin - name = options[:name] - Thread.current[:name] ||= name if name - result = block.call - result - rescue Exception => e - if defined?(Opbeat) - Opbeat.set_context extra: {queue: name} if name - Opbeat.capture_exception(e) - end - GBDispatch.logger.error "Failed execution of queue #{name} with error #{e.message}" - raise e + if defined?(Rails) && defined?(Rails::VERSION::MAJOR) + if Rails::VERSION::MAJOR < 5 + _execute(block, options) + else + _execute_rails(block, options) end + else + _execute(block, options) end + end + + private + + def _execute(block, options) + future = Concurrent::Future.new(:executor => self.pool) do + _run_block(block, options) + end future.execute future.value if future.rejected? raise future.reason end future.value + end + + def _execute_rails(block, options) + if defined?(Rails) && defined?(ActiveSupport::Dependencies) + future = Concurrent::Future.new(:executor => self.pool) do + Rails.application.executor.wrap do + ActiveSupport::Dependencies.interlock.permit_concurrent_loads do + _run_block(block, options) + end + end + end + future.execute + ActiveSupport::Dependencies.interlock.permit_concurrent_loads { future.value } + if future.rejected? + raise future.reason + end + future.value + else + raise 'Failed loading rails!' + end + end + + def _run_block(block, options) + begin + name = options[:name] + Thread.current[:name] ||= name if name + result = block.call + result + rescue Exception => e + if defined?(Raven) + Raven.tags_context :gb_dispacth => true + Raven.extra_context :dispatch_queue => name + Raven.capture_exception(e) + end + GBDispatch.logger.error "Failed execution of queue #{name} with error #{e.message}" + raise e + end end end end end