Sha256: c60e0deddb1f25d9aa1ad046e55400bd2f5a6f8e96205faef0794de03ee4e78c

Contents?: true

Size: 1.97 KB

Versions: 14

Compression:

Stored size: 1.97 KB

Contents

require 'gorgon/job_definition'

require 'amqp'
require 'uuidtools'

# Originator-side messaging helper built on an AMQP connection.
#
# It owns one connection and one channel, a uniquely named reply queue bound
# to a uniquely named direct reply exchange, and (once files are published)
# a uniquely named file queue. Jobs and listener messages are broadcast on the
# "gorgon.jobs" fanout exchange; cancellation is broadcast on
# "gorgon.worker_managers".
#
# Typical order of use: connect, publish_files, publish_job, receive_payloads,
# then disconnect. cancel_job and disconnect are safe to call on an instance
# that never connected.
class OriginatorProtocol
  # @param logger an object responding to +log(message)+
  def initialize(logger)
    @logger = logger
  end

  # Opens the AMQP connection and channel, then declares the reply queue and
  # reply exchange.
  #
  # @param connection_information passed through unchanged to AMQP.connect
  # @param options [Hash] optional settings
  # @option options [#call] :on_closed invoked (with no arguments) from the
  #   connection's on_closed hook
  def connect(connection_information, options = {})
    @connection = AMQP.connect(connection_information)
    @channel = AMQP::Channel.new(@connection)

    on_closed = options[:on_closed]
    @connection.on_closed { on_closed.call } if on_closed

    open_queues
  end

  # Declares a fresh file queue and publishes each entry of +files+ to it
  # through the channel's default exchange, using the queue name as routing key.
  #
  # @param files [Enumerable] payloads to enqueue, one message per entry
  def publish_files(files)
    @file_queue = @channel.queue(unique_name("file_queue_"))

    files.each do |file|
      @channel.default_exchange.publish(file, :routing_key => @file_queue.name)
    end
  end

  # Stamps the job definition with the file queue and reply exchange names and
  # broadcasts its JSON form on the jobs exchange.
  #
  # Requires publish_files to have been called first, since it reads the file
  # queue's name.
  #
  # @param job_definition an object with +file_queue_name=+,
  #   +reply_exchange_name=+ and +to_json+
  def publish_job(job_definition)
    job_definition.file_queue_name = @file_queue.name
    job_definition.reply_exchange_name = @reply_exchange.name

    jobs_exchange.publish(job_definition.to_json)
  end

  # Broadcasts a typed message, tagged with the reply exchange name, on the
  # jobs exchange.
  #
  # @param type the message type, sent under the :type key
  # @param body [Hash] optional message body, sent under the :body key
  def send_message_to_listeners(type, body = {})
    # TODO: we probably want to use a different exchange for this type of messages
    message = {:type => type, :reply_exchange_name => @reply_exchange.name, :body => body}
    jobs_exchange.publish(Yajl::Encoder.encode(message))
  end

  # Subscribes to the reply queue and yields each delivered payload to the
  # caller's block.
  #
  # The inner block deliberately takes a single argument so the subscription
  # is made with the same block arity as before.
  def receive_payloads
    @reply_queue.subscribe { |payload| yield payload }
  end

  # Purges any pending files and broadcasts a cancel message on the
  # "gorgon.worker_managers" fanout exchange.
  #
  # Does nothing when the protocol never connected: there is no channel to
  # publish on, so nothing is sent and nothing is logged.
  def cancel_job
    return unless @channel

    @file_queue.purge if @file_queue
    @channel.fanout("gorgon.worker_managers").publish(cancel_message)
    @logger.log "Cancel Message sent"
  end

  # Deletes the queues and exchange this instance declared, then closes the
  # connection. Safe to call when connect was never called.
  def disconnect
    cleanup_queues_and_exchange
    # Guarded like the queue/exchange cleanup so an unconnected instance does
    # not fail with NoMethodError on nil.
    @connection.disconnect if @connection
  end

  private

  # The fanout exchange on which jobs and listener messages are broadcast.
  def jobs_exchange
    @channel.fanout("gorgon.jobs")
  end

  # Builds a queue/exchange name by appending a timestamp-based UUID to +prefix+.
  def unique_name(prefix)
    "#{prefix}#{UUIDTools::UUID.timestamp_create}"
  end

  # Declares the reply queue and the direct reply exchange, and binds them.
  def open_queues
    @reply_queue = @channel.queue(unique_name("reply_queue_"))
    @reply_exchange = @channel.direct(unique_name("reply_exchange_"))
    @reply_queue.bind(@reply_exchange)
  end

  # Deletes whichever of the reply queue, file queue and reply exchange exist.
  def cleanup_queues_and_exchange
    @reply_queue.delete if @reply_queue
    @file_queue.delete if @file_queue
    @reply_exchange.delete if @reply_exchange
  end

  # Encoded body of the cancel broadcast.
  def cancel_message
    Yajl::Encoder.encode({:action => "cancel_job"})
  end
end

Version data entries

14 entries across 14 versions & 1 rubygems

Version Path
gorgon-0.5.0.rc1 lib/gorgon/originator_protocol.rb
gorgon-0.4.5 lib/gorgon/originator_protocol.rb
gorgon-0.4.5.rc1 lib/gorgon/originator_protocol.rb
gorgon-0.4.4 lib/gorgon/originator_protocol.rb
gorgon-0.4.3 lib/gorgon/originator_protocol.rb
gorgon-0.4.2 lib/gorgon/originator_protocol.rb
gorgon-0.4.1 lib/gorgon/originator_protocol.rb
gorgon-0.4.1.rc1 lib/gorgon/originator_protocol.rb
gorgon-0.4.0 lib/gorgon/originator_protocol.rb
gorgon-0.4.0.rc2 lib/gorgon/originator_protocol.rb
gorgon-0.4.0.rc1 lib/gorgon/originator_protocol.rb
gorgon-0.3.2 lib/gorgon/originator_protocol.rb
gorgon-0.3.1 lib/gorgon/originator_protocol.rb
gorgon-0.3.0 lib/gorgon/originator_protocol.rb