# ----------------------------------------------------------------------- # Copyright © 2012 ShepHertz Technologies Pvt Ltd. All rights reserved. # ----------------------------------------------------------------------- require 'rubygems' require 'connection/RESTConnection' require 'util/util' require 'json/pure' require 'App42_Ruby_API/App42Response' require 'message/QueueResponseBuilder' require 'message/Queue' module App42 module Message # # Manages Asynchronous queues. Allows to create, delete, purge messages, view pending messages and # get all messages # # @see Queue # class QueueService # # this is a constructor that takes # # @param apiKey # @param secretKey # @param baseURL # def initialize(api_key, secret_key, base_url) puts "Message Service->initialize" @api_key = api_key @secret_key = secret_key @base_url = base_url @resource = "queue" @messageResource = "message" @version = "1.0" end # Creates a type Pull Queue # # @param queueName # - The name of the queue which has to be created # @param queueDescription # - The description of the queue # # @return Queue object containing queue name which has been created # # @raise App42Exception # def create_pull_queue(queueName, queueDescription) puts "Create Pull Queue Called " puts "Base url #{@base_url}" response = nil; messageObj = nil; messageObj = Queue.new util = Util.new util.throwExceptionIfNullOrBlank(queueName, "Queue Name"); util.throwExceptionIfNullOrBlank(queueDescription, "Queue Description"); begin connection = App42::Connection::RESTConnection.new(@base_url) body = {'app42' => {"queue"=> { "name" => queueName, "description" => queueDescription }}}.to_json puts "Body #{body}" query_params = Hash.new params = { 'apiKey'=> @api_key, 'version' => @version, 'timeStamp' => util.get_timestamp_utc, "type"=>'pull' } query_params = params.clone params.store("body", body) puts query_params signature = util.sign(@secret_key, params) resource_url = "#{@version}/#{@resource}/pull" response = connection.post(signature, resource_url, query_params, body) message = QueueResponseBuilder.new() messageObj = message.buildResponse(response) rescue App42Exception =>e raise e rescue Exception => e raise App42Exception.new(e) end return messageObj end # # Deletes the Pull type Queue # # @param queueName # - The name of the queue which has to be deleted # # @return App42Response if deleted successfully # # @raise App42Exception # def delete_pull_queue(queueName) puts "Delete Pull Queue Called " puts "Base url #{@base_url}" response = nil; responseObj = App42Response.new(); util = Util.new util.throwExceptionIfNullOrBlank(queueName, "Queue Name"); begin connection = App42::Connection::RESTConnection.new(@base_url) query_params = Hash.new params = { 'apiKey'=> @api_key, 'version' => @version, 'timeStamp' => util.get_timestamp_utc, "type"=>'pull' } query_params = params.clone params.store("queueName", queueName) puts query_params signature = util.sign(@secret_key, params) resource_url = "#{@version}/#{@resource}/pull/#{queueName}" response = connection.delete(signature, resource_url, query_params) responseObj.strResponse=(response) responseObj.isResponseSuccess=(true) rescue App42Exception =>e raise e rescue Exception => e raise App42Exception.new(e) end return responseObj end # # Purges message on the Queue. Note: once the Queue is purged the messages # are removed from the Queue and wont be available for dequeueing. # # @param queueName # - The name of the queue which has to be purged # # @return Queue object containing queue name which has been purged # # @raise App42Exception # def purge_pull_queue(queueName) puts "Purge Pull Queue Called " puts "Base url #{@base_url}" response = nil; responseObj = App42Response.new(); util = Util.new util.throwExceptionIfNullOrBlank(queueName, "Queue Name"); begin connection = App42::Connection::RESTConnection.new(@base_url) query_params = Hash.new params = { 'apiKey'=> @api_key, 'version' => @version, 'timeStamp' => util.get_timestamp_utc, "type"=>'pull' } query_params = params.clone params.store("queueName", queueName) puts query_params signature = util.sign(@secret_key, params) resource_url = "#{@version}/#{@resource}/pull/purge/#{queueName}" response = connection.delete(signature, resource_url, query_params) responseObj.strResponse=(response) responseObj.isResponseSuccess=(true) rescue App42Exception =>e raise e rescue Exception => e raise App42Exception.new(e) end return responseObj end # # Messages which are pending to be dequeue. Note: Calling this method does not # dequeue the messages in the Queue. The messages stay in the Queue till they are dequeued # # @param queueName # - The name of the queue from which pending messages have to be fetched # # @return Queue object containing pending messages in the Queue # # @raise App42Exception # def pending_messages(queueName) puts "Pending Messages Called " puts "Base url #{@base_url}" response = nil; messageObj = nil; messageObj = Queue.new util = Util.new util.throwExceptionIfNullOrBlank(queueName, "Queue Name"); begin connection = App42::Connection::RESTConnection.new(@base_url) query_params = Hash.new params = { 'apiKey'=> @api_key, 'version' => @version, 'timeStamp' => util.get_timestamp_utc, } query_params = params.clone params.store("queueName", queueName) puts query_params signature = util.sign(@secret_key, params) resource_url = "#{@version}/#{@resource}/pending/#{queueName}" response = connection.get(signature, resource_url, query_params) message = QueueResponseBuilder.new() messageObj = message.buildResponse(response) rescue App42Exception =>e raise e rescue Exception => e raise App42Exception.new(e) end return messageObj end # # Messages are retrieved and dequeued from the Queue. # # @param queueName # - The name of the queue which have to be retrieved # @param receiveTimeOut # - Receive time out # # @return Queue object containing messages in the Queue # # @raise App42Exception # def get_messages(queueName, receiveTimeOut) puts "Get Messages Called " puts "Base url #{@base_url}" response = nil; messageObj = nil; messageObj = Queue.new util = Util.new util.throwExceptionIfNullOrBlank(queueName, "Queue Name"); util.throwExceptionIfNullOrBlank(receiveTimeOut, "ReceiveTimeOut"); begin connection = App42::Connection::RESTConnection.new(@base_url) query_params = Hash.new params = { 'apiKey'=> @api_key, 'version' => @version, 'timeStamp' => util.get_timestamp_utc, } query_params = params.clone params.store("queueName", queueName) params.store("receiveTimeOut", (receiveTimeOut.to_i).to_s) puts query_params signature = util.sign(@secret_key, params) resource_url = "#{@version}/#{@resource}/messages/#{queueName}/#{(receiveTimeOut.to_i).to_s}" response = connection.get(signature, resource_url, query_params) message = QueueResponseBuilder.new() messageObj = message.buildResponse(response) rescue App42Exception =>e raise e rescue Exception => e raise App42Exception.new(e) end return messageObj end # # Send message on the queue with an expiry. The message will expire if it is not pulled/dequeued before the expiry # # @param queueName # - The name of the queue to which the message has to be sent # @param msg # - Message that has to be sent # @param exp # - Message expiry time # # @return Queue object containing message which has been sent with its message id and correlation id # # @raise App42Exception # def send_message(queueName, msg, exp) puts "Get Messages Called " puts "Base url #{@base_url}" response = nil; messageObj = nil; messageObj = Queue.new util = Util.new util.throwExceptionIfNullOrBlank(queueName, "Queue Name"); util.throwExceptionIfNullOrBlank(msg, "Message"); util.throwExceptionIfNullOrBlank(exp, "Exipiration"); begin connection = App42::Connection::RESTConnection.new(@base_url) body = {'app42' => {"payLoad"=> { "message" => msg, "expiration" => exp }}}.to_json puts "Body #{body}" query_params = Hash.new params = { 'apiKey'=> @api_key, 'version' => @version, 'timeStamp' => util.get_timestamp_utc, 'queueName' => queueName } query_params = params.clone params.store("body", body) puts query_params signature = util.sign(@secret_key, params) resource_url = "#{@version}/#{@messageResource}/#{queueName}" response = connection.post(signature, resource_url, query_params, body) message = QueueResponseBuilder.new() messageObj = message.buildResponse(response) rescue App42Exception =>e raise e rescue Exception => e raise App42Exception.new(e) end return messageObj end # # Pulls all the message from the queue # # @param queueName # - The name of the queue from which messages have to be pulled # @param receiveTimeOut # - Receive time out # # @return Queue object containing messages which have been pulled # # @raise App42Exception # def receive_message(queueName, receiveTimeOut) puts "Receive Message Called " puts "Base url #{@base_url}" response = nil; messageObj = nil; messageObj = Queue.new util = Util.new util.throwExceptionIfNullOrBlank(queueName, "Queue Name"); util.throwExceptionIfNullOrBlank(receiveTimeOut, "ReceiveTimeOut"); begin connection = App42::Connection::RESTConnection.new(@base_url) query_params = Hash.new params = { 'apiKey'=> @api_key, 'version' => @version, 'timeStamp' => util.get_timestamp_utc, } query_params = params.clone params.store("queueName", queueName) params.store("receiveTimeOut", "" + (receiveTimeOut.to_i).to_s) puts query_params signature = util.sign(@secret_key, params) resource_url = "#{@version}/#{@messageResource}/#{queueName}/#{(receiveTimeOut.to_i).to_s}" response = connection.get(signature, resource_url, query_params) message = QueueResponseBuilder.new() messageObj = message.buildResponse(response) rescue App42Exception =>e raise e rescue Exception => e raise App42Exception.new(e) end return messageObj end # # Pull message based on the correlation id # # @param queueName # - The name of the queue from which the message has to be pulled # @param receiveTimeOut # - Receive time out # @param correlationId # - Correlation Id for which message has to be pulled # # @return Queue containing message which has pulled based on the correlation id # # @raise App42Exception # def receive_message_by_correlation_id(queueName, receiveTimeOut, correlationId) puts "Receive Message Called " puts "Base url #{@base_url}" response = nil; messageObj = nil; messageObj = Queue.new util = Util.new util.throwExceptionIfNullOrBlank(queueName, "Queue Name"); util.throwExceptionIfNullOrBlank(receiveTimeOut, "ReceiveTimeOut"); util.throwExceptionIfNullOrBlank(correlationId, "Correlation Id"); begin connection = App42::Connection::RESTConnection.new(@base_url) query_params = Hash.new params = { 'apiKey'=> @api_key, 'version' => @version, 'timeStamp' => util.get_timestamp_utc, } query_params = params.clone params.store("queueName", queueName) params.store("receiveTimeOut", "" + (receiveTimeOut.to_i).to_s) params.store("correlationId", "" + correlationId) puts query_params signature = util.sign(@secret_key, params) resource_url = "#{@version}/#{@messageResource}/#{queueName}/#{(receiveTimeOut.to_i).to_s}/#{correlationId}" response = connection.get(signature, resource_url, query_params) message = QueueResponseBuilder.new() messageObj = message.buildResponse(response) rescue App42Exception =>e raise e rescue Exception => e raise App42Exception.new(e) end return messageObj end # # Remove message from the queue based on the message id. Note: Once the message is removed it cannot be pulled # # @param queueName # - The name of the queue from which the message has to be removed # @param messageId # - The message id of the message which has to be removed. # # @return App42Response if removed successfully # # @raise App42Exception # def remove_message(queueName, messageId) puts "Remove Message Called " puts "Base url #{@base_url}" response = nil; responseObj = App42Response.new(); util = Util.new util.throwExceptionIfNullOrBlank(queueName, "Queue Name"); util.throwExceptionIfNullOrBlank(messageId, "messageId"); begin connection = App42::Connection::RESTConnection.new(@base_url) query_params = Hash.new params = { 'apiKey'=> @api_key, 'version' => @version, 'timeStamp' => util.get_timestamp_utc, } query_params = params.clone params.store("queueName", queueName) params.store("messageId", messageId) puts query_params signature = util.sign(@secret_key, params) resource_url = "#{@version}/#{@messageResource}/#{queueName}/#{messageId}" response = connection.delete(signature, resource_url, query_params) responseObj.strResponse=(response) responseObj.isResponseSuccess=(true) rescue App42Exception =>e raise e rescue Exception => e raise App42Exception.new(e) end return responseObj end end end end