lib/legion/transport/message.rb in legion-transport-1.1.5 vs lib/legion/transport/message.rb in legion-transport-1.1.6
- old
+ new
@@ -6,11 +6,11 @@
def initialize(**options)
@options = options
validate
end
- def publish(options = @options) # rubocop:disable Metrics/AbcSize,Metrics/CyclomaticComplexity,Metrics/PerceivedComplexity
+ def publish(options = @options) # rubocop:disable Metrics/AbcSize
raise unless @valid
exchange_dest = exchange.respond_to?(:new) ? exchange.new : exchange
exchange_dest.publish(encode_message,
routing_key: routing_key || '',
@@ -20,16 +20,24 @@
priority: options[:priority] || priority,
expiration: options[:expiration] || expiration,
headers: headers)
end
+ def expiration
+ if @options.key? :expiration
+ @options[:expiration]
+ elsif @options.key? :ttl
+ @options[:ttl]
+ end
+ end
+
def message
@options
end
def routing_key
- nil
+ @options[:routing_key] if @options.key? :routing_key
end
def encode_message
message_payload = message
message_payload = Legion::JSON.dump(message_payload) unless message_payload.is_a? String
@@ -62,11 +70,11 @@
def exchange
Kernel.const_get(exchange_name)
end
def headers
- @options[:headers] ||= {}
+ @options[:headers] ||= Concurrent::Hash.new
%i[task_id relationship_id trigger_namespace_id trigger_function_id parent_id master_id runner_namespace runner_class namespace_id function_id function chain_id debug].each do |header| # rubocop:disable Layout/LineLength
next unless @options.key? header
@options[:headers][header] = @options[header].to_s
end
@@ -76,13 +84,9 @@
Legion::Logging.error e.backtrace
end
def priority
0
- end
-
- def expiration
- nil
end
def content_type
'application/json'
end