lib/alephant/broker/load_strategy/revalidate/refresher.rb in alephant-broker-3.16.2 vs lib/alephant/broker/load_strategy/revalidate/refresher.rb in alephant-broker-3.17.0

- old
+ new

@@ -20,38 +20,40 @@ return if inflight logger.info(event: 'QueueMessage', message: message, method: "#{self.class}#refresh") - queue.send_message(message) + client.send_message( + queue_url: queue_url, + message_body: message + ) + cache.set(inflight_cache_key, true, INFLIGHT_CACHE_TTL) end private - def message - ::JSON.generate(id: component_meta.id, - batch_id: component_meta.batch_id, - options: component_meta.options, - timestamp: Time.now.to_s) + def client + options = {} + options[:endpoint] = ENV['AWS_SQS_ENDPOINT'] if ENV['AWS_SQS_ENDPOINT'] + options[:queue_owner_aws_account_id] = aws_acc_id if aws_acc_id + + @client ||= Aws::SQS::Client.new(options) end - def queue - @queue ||= proc do - client = AWS::SQS.new - url = client.queues.url_for(Broker.config[:sqs_queue_name], queue_options) + def queue_url + options = { + queue_name: Broker.config[:sqs_queue_name] + } - client.queues[url] - end.call + client.get_queue_url(options).queue_url end - def queue_options - opts = {} - opts[:queue_owner_aws_account_id] = aws_acc_id if aws_acc_id - - logger.info(event: 'SQSQueueOptionsConfigured', options: opts, method: "#{self.class}#queue_options") - - opts + def message + ::JSON.generate(id: component_meta.id, + batch_id: component_meta.batch_id, + options: component_meta.options, + timestamp: Time.now.to_s) end def aws_acc_id Broker.config[:aws_account_id] end