lib/legion/transport/message.rb in legion-transport-0.1.0 vs lib/legion/transport/message.rb in legion-transport-1.0.0
- old
+ new
@@ -1,8 +1,100 @@
module Legion
module Transport
class Message
- require 'legion/transport/messages/base'
- include Legion::Transport::Messages::Base
+ include Legion::Transport::Common
+
+ def initialize(**options)
+ @options = options
+ validate
+ end
+
+ def publish(options = @options)
+ raise unless @valid
+
+ exchange_dest = exchange.respond_to?(:new) ? exchange.new : exchange
+ exchange_dest.publish(encode_message,
+ routing_key: routing_key || '',
+ content_type: options[:content_type] || content_type,
+ content_encoding: options[:content_encoding] || content_encoding,
+ type: options[:type] || type,
+ priority: options[:priority] || priority,
+ expiration: options[:expiration] || expiration,
+ headers: headers)
+ end
+
+ def message
+ @options
+ end
+
+ def encode_message
+ message_payload = message
+ message_payload = Legion::JSON.dump(message_payload) if message_payload.is_a? Hash
+ @options[:content_encoding] = 'identity'
+ return message_payload unless encrypt? && Legion::Settings[:crypt][:cs_encrypt_ready]
+
+ @options[:content_encoding] = 'encrypted/cs'
+ encrypt_message(message_payload)
+ end
+
+ def encrypt_message(message, _type = 'cs')
+ Legion::Crypt.encrypt(message)
+ end
+
+ def encrypt?
+ Legion::Settings[:transport][:messages][:encrypt]
+ end
+
+ def exchange_name
+ lex = self.class.ancestors.first.to_s.split('::')[2].downcase
+ "Legion::Extensions::#{lex.capitalize}::Transport::Exchanges::#{lex.capitalize}"
+ end
+
+ def exchange
+ Kernel.const_get(exchange_name)
+ end
+
+ def headers
+ @options[:headers] ||= {}
+ %i[task_id relationship_id trigger_namespace_id trigger_function_id parent_id master_id runner_namespace runner_class namespace_id function_id function chain_id debug].each do |header| # rubocop:disable Layout/LineLength
+ next unless @options.key? header
+
+ @options[:headers][header] = @options[header].to_s
+ end
+ @options[:headers]
+ rescue StandardError => e
+ Legion::Logging.error e.message
+ Legion::Logging.error e.backtrace
+ end
+
+ def priority
+ 0
+ end
+
+ def expiration
+ nil
+ end
+
+ def content_type
+ 'application/json'
+ end
+
+ def content_encoding
+ 'identity'
+ end
+
+ def type
+ 'task'
+ end
+
+ def timestamp
+ now.to_i
+ end
+
+ def validate
+ @valid = true
+ end
end
end
end
+
+Dir[__dir__ + '/messages/*.rb'].sort.each { |file| require file }