lib/alephant/broker/load_strategy/revalidate/refresher.rb in alephant-broker-3.16.2 vs lib/alephant/broker/load_strategy/revalidate/refresher.rb in alephant-broker-3.17.0
- old
+ new
@@ -20,38 +20,40 @@
return if inflight
logger.info(event: 'QueueMessage', message: message, method: "#{self.class}#refresh")
- queue.send_message(message)
+ client.send_message(
+ queue_url: queue_url,
+ message_body: message
+ )
+
cache.set(inflight_cache_key, true, INFLIGHT_CACHE_TTL)
end
private
- def message
- ::JSON.generate(id: component_meta.id,
- batch_id: component_meta.batch_id,
- options: component_meta.options,
- timestamp: Time.now.to_s)
+ def client
+ options = {}
+ options[:endpoint] = ENV['AWS_SQS_ENDPOINT'] if ENV['AWS_SQS_ENDPOINT']
+ options[:queue_owner_aws_account_id] = aws_acc_id if aws_acc_id
+
+ @client ||= Aws::SQS::Client.new(options)
end
- def queue
- @queue ||= proc do
- client = AWS::SQS.new
- url = client.queues.url_for(Broker.config[:sqs_queue_name], queue_options)
+ def queue_url
+ options = {
+ queue_name: Broker.config[:sqs_queue_name]
+ }
- client.queues[url]
- end.call
+ client.get_queue_url(options).queue_url
end
- def queue_options
- opts = {}
- opts[:queue_owner_aws_account_id] = aws_acc_id if aws_acc_id
-
- logger.info(event: 'SQSQueueOptionsConfigured', options: opts, method: "#{self.class}#queue_options")
-
- opts
+ def message
+ ::JSON.generate(id: component_meta.id,
+ batch_id: component_meta.batch_id,
+ options: component_meta.options,
+ timestamp: Time.now.to_s)
end
def aws_acc_id
Broker.config[:aws_account_id]
end