lib/splash/transports/rabbitmq.rb in prometheus-splash-0.5.3 vs lib/splash/transports/rabbitmq.rb in prometheus-splash-0.6.0
- old
+ new
@@ -1,16 +1,26 @@
# coding: utf-8
+
+# base Splash module
module Splash
+
+ # Splash Transports namespace
module Transports
+
+ # RabbitMQ Transport
module Rabbitmq
+ # Subscriber Mode RabbitMQ Client
class Subscriber
include Splash::Config
extend Forwardable
def_delegators :@queue, :subscribe
+ # Constructor Forward subscribe method and initialize a Bunny Client atribute @queue
+ # @param [Hash] options
+ # @option options [String] :queue the name of the subscribed queue
def initialize(options = {})
@config = get_config.transports
host = @config[:rabbitmq][:host]
port = @config[:rabbitmq][:port]
@@ -30,47 +40,61 @@
end
end
-
+ # publish / get Mode RabbitMQ Client
class Client
include Splash::Config
include Splash::Transports
include Splash::Loggers
+ # Constructor initialize a Bunny Client
def initialize
@config = get_config.transports
host = @config[:rabbitmq][:host]
port = @config[:rabbitmq][:port]
vhost = (@config[:rabbitmq][:vhost])? @config[:rabbitmq][:vhost] : '/'
passwd = (@config[:rabbitmq][:passwd])? @config[:rabbitmq][:passwd] : 'guest'
user = (@config[:rabbitmq][:user])? @config[:rabbitmq][:user] : 'guest'
conf = { :host => host, :vhost => vhost, :user => user, :password => passwd, :port => port.to_i}
begin
- @connection = Bunny.new conf
+ @connection = Bunny.new conf
@connection.start
@channel = @connection.create_channel
rescue Bunny::Exception
splash_exit case: :service_dependence_missing, more: "RabbitMQ Transport not available."
end
end
-
+ # purge a queue
+ # @param [Hash] options
+ # @option options [String] :queue the name of the queue to purge
def purge(options)
@channel.queue(options[:queue]).purge
end
+ # publish to a queue
+ # @param [Hash] options
+ # @option options [String] :queue the name of the queue to purge
+ # @option options [String] :message the message to send
def publish(options ={})
return @channel.default_exchange.publish(options[:message], :routing_key => options[:queue])
end
+ # ack a specific message for manual ack with a delivery tag to a queue
+ # @param [String] ack
+ # @return [Boolean]
def ack(ack)
return @channel.acknowledge(ack, false)
end
+
+ # send an execution order message (verb+payload) via RabbitMQ to an slash input queue
+ # @param [Hash] order
+ # @return [Void] unserialized Void object from YAML
def execute(order)
queue = order[:return_to]
lock = Mutex.new
res = nil
condition = ConditionVariable.new
@@ -83,18 +107,24 @@
get_default_client.publish queue: order[:queue], message: order.to_yaml
lock.synchronize { condition.wait(lock) }
return res
end
+ # Get a message from a RabbitMQ queue
+ # @param [Hash] options
+ # @option options [String] :queue the name of the queue to query
+ # @option options [String] :manual_ack flag to inhibit ack
+ # @return [Hash] Payload + ack tag if :manual_ack
def get(options ={})
queue = @channel.queue(options[:queue])
opt = {}; opt[:manual_ack] = (options[:manual_ack])? true : false
delivery_info, properties, payload = queue.pop
res = {:message => payload}
res[:ack] = delivery_info.delivery_tag if options[:manual_ack]
return res
end
+ # close the RabbitMQ connection
def close
@connection.close
end
end