lib/gb_dispatch/runner.rb in gb_dispatch-0.0.6 vs lib/gb_dispatch/runner.rb in gb_dispatch-0.1.0
- old
+ new
@@ -27,30 +27,69 @@
# For more information about error handling, check Celluloid documentation.
# @param block [Proc]
# @param options [Hash]
# @option options [String] :name queue name used for debugging and better logging.
def execute(block, options=Hash.new)
- future = Concurrent::Future.new(:executor => self.pool) do
- begin
- name = options[:name]
- Thread.current[:name] ||= name if name
- result = block.call
- result
- rescue Exception => e
- if defined?(Opbeat)
- Opbeat.set_context extra: {queue: name} if name
- Opbeat.capture_exception(e)
- end
- GBDispatch.logger.error "Failed execution of queue #{name} with error #{e.message}"
- raise e
+ if defined?(Rails) && defined?(Rails::VERSION::MAJOR)
+ if Rails::VERSION::MAJOR < 5
+ _execute(block, options)
+ else
+ _execute_rails(block, options)
end
+ else
+ _execute(block, options)
end
+ end
+
+ private
+
+ def _execute(block, options)
+ future = Concurrent::Future.new(:executor => self.pool) do
+ _run_block(block, options)
+ end
future.execute
future.value
if future.rejected?
raise future.reason
end
future.value
+ end
+
+ def _execute_rails(block, options)
+ if defined?(Rails) && defined?(ActiveSupport::Dependencies)
+ future = Concurrent::Future.new(:executor => self.pool) do
+ Rails.application.executor.wrap do
+ ActiveSupport::Dependencies.interlock.permit_concurrent_loads do
+ _run_block(block, options)
+ end
+ end
+ end
+ future.execute
+ ActiveSupport::Dependencies.interlock.permit_concurrent_loads { future.value }
+ if future.rejected?
+ raise future.reason
+ end
+ future.value
+ else
+ raise 'Failed loading rails!'
+ end
+ end
+
+ def _run_block(block, options)
+ begin
+ name = options[:name]
+ Thread.current[:name] ||= name if name
+ result = block.call
+ result
+ rescue Exception => e
+ if defined?(Raven)
+ Raven.tags_context :gb_dispacth => true
+ Raven.extra_context :dispatch_queue => name
+ Raven.capture_exception(e)
+ end
+ GBDispatch.logger.error "Failed execution of queue #{name} with error #{e.message}"
+ raise e
+ end
end
end
end
end