module Legion
  module Transport
    # Base class for messages published through a Legion exchange.
    #
    # The options hash given to the constructor is the message itself: when it
    # is a Hash it is JSON-encoded (and optionally encrypted) to form the
    # payload, and selected keys are also copied into the message headers.
    # The small attribute methods (#priority, #type, ...) supply the defaults
    # used by #publish when the options do not provide a value.
    class Message
      include Legion::Transport::Common

      # Option keys that #headers copies (as strings) into the message headers
      # whenever they are present in the options.
      HEADER_KEYS = %i[
        task_id relationship_id trigger_namespace_id trigger_function_id
        parent_id master_id runner_namespace runner_class namespace_id
        function_id function chain_id debug
      ].freeze

      # @param options [Hash] message content and publishing options
      def initialize(**options)
        @options = options
        validate
      end

      # Encodes the message and publishes it to the resolved exchange.
      #
      # @param options [Hash] per-call overrides for the publish properties;
      #   defaults to the options the message was built with
      # @raise [RuntimeError] when the message has not been marked valid
      # @return whatever the exchange's #publish returns
      def publish(options = @options)
        raise 'message is not valid and cannot be published' unless @valid

        # #exchange may return either a class (instantiate it) or a ready-to-use object.
        exchange_dest = exchange.respond_to?(:new) ? exchange.new : exchange

        # Encode before building the properties: #encode_message records the
        # effective content encoding in @options.
        payload = encode_message

        exchange_dest.publish(
          payload,
          routing_key: routing_key || '',
          content_type: options[:content_type] || content_type,
          # Fall back to the encoding recorded by #encode_message so that an
          # encrypted payload is never labelled with the plain default when a
          # separate options hash is passed in.
          content_encoding: options[:content_encoding] || @options[:content_encoding] || content_encoding,
          type: options[:type] || type,
          priority: options[:priority] || priority,
          expiration: options[:expiration] || expiration,
          headers: headers
        )
      end

      # @return [Hash] the raw options that make up this message
      def message
        @options
      end

      # Builds the payload to publish. Hash messages are dumped with
      # Legion::JSON; the payload is encrypted when message encryption is
      # enabled in the settings and the crypt settings report cs_encrypt_ready.
      # Records the resulting content encoding in @options[:content_encoding].
      #
      # @return the encoded (and possibly encrypted) payload
      def encode_message
        message_payload = message
        message_payload = Legion::JSON.dump(message_payload) if message_payload.is_a?(Hash)

        @options[:content_encoding] = 'identity'
        return message_payload unless encrypt? && Legion::Settings[:crypt][:cs_encrypt_ready]

        @options[:content_encoding] = 'encrypted/cs'
        encrypt_message(message_payload)
      end

      # @param message the payload to encrypt
      # @param _type [String] unused; kept for interface compatibility
      # @return the result of Legion::Crypt.encrypt
      def encrypt_message(message, _type = 'cs')
        Legion::Crypt.encrypt(message)
      end

      # @return the value of the transport -> messages -> encrypt setting
      def encrypt?
        Legion::Settings[:transport][:messages][:encrypt]
      end

      # Derives the exchange constant name from the third namespace segment of
      # this object's class name.
      #
      # @return [String] fully qualified exchange constant name
      def exchange_name
        lex = self.class.ancestors.first.to_s.split('::')[2].downcase
        "Legion::Extensions::#{lex.capitalize}::Transport::Exchanges::#{lex.capitalize}"
      end

      # @return the constant named by #exchange_name
      def exchange
        Kernel.const_get(exchange_name)
      end

      # Merges every HEADER_KEYS option that is present into
      # @options[:headers], converting the values to strings.
      #
      # @return [Hash] the headers to publish; if building them fails the error
      #   is logged and the headers collected so far (or an empty hash) are
      #   returned instead of the logger's return value
      def headers
        @options[:headers] ||= {}
        HEADER_KEYS.each do |header|
          next unless @options.key?(header)

          @options[:headers][header] = @options[header].to_s
        end
        @options[:headers]
      rescue StandardError => e
        Legion::Logging.error e.message
        Legion::Logging.error e.backtrace
        @options[:headers].is_a?(Hash) ? @options[:headers] : {}
      end

      # @return [Integer] default message priority
      def priority
        0
      end

      # @return [nil] default expiration (none)
      def expiration
        nil
      end

      # @return [String] default content type
      def content_type
        'application/json'
      end

      # @return [String] default content encoding
      def content_encoding
        'identity'
      end

      # @return [String] default message type
      def type
        'task'
      end

      # @return [Integer] current time as seconds since the epoch
      def timestamp
        Time.now.to_i
      end

      # Marks the message as valid. This base implementation accepts everything.
      #
      # @return [true]
      def validate
        @valid = true
      end
    end
  end
end

# Load every message definition that lives alongside this file, in a stable order.
Dir[File.join(__dir__, 'messages', '*.rb')].sort.each { |file| require file }