lib/cloud_powers/synapse/queue.rb in cloud_powers-0.2.1 vs lib/cloud_powers/synapse/queue.rb in cloud_powers-0.2.2
- old
+ new
@@ -1,87 +1,119 @@
require 'uri'
-require_relative '../helper'
module Smash
module CloudPowers
module Synapse
- include Smash::CloudPowers::Helper
-
module Queue
- Board = Struct.new(:set_name, :set_address, :workflow) do
+ include Smash::CloudPowers::Helper
+ include Smash::CloudPowers::AwsResources
+ # Board <Struct>
+ # This groups common functionality for a queue
+ Board = Struct.new(:sqs, :set_name, :set_address, :workflow) do
+ include Smash::CloudPowers::Synapse::Queue
include Smash::CloudPowers::Helper
+ include Smash::CloudPowers::Zenv
+
def i_var
"@#{name}"
end
def address
- set_address ||
- env(name) ||
- "https://sqs.us-west-2.amazonaws.com/088617881078/#{name}"
+ set_address || zfind(set_name) ||
+ "https://sqs.us-west-2.amazonaws.com/#{zfind(:account_number)}/#{name}"
end
+ def create_queue!
+ sqs.create_queue(queue_name: to_camel(name))
+ self
+ end
+
+ def destroy!
+ sqs.delete_queue(queue_url: address)
+ end
+
+ def message_count
+ get_queue_message_count(address)
+ end
+
def name
set_name || address.split('/').last
end
def next_board
workflow.next
end
+
+ def pluck_message(board_name = name)
+ pluck_queue_message(board_name)
+ end
+
+ def real?
+ queue_exists?(name)
+ end
+
+ def send_message(message)
+ send_queue_message(
+ address, (valid_json?(message) ? message : message.to_json)
+ )
+ end
end # end Board
#############################################
def board_name(url)
url.to_s.split('/').last
end
- # def board_name(url)
- # # TODO: figure out a way to not have this and :name in Board
- # # gets the name from the url
- # if url =~ URI.regexp
- # url = URI.parse(url)
- # url.path.split('/').last.split('_').last
- # else
- # env(url)
- # end
- # end
+ def create_queue!(name)
+ begin
+ Board.new(sqs, to_camel(name)).create_queue!
+ rescue Aws::SQS::Errors::QueueDeletedRecently => e
+ sleep 5
+ retry
+ end
+ end
- def create_queue(name)
- sqs.create_queue(queue_name: to_camel(name))
+ def build_queue(name)
+ Board.new(sqs, to_camel(name))
end
def delete_queue_message(queue, opts = {})
poll(queue, opts) do |msg, stats|
poller(queue).delete_message(msg)
throw :stop_polling
end
end
- def get_count(board)
+ def get_queue_message_count(board_url)
sqs.get_queue_attributes(
- queue_url: board_name(board),
+ queue_url: board_url,
attribute_names: ['ApproximateNumberOfMessages']
).attributes['ApproximateNumberOfMessages'].to_f
end
- # Params: board<string>
- # returns a message and deletes it from its origin
- def pluck_message(board)
+ # @params: board<String|symbol>: The name of the board
+ # @returns a message and deletes it from its origin
+ def pluck_queue_message(board)
poll(board) do |msg, poller|
poller.delete_message(msg)
- return msg
+ return valid_json?(msg.body) ? JSON.parse(msg.body) : msg.body
end
end
def poll(board, opts = {})
this_poller = poller(board)
+ results = nil
this_poller.poll(opts) do |msg|
- yield msg, this_poller if block_given?
+ results = yield msg, this_poller if block_given?
+ this_poller.delete_message(msg)
+ throw :stop_polling
end
+ results
end
def poller(board_name)
- board = Board.new(board_name)
+ board = Board.new(sqs, board_name)
unless instance_variable_defined?(board.i_var)
instance_variable_set(
board.i_var,
Aws::SQS::QueuePoller.new(board.address)
@@ -89,23 +121,21 @@
end
instance_variable_get(board.i_var)
end
def queue_exists?(name)
- sqs.list_queues(queue_name_prefix: name)
+ !sqs.list_queues(queue_name_prefix: name).queue_urls.empty?
end
- def send_queue_message(message, *board_info)
- board = board_info.first.kind_of?(Board) ? board_info.first : Board.new(*board_info)
- message = message.to_json unless message.kind_of? String
+ def queue_search(name)
+ sqs.list_queues(queue_name_prefix: name).queue_urls
+ end
+
+ def send_queue_message(address, message)
sqs.send_message(
- queue_url: board.address,
+ queue_url: address,
message_body: message
)
- end
-
- def sqs
- @sqs ||= Aws::SQS::Client.new(credentials: Auth.creds)
end
end
end
end
end