lib/legion/transport/message.rb in legion-transport-1.1.5 vs lib/legion/transport/message.rb in legion-transport-1.1.6

- old
+ new

@@ -6,11 +6,11 @@ def initialize(**options) @options = options validate end - def publish(options = @options) # rubocop:disable Metrics/AbcSize,Metrics/CyclomaticComplexity,Metrics/PerceivedComplexity + def publish(options = @options) # rubocop:disable Metrics/AbcSize raise unless @valid exchange_dest = exchange.respond_to?(:new) ? exchange.new : exchange exchange_dest.publish(encode_message, routing_key: routing_key || '', @@ -20,16 +20,24 @@ priority: options[:priority] || priority, expiration: options[:expiration] || expiration, headers: headers) end + def expiration + if @options.key? :expiration + @options[:expiration] + elsif @options.key? :ttl + @options[:ttl] + end + end + def message @options end def routing_key - nil + @options[:routing_key] if @options.key? :routing_key end def encode_message message_payload = message message_payload = Legion::JSON.dump(message_payload) unless message_payload.is_a? String @@ -62,11 +70,11 @@ def exchange Kernel.const_get(exchange_name) end def headers - @options[:headers] ||= {} + @options[:headers] ||= Concurrent::Hash.new %i[task_id relationship_id trigger_namespace_id trigger_function_id parent_id master_id runner_namespace runner_class namespace_id function_id function chain_id debug].each do |header| # rubocop:disable Layout/LineLength next unless @options.key? header @options[:headers][header] = @options[header].to_s end @@ -76,13 +84,9 @@ Legion::Logging.error e.backtrace end def priority 0 - end - - def expiration - nil end def content_type 'application/json' end