# encoding: utf-8
require "hot_bunnies/shutdown_listener"

module HotBunnies
  java_import com.rabbitmq.client.ConnectionFactory
  java_import com.rabbitmq.client.Connection
  java_import java.util.concurrent.ConcurrentHashMap

  # Wraps a RabbitMQ Java client connection and keeps track of the channels
  # opened on it. Calls this class does not implement itself are proxied to
  # the underlying Java connection object.
  class Session

    #
    # API
    #

    # Connects to a RabbitMQ node.
    #
    # Recognised option keys (aliases separated by "/"):
    #
    # * :uri
    # * :host / :hostname
    # * :port
    # * :virtual_host / :vhost
    # * :connection_timeout_interval / :connection_timeout / :timeout
    # * :username / :user
    # * :password / :pass
    # * :heartbeat_interval / :requested_heartbeat / :heartbeat
    # * :ssl / :tls -- true, or a String passed to use_ssl_protocol
    # * :trust_manager -- only used when :ssl / :tls is a String
    #
    # @param options [Hash] connection options
    # @return [HotBunnies::Session]
    # @api public
    def self.connect(options={})
      cf = ConnectionFactory.new

      cf.uri                 = options[:uri]            if options[:uri]
      cf.host                = hostname_from(options)   if include_host?(options)
      cf.port                = options[:port]           if options[:port]
      cf.virtual_host        = vhost_from(options)      if include_vhost?(options)
      cf.connection_timeout  = timeout_from(options)    if include_timeout?(options)
      cf.username            = username_from(options)   if include_username?(options)
      cf.password            = password_from(options)   if include_password?(options)

      cf.requested_heartbeat = heartbeat_from(options)  if include_heartbeat?(options)
      # Assigned a second time on purpose: when present, :connection_timeout_interval
      # (then :connection_timeout) overrides whatever the :timeout alias set above.
      cf.connection_timeout  = connection_timeout_from(options) if include_connection_timeout?(options)

      tls = (options[:ssl] || options[:tls])
      case tls
      when true then
        cf.use_ssl_protocol
      when String then
        if options[:trust_manager]
          cf.use_ssl_protocol(tls, options[:trust_manager])
        else
          cf.use_ssl_protocol(tls)
        end
      end

      new(cf)
    end

    # @return [Thread] the thread this session was instantiated on
    # @return [java.util.concurrent.ConcurrentHashMap] channels registered on this session, keyed by channel number
    attr_reader :thread, :channels

    # Opens a connection using the given factory.
    #
    # @param connection_factory [com.rabbitmq.client.ConnectionFactory] a configured factory
    # @raise [ConnectionRefused] on java.net.ConnectException or java.net.UnknownHostException
    # @raise [PossibleAuthenticationFailureError] on PossibleAuthenticationFailureException
    def initialize(connection_factory)
      @cf         = connection_factory
      # #new_connection already translates Java exceptions; the outer wrapper is
      # kept so translation still happens if #new_connection is overridden.
      @connection = converting_rjc_exceptions_to_ruby do
        new_connection
      end
      @channels   = ConcurrentHashMap.new
      @thread     = Thread.current
    end

    # Opens a channel on the connection and registers it with this session.
    #
    # @param n [Integer, nil] channel number to request; when nil the
    #   no-argument form of create_channel is used
    # @return [HotBunnies::Channel]
    def create_channel(n = nil)
      jc = if n
             @connection.create_channel(n)
           else
             @connection.create_channel
           end

      ch = Channel.new(self, jc)
      register_channel(ch)

      ch
    end

    # Closes every registered channel that reports itself open, then closes
    # the connection.
    def close
      @channels.select { |_, ch| ch.open? }.each do |_, ch|
        ch.close
      end

      @connection.close
    end

    # Registers a shutdown listener on the connection.
    #
    # @yield block handed to ShutdownListener.new together with this session
    # @return [HotBunnies::ShutdownListener] the listener that was registered
    def on_shutdown(&block)
      sh = ShutdownListener.new(self, &block)
      @connection.add_shutdown_listener(sh)

      sh
    end

    # Delegates to the connection's #flush.
    def flush
      @connection.flush
    end

    # Sets the heartbeat on the underlying connection.
    #
    # @param n [Integer] value assigned to the connection's heartbeat
    def heartbeat=(n)
      @connection.heartbeat = n
    end

    def start
      # no-op
      #
      # This method mimics Bunny::Session#start in Bunny 0.9.
      # Without it, #method_missing would proxy the call to
      # com.rabbitmq.client.AMQConnection, which happens to have a #start
      # method that is not idempotent.
      #
      # So we stub out #start in case someone migrating from Bunny forgets
      # to remove the call to #start. MK.
    end

    # Proxies calls this class does not implement to the underlying
    # connection, forwarding arguments and any block given. Selectors the
    # connection does not respond to fall through to the default handling
    # (NoMethodError).
    def method_missing(selector, *args, &block)
      if @connection.respond_to?(selector)
        @connection.__send__(selector, *args, &block)
      else
        super
      end
    end

    # Keeps #respond_to? and #method consistent with #method_missing.
    def respond_to_missing?(selector, include_private = false)
      @connection.respond_to?(selector, include_private) || super
    end

    # @return [String]
    # @api public
    def to_s
      "#<#{self.class.name}:#{object_id} #{@cf.username}@#{@cf.host}:#{@cf.port}, vhost=#{@cf.virtual_host}>"
    end


    #
    # Implementation
    #

    # Records a channel under its channel number.
    #
    # @private
    def register_channel(ch)
      @channels[ch.channel_number] = ch
    end

    # Forgets a previously registered channel.
    #
    # @private
    def unregister_channel(ch)
      @channels.delete(ch.channel_number)
    end

    # NOTE: `protected` has no effect on the `def self.` helpers below (Ruby
    # visibility keywords apply to instance methods only), so they remain
    # publicly callable. Only #converting_rjc_exceptions_to_ruby and
    # #new_connection are actually protected.
    protected

    # @return [String] :host, then :hostname, then the factory default
    def self.hostname_from(options)
      options[:host] || options[:hostname] || ConnectionFactory.DEFAULT_HOST
    end

    # @return [Boolean] whether a host option was supplied
    def self.include_host?(options)
      !!(options[:host] || options[:hostname])
    end

    # @return [String] :virtual_host, then :vhost, then the factory default
    def self.vhost_from(options)
      options[:virtual_host] || options[:vhost] || ConnectionFactory.DEFAULT_VHOST
    end

    # @return [Boolean] whether a virtual host option was supplied
    def self.include_vhost?(options)
      !!(options[:virtual_host] || options[:vhost])
    end

    # @return [Object, nil] :connection_timeout, then :timeout
    def self.timeout_from(options)
      options[:connection_timeout] || options[:timeout]
    end

    # @return [Boolean] whether :connection_timeout or :timeout was supplied
    def self.include_timeout?(options)
      !!(options[:connection_timeout] || options[:timeout])
    end

    # @return [String] :username, then :user, then the factory default
    def self.username_from(options)
      options[:username] || options[:user] || ConnectionFactory.DEFAULT_USER
    end

    # Accepts the same three keys as .include_heartbeat? so that an interval
    # given as :heartbeat is applied instead of being replaced by the default.
    #
    # @return [Integer] :heartbeat_interval, then :requested_heartbeat, then
    #   :heartbeat, then the factory default
    def self.heartbeat_from(options)
      options[:heartbeat_interval] || options[:requested_heartbeat] || options[:heartbeat] || ConnectionFactory.DEFAULT_HEARTBEAT
    end

    # @return [Integer] :connection_timeout_interval, then :connection_timeout,
    #   then the factory default
    def self.connection_timeout_from(options)
      options[:connection_timeout_interval] || options[:connection_timeout] || ConnectionFactory.DEFAULT_CONNECTION_TIMEOUT
    end

    # @return [Boolean] whether a username option was supplied
    def self.include_username?(options)
      !!(options[:username] || options[:user])
    end

    # @return [String] :password, then :pass, then the factory default
    def self.password_from(options)
      options[:password] || options[:pass] || ConnectionFactory.DEFAULT_PASS
    end

    # @return [Boolean] whether a password option was supplied
    def self.include_password?(options)
      !!(options[:password] || options[:pass])
    end

    # @return [Boolean] whether any heartbeat option was supplied
    def self.include_heartbeat?(options)
      !!(options[:heartbeat_interval] || options[:requested_heartbeat] || options[:heartbeat])
    end

    # @return [Boolean] whether :connection_timeout_interval or
    #   :connection_timeout was supplied
    def self.include_connection_timeout?(options)
      !!(options[:connection_timeout_interval] || options[:connection_timeout])
    end

    # Executes a block, catching Java exceptions RabbitMQ Java client throws and
    # transforms them to Ruby exceptions that are then re-raised.
    #
    # @raise [ConnectionRefused] on java.net.ConnectException or java.net.UnknownHostException
    # @raise [PossibleAuthenticationFailureError] on PossibleAuthenticationFailureException
    # @private
    def converting_rjc_exceptions_to_ruby(&block)
      block.call
    rescue java.net.ConnectException
      raise ConnectionRefused.new("Connection to #{@cf.host}:#{@cf.port} refused")
    rescue java.net.UnknownHostException
      raise ConnectionRefused.new("Connection to #{@cf.host}:#{@cf.port} refused: host unknown")
    rescue com.rabbitmq.client.PossibleAuthenticationFailureException
      raise PossibleAuthenticationFailureError.new(@cf.username, @cf.virtual_host, @cf.password.bytesize)
    end

    # Asks the connection factory for a new connection, translating Java
    # client exceptions into their Ruby counterparts.
    #
    # @private
    def new_connection
      converting_rjc_exceptions_to_ruby do
        @cf.new_connection
      end
    end
  end
end