require 'sneakers/handlers/maxretry'

module Appfuel
  module Service
    # Base class for RabbitMQ consumers built on Sneakers. A subclass is
    # registered in the application container when it is defined, and each
    # incoming message is turned into a request and dispatched. When the
    # message carries rpc properties the response is published back.
    class Worker
      include Sneakers::Worker
      include Appfuel::Application::AppContainer
      include Appfuel::Application::Dispatcher

      class << self
        # Ruby hook invoked when a class inherits from Worker. Registers the
        # subclass in its app container and records its key in the list kept
        # under "<top_container_key>.consumer_keys", creating that list when
        # it is not registered yet.
        #
        # NOTE(review): `app_container` and `container_class_path` are not
        # defined in this file; presumably they come from the included
        # Appfuel::Application modules -- confirm.
        #
        # @param klass [Class] the class inheriting from Worker
        def inherited(klass)
          container    = klass.app_container
          consumer_key = "#{klass.top_container_key}.consumer_keys"
          unless container.key?(consumer_key)
            container.register(consumer_key, [])
          end

          key = klass.container_class_path
          container.register(key, klass)
          container[consumer_key] << key
        end

        # @return [String] key prefix used for the consumer key list
        def top_container_key
          "message_brokers"
        end

        # @return [String] the container class type for workers
        def container_class_type
          'consumers'
        end
      end

      # Sneakers worker hook to handle messages from RabbitMQ.
      #
      # Builds a request from the message, dispatches it, logs any failure
      # and publishes the response when the message is an rpc call. The
      # message is acknowledged on every path, including when building the
      # request or dispatching raises.
      #
      # @param msg [String] JSON string of inputs
      # @param delivery_info [Bunny::Delivery::Info]
      # @param properties [Bunny::MessageProperties]
      # @return [Object] whatever `ack!` returns
      def work_with_params(msg, delivery_info, properties)
        begin
          request = create_request(msg, delivery_info, properties)
        rescue => e
          # The payload could not be turned into a request; build one from an
          # empty JSON object so the failure can still be reported.
          request = create_request('{}', delivery_info, properties)
          handle_exception("failed to build request", e, request)
          return ack!
        end

        begin
          response = dispatch(request, app_container)
        rescue => e
          handle_exception("failed to dispatch", e, request)
          return ack!
        end

        if response.failure?
          logger.error "[#{request.action_route}] #{response.errors.format}"
        end

        if rpc?(properties)
          publish_rpc(request, response)
        end

        ack!
      end

      # A message is treated as an rpc call when it has both a correlation id
      # and a reply_to.
      #
      # @param properties [Bunny::MessageProperties]
      # @return [Object] truthy (the reply_to value) for an rpc message,
      #   otherwise nil or false
      def rpc?(properties)
        properties.correlation_id && properties.reply_to
      end

      # Publish a response for the rpc request.
      #
      # @param request [MsgRequest]
      # @param response [Appfuel::Response]
      # @return [nil]
      def publish_rpc(request, response)
        options = {
          correlation_id: request.correlation_id,
          routing_key: request.reply_to,
          headers: { "action_route" => request.action_route }
        }
        publish(response.to_json, options)
        nil
      end

      # @return [Object] the logger registered in the app container under
      #   :logger, memoized per worker instance
      def logger
        @logger ||= app_container[:logger]
      end

      private

      # Log an exception raised while handling a message and, when the
      # request is an rpc call, publish an error response keyed by the queue.
      #
      # @param label [String] short description of the step that failed
      # @param e [StandardError] the rescued exception
      # @param request [MsgRequest]
      # @return [nil]
      def handle_exception(label, e, request)
        err_msg = "[queue #{queue}] #{label}: #{e.message}"
        logger.error err_msg

        # NOTE(review): relies on the request exposing `properties` -- confirm
        # against MsgRequest.
        return unless rpc?(request.properties)

        response = create_error_response(queue, err_msg)
        publish_rpc(request, response)
      end

      # Build an error response holding a single message under the given key.
      #
      # @param key [Object] key the error is filed under
      # @param err_msg [String] the error message
      # @return [Object] result of Appfuel::ResponseHandler#error
      def create_error_response(key, err_msg)
        Appfuel::ResponseHandler.new.error(key => [err_msg])
      end

      # @param msg [String] JSON string of inputs
      # @param delivery_info [Bunny::Delivery::Info]
      # @param properties [Bunny::MessageProperties]
      # @return [Appfuel::Service::MsgRequest]
      def create_request(msg, delivery_info, properties)
        Appfuel::Service::MsgRequest.new(msg, delivery_info, properties)
      end
    end
  end
end