lib/gorgon/originator_protocol.rb in gorgon-0.2.0 vs lib/gorgon/originator_protocol.rb in gorgon-0.3.0

- old
+ new

@@ -14,11 +14,11 @@ @connection.on_closed { options[:on_closed].call } if options[:on_closed] open_queues end def publish_files files - @file_queue = @channel.queue(UUIDTools::UUID.timestamp_create.to_s) + @file_queue = @channel.queue("file_queue_" + UUIDTools::UUID.timestamp_create.to_s) files.each do |file| @channel.default_exchange.publish(file, :routing_key => @file_queue.name) end end @@ -28,13 +28,13 @@ job_definition.reply_exchange_name = @reply_exchange.name @channel.fanout("gorgon.jobs").publish(job_definition.to_json) end - def ping_listeners - # TODO: we probably want to use a different exchange for pinging when we add more services - message = {:type => "ping", :reply_exchange_name => @reply_exchange.name} + def send_message_to_listeners type, body={} + # TODO: we probably want to use a different exchange for this type of messages + message = {:type => type, :reply_exchange_name => @reply_exchange.name, :body => body} @channel.fanout("gorgon.jobs").publish(Yajl::Encoder.encode(message)) end def receive_payloads @reply_queue.subscribe do |payload| @@ -47,24 +47,25 @@ @channel.fanout("gorgon.worker_managers").publish(cancel_message) @logger.log "Cancel Message sent" end def disconnect - cleanup_queues + cleanup_queues_and_exchange @connection.disconnect end private def open_queues - @reply_queue = @channel.queue(UUIDTools::UUID.timestamp_create.to_s) - @reply_exchange = @channel.direct(UUIDTools::UUID.timestamp_create.to_s) + @reply_queue = @channel.queue("reply_queue_" + UUIDTools::UUID.timestamp_create.to_s) + @reply_exchange = @channel.direct("reply_exchange_" + UUIDTools::UUID.timestamp_create.to_s) @reply_queue.bind(@reply_exchange) end - def cleanup_queues + def cleanup_queues_and_exchange @reply_queue.delete if @reply_queue @file_queue.delete if @file_queue + @reply_exchange.delete if @reply_exchange end def cancel_message Yajl::Encoder.encode({:action => "cancel_job"}) end