require 'uri'
require_relative 'board'
module Smash
module CloudPowers
module Synapse
module Queue
include Smash::CloudPowers::AwsResources
include Smash::CloudPowers::Helper
# A simple Struct that acts as a Name to URL map
#
# Parameters
# * :set_name +String+ (optional) - An optional name. It should be the same name as the
# the Board and/or Queue you're working with, or else this Struct isn't that useful
# * :set_url +String+ (optional) - An optional URL. It should be the same URL as the
# the Board and/or Queue you're working with, or else this Struct isn't that useful
#
# Attributes
# * name +String+ - the +:set_name+ or parse the +#address()+ for the name
# * url +String+ - the +:set_url+ or add the name to the end of a best guess at the URL
#
# Example
# name_url_map = NUMap.new(nil, 'https://sqs.us-west-53.amazonaws.com/001101010010/fooBar')
# name_url_map.name
# # => 'fooBar'
#
# # and now in reverse
# url_name_map = NUMap.new('snargleBargle')
# url_name_map.address
# # => 'https://sqs.us-west-53.amazonaws.com/001101010010/snargleBargle'
NUMap = Struct.new(:set_name, :set_url) do
# Gives you back the name, even if it hasn't been set
#
# Returns
# +String+
def name
set_name || url.split('/').last # Queue names are last in the URL path
end
# Gives you back the URL, even if it hasn't been set
#
# Returns
# +String+
def url
set_url || Smash::CloudPowers::Queue::Board.new(name).best_guess_address
end
end
# This method can be used to parse a queue name from its address. It can be handy if you need the name
# of a queue but you don't want the overhead of creating a Board object.
#
# Parameters
# * url +String+
#
# Returns
# +String+
#
# Example
# board_name('https://sqs.us-west-53.amazonaws.com/001101010010/fooBar')
# => fooBar
def board_name(url)
url.to_s.split('/').last
end
# This method builds a Queue::Board object for you to use but doesn't
# invoke the #create!() method, so no API call is made to create the Queue
# on SQS. This can be used if the Board and/or Queue already exists.
#
# Parameters
# * name +String+ - name of the Queue you want to interact with
#
# Returns
# Queue::Board
#
# Example
# queue_object = build_queue('exampleQueue')
# queue_object.address
# => https://sqs.us-west-2.amazonaws.com/81234567/exampleQueue
def build_queue(name)
Smash::CloudPowers::Queue::Board.build(to_camel(name), sqs)
end
# This method allows you to create a queue on SQS without explicitly creating a Board object
#
# Parameters
# * name +String+ - The name of the Queue to be created
#
# Returns
# Queue::Board
#
# Example
# create_queue('exampleQueue')
# get_queue_message_count
def create_queue!(name)
begin
Smash::CloudPowers::Queue::Board.create!(to_camel(name), sqs)
rescue Aws::SQS::Errors::QueueDeletedRecently => e
sleep 5
retry
end
end
# Deletes a queue message without caring about reading/interacting with the message.
# This is usually used for progress tracking, ie; a Neuron takes a message from the Backlog, moves it to
# WIP and deletes it from Backlog. Then repeats these steps for the remaining States in the Workflow
#
# Parameters
# * queue +String+ - queue is the name of the +Queue+ to interact with
# * opts +Hash+ (optional) - a configuration +Hash+ for the +SQS::QueuePoller+
#
# Notes
# * throws :stop_polling after the message is deleted
#
# Example
# get_queue_message_count('exampleQueue')
# # => n
# delete_queue_message('exampleQueue')
# get_queue_message_count('exampleQueue')
# # => n-1
def delete_queue_message(queue, opts = {})
poll(queue, opts) do |msg, stats|
poller(queue).delete_message(msg)
throw :stop_polling
end
end
# This method is used to get the approximate count of messages in a given queue
#
# Parameters
# * board_url +String+ - the URL for the board you need to get a count from
#
# Returns
# +Float+
#
# Example
# get_queue_message_count('exampleQueue')
# # => n
# delete_queue_message('exampleQueue')
# get_queue_message_count('exampleQueue')
# # => n-1
def get_queue_message_count(board_url)
sqs.get_queue_attributes(
queue_url: board_url,
attribute_names: ['ApproximateNumberOfMessages']
).attributes['ApproximateNumberOfMessages'].to_f
end
# Get a message from a Queue
#
# Parameters
# * board: The name of the board
#
# Returns
# * +String+ if +msg.body+ is not valid JSON
# * +Hash+ if +msg.body+ is valid JSON
#
# Example
# # msg.body == 'Hey' # +String+
# pluck_queue_message('exampleQueue')
# # => 'Hey' # +String+
#
# # msg.body == "\{"tally":"ho"\}" # +JSON+
# pluck_queue_message('exampleQueue')
# # => { 'tally' => 'ho' } # +Hash+
def pluck_queue_message(board)
poll(board) do |msg, poller|
poller.delete_message(msg)
return valid_json?(msg.body) ? JSON.parse(msg.body) : msg.body
end
end
# Polls the given board with the given options hash and a block that interacts with
# the message that is retrieved from the queue
#
# Parameters
# * +board+ +String+ - the name of the queue you want to poll
# * +opts+ +Hash+ - costomizes the Aws::SQS::QueuePoller's #poll(opts) method
# and can have any +AWS::SQS:QueuePoller+ polling configuration option(s)
# * +block+ is the block that is used to interact with the message that was retrieved
#
# Returns
# the results from the +message+ and the +block+ that interacts with the +message(s)+
#
# Example
# # continuously run jobs from messages in the Queue and leaves the message in the queue
# # using the +:skip_delete+ parameter
# poll(:backlog, :skip_delete) do |msg|
# demo_job = Job.new(msg.body)
# demo_job.run
# end
def poll(board_name, opts = {})
this_poller = queue_poller(board_name)
results = nil
this_poller.poll(opts) do |msg|
results = yield msg, this_poller if block_given?
this_poller.delete_message(msg)
throw :stop_polling
end
results
end
# This method can be used to gain a SQS::QueuePoller. It creates a Board object,
# the Board then sends the API call to SQS to create the queue and sets an instance
# variable, using the board's name, to the Board object itself
#
# Parameters
# * board_name +String+ - name of the Queue you want to gain a QueuePoller for
#
# Returns
# board_name:Queue::Board
#
# Notes
# * An instance variable is set with this method, if one doesn't exist for the board
# The instance variable that is created/used is named the same name that was given as
# a parameter.
#
# Example
# # these are equivalent after @exp_queue_poller is set but before it is set,
# # exp_queue_poller
# queue_poller('exampleQueue').poll { |msg| Job.new(msg.body).run }
# @example_queue.poll { |msg| Job.new(msg.body).run }
def queue_poller(board_name)
board = Smash::CloudPowers::Queue::Board.create!(board_name, sqs)
unless instance_variable_defined?(board.i_var)
instance_variable_set(board.i_var, board)
end
instance_variable_get(board.i_var).poller
end
# Checks SQS for the existence of this queue using the #queue_search() method
#
# Parameters
# * name +String+
#
# Returns
# Boolean
#
# Notes
# * see #queue_search()
def queue_exists?(name)
!queue_search(name).empty?
end
# Searches for a queue based on the name
#
# Parameters
# name +String+
#
# Returns
# queue_urls +String+
#
# Example
# results = queue_search('exampleQueue') # returns related URLs
# results.first =~ /exampleQueue/ # regex match against the URL
def queue_search(name)
sqs.list_queues(queue_name_prefix: name).queue_urls
end
# Sends a given message to a given queue
#
# Parameters
# * address +String+ - address of the Queue you want to interact with
# * message +String+ - message to be sent
#
# Returns
# Array - Array of URLs
#
# Example
# legit_address = 'https://sqs.us-west-2.amazonaws.com/12345678/exampleQueue'
# random_message = 'Wowza, this is pretty easy.'
# resp = send_queue_message(legit_address, random_message))
# resp.message_id
# => 'some message id'
def send_queue_message(address, message, this_sqs = sqs)
this_sqs.send_message(queue_url: address, message_body: message)
end
end
end
end
end