module ScCore
  # Declares durable exchanges and queues on the message broker and binds each
  # queue to its exchange, using the amqp gem on top of EventMachine.
  class MqResourceManager
    include AMQP

    # Declares the SupplyChain main and sideline exchange/queue pairs plus any
    # caller-supplied ones. Blocks until the EventMachine reactor stops.
    #
    # Does nothing (returns nil) unless RUBY_PLATFORM is "java".
    #
    # @param exchanges [Array<Hash>, nil] extra resources to declare. Each hash
    #   is read for :queue_name, :exchange_name, :exchange_type (defaults to
    #   'fanout'), :stop and :routing_key.
    def self.initialize_mq_resources(exchanges=[])
      return unless RUBY_PLATFORM == "java"

      connection_failed = false

      # Builds a connection-failure callback. It must stop the reactor:
      # otherwise EventMachine.run below never returns when the broker is
      # unreachable or rejects the credentials.
      failure_handler = lambda do |message|
        # A proc (not a lambda) so whatever arguments the gem passes are ignored.
        proc do |*|
          connection_failed = true
          logger.error message
          EventMachine.stop
        end
      end

      EventMachine.run do
        logger.info "Started EventMachine to create MQ resources. Receiver settings are #{MqConfig.receiver_connection_settings}"

        AMQP.connect(MqConfig.receiver_connection_settings,
                     :on_tcp_connection_failure => failure_handler.call("TCP connection failure."),
                     :on_possible_authentication_failure => failure_handler.call("Auth failure.")) do |connection|
          logger.info "Got #{connection} on callback."

          create_exchange_and_queue_and_bind(connection, SupplyChain.queue_name, SupplyChain.exchange_name, 'fanout')

          # NOTE(review): this declaration is issued with stop=true, and every
          # declaration runs asynchronously on its own channel. The connection
          # may therefore be closed as soon as the sideline binding is
          # confirmed, possibly before the declarations below have finished.
          # Confirm whether that ordering is acceptable.
          create_exchange_and_queue_and_bind(connection, SupplyChain.sideline_queue_name, SupplyChain.sideline_exchange_name, 'fanout', true)

          (exchanges || []).each do |exchange|
            create_exchange_and_queue_and_bind(connection,
                                               exchange[:queue_name],
                                               exchange[:exchange_name],
                                               exchange[:exchange_type] || 'fanout',
                                               exchange[:stop],
                                               exchange[:routing_key])
          end
        end
      end

      if connection_failed
        logger.error "Stopped EventMachine without creating MQ resources."
      else
        logger.info "Stopped EventMachine after creating MQ resources."
      end
    end

    # Opens a channel on +connection+, declares a durable exchange and, when
    # +queue_name+ is given, a durable queue bound to that exchange.
    #
    # @param connection the open AMQP connection to declare resources on
    # @param queue_name [String, nil] queue to declare and bind; when nil only
    #   the exchange is declared
    # @param exchange_name [String] name of the exchange to declare
    # @param exchange_type [String] exchange type passed to Exchange.new
    # @param stop [Boolean] when truthy, closes +connection+ and stops the
    #   EventMachine reactor once this declaration has completed
    # @param routing_key [String, nil] routing key used for the binding
    def self.create_exchange_and_queue_and_bind(connection, queue_name, exchange_name, exchange_type, stop=false, routing_key=nil)
      # Runs once the last broker confirmation for this declaration arrives.
      finish = proc do
        connection.close { EventMachine.stop } if stop
      end

      Channel.new(connection, :auto_recovery => true) do |channel, _open_ok|
        # The amqp gem's option key is :auto_delete; a misspelled key is ignored.
        Exchange.new(channel, exchange_type, exchange_name,
                     :durable => true, :auto_delete => false, :internal => false) do |exchange, _exchange_declare_ok|
          logger.info "Created exchange #{exchange_name}"

          if queue_name
            AMQP::Queue.new(channel, queue_name, :durable => true) do |queue, _queue_declare_ok|
              logger.info "Created queue #{queue_name}"

              queue.bind(exchange, :routing_key => routing_key) do
                logger.info "Bound #{queue_name} to #{exchange_name}"
                finish.call
              end
            end
          else
            # No queue to bind: honour +stop+ here, otherwise the reactor would
            # never be stopped for an exchange-only declaration.
            finish.call
          end
        end
      end
    end
  end
end