require 'uri'
require 'cloud_powers/synapse/queue/board'
module Smash
module CloudPowers
module Synapse
module Queue
include Smash::CloudPowers::AwsResources
include Smash::CloudPowers::Helpers
# A simple Struct that acts as a Name to URL map
#
# Parameters
# * :set_name +String+ (optional) - An optional name. It should be the same name as the
# the QueueResource and/or Queue you're working with, or else this Struct isn't that useful
# * :set_url +String+ (optional) - An optional URL. It should be the same URL as the
# the QueueResource and/or Queue you're working with, or else this Struct isn't that useful
#
# Attributes
# * name +String+ - the +:set_name+ or parse the +#address()+ for the name
# * url +String+ - the +:set_url+ or add the name to the end of a best guess at the URL
#
# Example
# name_url_map = NUMap.new(nil, 'https://sqs.us-west-53.amazonaws.com/001101010010/fooBar')
# name_url_map.name
# # => 'fooBar'
#
# # and now in reverse
# url_name_map = NUMap.new('snargleBargle')
# url_name_map.address
# # => 'https://sqs.us-west-53.amazonaws.com/001101010010/snargleBargle'
NUMap = Struct.new(:set_name, :set_url) do
# Gives you back the name, even if it hasn't been set
#
# Returns
# +String+
def name
set_name || url.split('/').last # Queue names are last in the URL path
end
# Gives you back the URL, even if it hasn't been set
#
# Returns
# +String+
def url
set_url || Smash::CloudPowers::Synapse::Queue::Board.new(name: name).best_guess_address
end
end
# Gives a best guess at the URL that points toward this Resource's Queue. It uses a couple params
# to build a standard URL for SQS. The only problem with using this last resort is you may need
# to use a Queue from a different region, account or name but it can be a handy catch-all for the URLs
# for most cases.
#
# Returns
# +String+
#
# Example
# best_guess_address('fooBar')
# => "https://sqs.us-west-2.amazonaws.com/12345678/fooBar"
#
# Notes
# * See Smash::CloudPowers::Zenv#zfind() to understand how
# this method finds your region and account number
def best_guess_address(board_name = @name)
"https://sqs.#{zfind(:aws_region)}.amazonaws.com/#{zfind(:account_number)}/#{board_name}"
end
# This method can be used to parse a queue name from its address.
# It can be handy if you need the name of a queue but you don't want
# the overhead of creating a QueueResource object.
#
# Parameters
# * url +String+
#
# Returns
# +String+
#
# Example
# board_name('https://sqs.us-west-53.amazonaws.com/001101010010/fooBar')
# => foo_bar_board
def board_name(arg)
base_name = to_snake(arg.to_s.split('/').last)
%r{_board$} =~ base_name ? base_name : "#{base_name}_board"
end
# Build a Smash::CloudPowers::Synapse::Queue and the getter and setter
# methods. This method uses the Creatable and Resource interface for
# consistency throughout CloudPowers.
#
# Parameters
# * +:name+ +String+ - the name of the Queue will be used to create
# a getter and setter. Please see CloudPowers::Creatable
# * +type+ +String+|+Symbol+ - Default is
# Smash::CloudPowers::Synapse::Queue::Board. As long as the
# type you request exists, you can use it but only the Classes that
# use CloudPowers::Creatable and extend from
# CloudPowers::Resource are supported. Please see
# CloudPowers::Creatable and CloudPowers::Resource
# * config +KeywordArgument+s - any config you want to pass along.
#
# Returns
# Smash::CloudPowers::Synapse::Queue[::?]
#
# Notes:
# * See create_queue to create the real object; e.g. an
# AWS::SQS::Queue. This method just creates a representation
# of that object in memory. It can be linked up to a real Object or
# left as is and used later.
# * See CloudPowers::Creatable
# * See CloudPowers::Resource
def build_queue(name:, type: :board, **config)
build_method_name = "build_#{type}"
if self.respond_to? build_method_name
self.public_send build_method_name, name: name, **config
else
build_board(name: name, **config)
end
end
# Create a Smash::CloudPowers::Synapse::Queue, the getter
# and setter methods and the real object, either out in the wild or right
# here, on disk; e.g. create an Aws::SQS::Queue in AWS and the
# appropriately mapped Smash::CloudPowers::Synapse::Queue[::?]
# object, locally to use in other methods, etc. This method uses the
# Creatable and Resource interface for consistency throughout CloudPowers.
#
# Parameters
# * +:name+ +String+ - the name of the Queue will be used to create
# a getter and setter. Please see CloudPowers::Creatable
# * +type+ +String+|+Symbol+ - Default is
# Smash::CloudPowers::Synapse::Queue::Board. As long as the
# type you request exists, you can use it but only the Classes that
# use CloudPowers::Creatable and extend from
# CloudPowers::Resource are supported. Please see
# CloudPowers::Creatable and CloudPowers::Resource
# * config +KeywordArgument+s - any config you want to pass along.
#
# Returns
# Smash::CloudPowers::Synapse::Queue[::?]
#
# Notes:
# * See build_queue to create the real object; e.g. an
# AWS::SQS::Queue. This method just creates a representation
# of that object in memory. It can be linked up to a real Object or
# left as is and used later.
# * See CloudPowers::Creatable
# * See CloudPowers::Resource
def create_queue(name:, type: :board, **config)
create_method_name = "build_#{type}"
if self.respond_to? create_method_name
self.public_send create_method_name, name: name, **config
else
create_queue(name: name, **config)
end
end
# This method builds a Queue::QueueResource object for you to use but doesn't
# invoke the #create!() method, so no API call is made to create the Queue
# on SQS. This can be used if the QueueResource and/or Queue already exists.
#
# Parameters
# * name +String+ - name of the Queue you want to interact with
#
# Returns
# Queue::QueueResource
#
# Example
# queue_object = build_queue('exampleQueue')
# queue_object.address
# => https://sqs.us-west-2.amazonaws.com/81234567/exampleQueue
def build_board(name:, client: sqs, **config)
board_resource = Smash::CloudPowers::Synapse::Queue::Board.build(
name: to_camel(name), client: client, **config
)
attr_map(board_resource.call_name => board_resource) do |attribute, resource|
instance_attr_accessor attribute
resource
end
board_resource
end
# This method allows you to create a queue on SQS without explicitly creating a QueueResource object
#
# Parameters
# * name +String+ - The name of the Queue to be created
#
# Returns
# Queue::QueueResource
#
# Example
# create_queue('exampleQueue')
# get_queue_message_count
def create_board(name:, client: sqs, **config)
board_resource = Smash::CloudPowers::Synapse::Queue::Board.create!(
name: to_camel(name), client: sqs
)
attr_map(board_resource.call_name => board_resource) do |attribute, resource|
instance_attr_accessor attribute
resource
end
board_resource
end
# Deletes a queue message without caring about reading/interacting with the message.
# This is usually used for progress tracking, ie; a Neuron takes a message from the Backlog, moves it to
# WIP and deletes it from Backlog. Then repeats these steps for the remaining States in the Workflow
#
# Parameters
# * queue +String+ - queue is the name of the +Queue+ to interact with
# * opts +Hash+ (optional) - a configuration +Hash+ for the +SQS::QueuePoller+
#
# Notes
# * throws :stop_polling after the message is deleted
#
# Example
# get_queue_message_count('exampleQueue')
# # => n
# delete_queue_message('exampleQueue')
# get_queue_message_count('exampleQueue')
# # => n-1
def delete_queue_message(queue, opts = {})
poll(queue, opts) do |msg, stats|
poller(queue).delete_message(msg)
throw :stop_polling
end
end
# This method is used to get the approximate count of messages in a given queue
#
# Parameters
# * resource_url +String+ - the URL for the resource you need to get a count from
#
# Returns
# +Float+
#
# Example
# get_queue_message_count('exampleQueue')
# # => n
# delete_queue_message('exampleQueue')
# get_queue_message_count('exampleQueue')
# # => n-1
def get_queue_message_count(resource_url)
sqs.get_queue_attributes(
queue_url: resource_url,
attribute_names: ['ApproximateNumberOfMessages']
).attributes['ApproximateNumberOfMessages'].to_f
end
# Get an Aws::SQS::QueuePoller for a a queue to deal with messages
#
# Parameters
# * name +String+ - name of the queue. this also becomes the name of
# the +Poller+ object
# * client Aws::SQS::Client (optional) - good for hitting
# different regions or even stubbing for testing
def get_queue_poller(name, client = sqs)
queue_poller(url: best_guess_address(name), client: client)
end
# Get a message from a Queue
#
# Parameters
# * resource: The name of the resource
#
# Returns
# * +String+ if +msg.body+ is not valid JSON
# * +Hash+ if +msg.body+ is valid JSON
#
# Example
# # msg.body == 'Hey' # +String+
# pluck_queue_message('exampleQueue')
# # => 'Hey' # +String+
#
# # msg.body == "\{"tally":"ho"\}" # +JSON+
# pluck_queue_message('exampleQueue')
# # => { 'tally' => 'ho' } # +Hash+
def pluck_queue_message(resource)
poll(resource) do |msg, poller|
poller.delete_message(msg)
return valid_json?(msg.body) ? from_json(msg.body) : msg.body
end
end
# Polls the given resource with the given options hash and a block that interacts with
# the message that is retrieved from the queue
#
# Parameters
# * :queue_name +String+ - the name of the queue that you want to poll
# * :sqs +AWS::SQS:QueuePoller+ polling configuration option(s)
# * +block+ is the block that is used to interact with the message that was retrieved
#
# Returns
# the results from the +message+ and the +block+ that interacts with the +message(s)+
#
# Example
# # continuously run jobs from messages in the Queue and leaves the message in the queue
# # using the +:skip_delete+ parameter
# poll(:backlog, :skip_delete) do |msg|
# demo_job = Job.new(msg.body)
# demo_job.run
# end
def poll(queue_name, client: sqs, poller: get_queue_poller(queue_name), **config)
results = nil
poller.poll(config) do |msg|
results = yield msg, poller if block_given?
poller.delete_message(msg)
throw :stop_polling
end
results
end
# Checks SQS for the existence of this queue using the #queue_search() method
#
# Parameters
# * name +String+
#
# Returns
# Boolean
#
# Notes
# * See #queue_search()
def queue_exists?(name)
!queue_search(name).empty?
end
# This method can be used to parse a queue name from its address.
# It can be handy if you need the name of a queue but you don't want
# the overhead of creating a QueueResource object.
#
# Parameters
# * url +String+
#
# Returns
# +String+
#
# Example
# resource_name('https://sqs.us-west-53.amazonaws.com/001101010010/fooBar')
# => fooBar
def queue_name(arg)
base_name = to_snake(arg.to_s.split('/').last)
%r{_queue$} =~ base_name ? base_name : "#{base_name}_queue"
end
# This method can be used to parse a queue poller name from its queue
# name or @url. It can be handy if you need the name of a
# queue but you don't want the overhead of creating a QueueResource object.
#
# Parameters
# * url +String+
#
# Returns
# +String+
#
# Example
# resource_name('https://sqs.us-west-53.amazonaws.com/001101010010/fooBar')
# => fooBar
def queue_poller_name(arg)
base_name = to_snake(arg.to_s.split('/').last)
%r{_queue_poller$} =~ base_name ? base_name : "#{base_name}_queue_poller"
end
# Searches for a queue based on the name
#
# Parameters
# name +String+
#
# Returns
# queue_urls +String+
#
# Example
# results = queue_search('exampleQueue') # returns related URLs
# results.first =~ /exampleQueue/ # regex match against the URL
def queue_search(name)
urls = sqs.list_queues(queue_name_prefix: name).queue_urls
# TODO: allow a collection of blocks to be itterated through. Each one
# would be able to further scope down a set of previous results then
# pass it to the next. When there are no scoping blocks left, build
# the boards and return them. Saves a TON on memory and time
urls.map do |url|
build_board(name: queue_name(url), client: sqs) do |board|
board.instance_attr_accessor :url
board.url = url
board
end
end
end
# Sends a given message to a given queue
#
# Parameters
# * address +String+ - address of the Queue you want to interact with
# * message +String+ - message to be sent
#
# Returns
# Array - Array of URLs
#
# Example
# legit_address = 'https://sqs.us-west-2.amazonaws.com/12345678/exampleQueue'
# random_message = 'Wowza, this is pretty easy.'
# resp = send_queue_message(legit_address, random_message))
# resp.message_id
# => 'some message id'
def send_queue_message(address, message, this_sqs = sqs)
this_sqs.send_message(queue_url: address, message_body: message)
end
end
end
end
end