lib/legion/transport/message.rb in legion-transport-0.1.0 vs lib/legion/transport/message.rb in legion-transport-1.0.0

- old
+ new

@@ -1,8 +1,100 @@ module Legion module Transport class Message - require 'legion/transport/messages/base' - include Legion::Transport::Messages::Base + include Legion::Transport::Common + + def initialize(**options) + @options = options + validate + end + + def publish(options = @options) + raise unless @valid + + exchange_dest = exchange.respond_to?(:new) ? exchange.new : exchange + exchange_dest.publish(encode_message, + routing_key: routing_key || '', + content_type: options[:content_type] || content_type, + content_encoding: options[:content_encoding] || content_encoding, + type: options[:type] || type, + priority: options[:priority] || priority, + expiration: options[:expiration] || expiration, + headers: headers) + end + + def message + @options + end + + def encode_message + message_payload = message + message_payload = Legion::JSON.dump(message_payload) if message_payload.is_a? Hash + @options[:content_encoding] = 'identity' + return message_payload unless encrypt? && Legion::Settings[:crypt][:cs_encrypt_ready] + + @options[:content_encoding] = 'encrypted/cs' + encrypt_message(message_payload) + end + + def encrypt_message(message, _type = 'cs') + Legion::Crypt.encrypt(message) + end + + def encrypt? + Legion::Settings[:transport][:messages][:encrypt] + end + + def exchange_name + lex = self.class.ancestors.first.to_s.split('::')[2].downcase + "Legion::Extensions::#{lex.capitalize}::Transport::Exchanges::#{lex.capitalize}" + end + + def exchange + Kernel.const_get(exchange_name) + end + + def headers + @options[:headers] ||= {} + %i[task_id relationship_id trigger_namespace_id trigger_function_id parent_id master_id runner_namespace runner_class namespace_id function_id function chain_id debug].each do |header| # rubocop:disable Layout/LineLength + next unless @options.key? header + + @options[:headers][header] = @options[header].to_s + end + @options[:headers] + rescue StandardError => e + Legion::Logging.error e.message + Legion::Logging.error e.backtrace + end + + def priority + 0 + end + + def expiration + nil + end + + def content_type + 'application/json' + end + + def content_encoding + 'identity' + end + + def type + 'task' + end + + def timestamp + now.to_i + end + + def validate + @valid = true + end end end end + +Dir[__dir__ + '/messages/*.rb'].sort.each { |file| require file }