lib/cloud_powers/synapse/queue.rb in cloud_powers-0.2.1 vs lib/cloud_powers/synapse/queue.rb in cloud_powers-0.2.2

- old
+ new

@@ -1,87 +1,119 @@ require 'uri' -require_relative '../helper' module Smash module CloudPowers module Synapse - include Smash::CloudPowers::Helper - module Queue - Board = Struct.new(:set_name, :set_address, :workflow) do + include Smash::CloudPowers::Helper + include Smash::CloudPowers::AwsResources + # Board <Struct> + # This groups common functionality for a queue + Board = Struct.new(:sqs, :set_name, :set_address, :workflow) do + include Smash::CloudPowers::Synapse::Queue include Smash::CloudPowers::Helper + include Smash::CloudPowers::Zenv + def i_var "@#{name}" end def address - set_address || - env(name) || - "https://sqs.us-west-2.amazonaws.com/088617881078/#{name}" + set_address || zfind(set_name) || + "https://sqs.us-west-2.amazonaws.com/#{zfind(:account_number)}/#{name}" end + def create_queue! + sqs.create_queue(queue_name: to_camel(name)) + self + end + + def destroy! + sqs.delete_queue(queue_url: address) + end + + def message_count + get_queue_message_count(address) + end + def name set_name || address.split('/').last end def next_board workflow.next end + + def pluck_message(board_name = name) + pluck_queue_message(board_name) + end + + def real? + queue_exists?(name) + end + + def send_message(message) + send_queue_message( + address, (valid_json?(message) ? message : message.to_json) + ) + end end # end Board ############################################# def board_name(url) url.to_s.split('/').last end - # def board_name(url) - # # TODO: figure out a way to not have this and :name in Board - # # gets the name from the url - # if url =~ URI.regexp - # url = URI.parse(url) - # url.path.split('/').last.split('_').last - # else - # env(url) - # end - # end + def create_queue!(name) + begin + Board.new(sqs, to_camel(name)).create_queue! + rescue Aws::SQS::Errors::QueueDeletedRecently => e + sleep 5 + retry + end + end - def create_queue(name) - sqs.create_queue(queue_name: to_camel(name)) + def build_queue(name) + Board.new(sqs, to_camel(name)) end def delete_queue_message(queue, opts = {}) poll(queue, opts) do |msg, stats| poller(queue).delete_message(msg) throw :stop_polling end end - def get_count(board) + def get_queue_message_count(board_url) sqs.get_queue_attributes( - queue_url: board_name(board), + queue_url: board_url, attribute_names: ['ApproximateNumberOfMessages'] ).attributes['ApproximateNumberOfMessages'].to_f end - # Params: board<string> - # returns a message and deletes it from its origin - def pluck_message(board) + # @params: board<String|symbol>: The name of the board + # @returns a message and deletes it from its origin + def pluck_queue_message(board) poll(board) do |msg, poller| poller.delete_message(msg) - return msg + return valid_json?(msg.body) ? JSON.parse(msg.body) : msg.body end end def poll(board, opts = {}) this_poller = poller(board) + results = nil this_poller.poll(opts) do |msg| - yield msg, this_poller if block_given? + results = yield msg, this_poller if block_given? + this_poller.delete_message(msg) + throw :stop_polling end + results end def poller(board_name) - board = Board.new(board_name) + board = Board.new(sqs, board_name) unless instance_variable_defined?(board.i_var) instance_variable_set( board.i_var, Aws::SQS::QueuePoller.new(board.address) @@ -89,23 +121,21 @@ end instance_variable_get(board.i_var) end def queue_exists?(name) - sqs.list_queues(queue_name_prefix: name) + !sqs.list_queues(queue_name_prefix: name).queue_urls.empty? end - def send_queue_message(message, *board_info) - board = board_info.first.kind_of?(Board) ? board_info.first : Board.new(*board_info) - message = message.to_json unless message.kind_of? String + def queue_search(name) + sqs.list_queues(queue_name_prefix: name).queue_urls + end + + def send_queue_message(address, message) sqs.send_message( - queue_url: board.address, + queue_url: address, message_body: message ) - end - - def sqs - @sqs ||= Aws::SQS::Client.new(credentials: Auth.creds) end end end end end