require 'sneakers' require 'sneakers/metrics/logging_metrics' require 'sneakers/handlers/maxretry' require 'json' require 'oj' module Harmony module Service class RpcService include Sneakers::Worker from_queue ENV['harmony_queue'], timeout_job_after: 10, threads: 1 def work_with_params(message, _delivery_info, metadata) logger.debug "Request: #{message}" request = Oj.load(message) request_class = request.class response_classes = request_response_mapping[request_class] raise "Unacceptable request class: #{request_class}" if response_classes.nil? result = new_handler.work_with_request(request) raise "Unacceptable response class: #{result.class}" unless response_classes.include?(result.class) send_response(result, metadata) ack! rescue StandardError => error logger.error error.message logger.error error.backtrace.join("\n") error_response = ErrorResponse.new message: error.message, exception: error send_response(error_response, metadata) reject! end def stop super # reply_to_exchange.close # not working reply_to_connection.close end def reply_to_connection @reply_to_connection ||= create_reply_to_connection end def create_reply_to_connection opts = Sneakers::CONFIG conn = Bunny.new(opts[:amqp], vhost: opts[:vhost], heartbeat: opts[:heartbeat], logger: Sneakers.logger) conn.start conn end def reply_to_exchange @reply_to_queue ||= create_reply_to_exchange end def create_reply_to_exchange ch = reply_to_connection.create_channel ch.exchange(AMQ::Protocol::EMPTY_STRING, auto_delete: true) end private def send_response(result, metadata) json = Oj.dump(result) logger.debug "Response: #{json}" send_response_json(json, metadata.reply_to, metadata.correlation_id) end def send_response_json(json, routing_key, correlation_id) reply_to_exchange.publish(json, routing_key: routing_key, correlation_id: correlation_id) end def request_response_mapping { Calculator::Request => [Calculator::Response], ActionList::ListRequest => [Array], ActionList::ItemRequest => [Hash], ActionList::ActionRequest => [Response], Chart::Request => [Chart::Response], Form::GetRequest => [Form::GetResponse], Flow::EndedRequest => [Response], Notification::Request => [Notification::AppResponse, Response], AttributeNamesRequest => [AttributeNamesResponse], OauthTokenRequest => [OauthTokenResponse] } end def new_handler handler_class = Sneakers::CONFIG[:handler_class] raise 'No handler specified' if handler_class.nil? handler = Object.const_get(handler_class).new raise "Unable to create handler: #{handler_class}" if handler.nil? handler end end end end