lib/amq/uri.rb in amq-protocol-2.2.0 vs lib/amq/uri.rb in amq-protocol-2.3.0.rc1
- old
+ new
@@ -4,32 +4,97 @@
require "uri"
module AMQ
class URI
# @private
- AMQP_PORTS = {"amqp" => 5672, "amqps" => 5671}.freeze
+ AMQP_DEFAULT_PORTS = {
+ "amqp" => 5672,
+ "amqps" => 5671
+ }.freeze
+ private_constant :AMQP_DEFAULT_PORTS
+ DEFAULTS = {
+ heartbeat: nil,
+ connection_timeout: nil,
+ channel_max: nil,
+ auth_mechanism: [],
+ verify: false,
+ fail_if_no_peer_cert: false,
+ cacertfile: nil,
+ certfile: nil,
+ keyfile: nil
+ }.freeze
+
def self.parse(connection_string)
uri = ::URI.parse(connection_string)
raise ArgumentError.new("Connection URI must use amqp or amqps schema (example: amqp://bus.megacorp.internal:5766), learn more at http://bit.ly/ks8MXK") unless %w{amqp amqps}.include?(uri.scheme)
- opts = {}
+ opts = DEFAULTS.dup
opts[:scheme] = uri.scheme
opts[:user] = ::CGI::unescape(uri.user) if uri.user
opts[:pass] = ::CGI::unescape(uri.password) if uri.password
opts[:host] = uri.host if uri.host
- opts[:port] = uri.port || AMQP_PORTS[uri.scheme]
- opts[:ssl] = uri.scheme.to_s.downcase =~ /amqps/i
+ opts[:port] = uri.port || AMQP_DEFAULT_PORTS[uri.scheme]
+ opts[:ssl] = uri.scheme.to_s.downcase =~ /amqps/i # TODO: rename to tls
if uri.path =~ %r{^/(.*)}
raise ArgumentError.new("#{uri} has multiple-segment path; please percent-encode any slashes in the vhost name (e.g. /production => %2Fproduction). Learn more at http://bit.ly/amqp-gem-and-connection-uris") if $1.index('/')
opts[:vhost] = ::CGI::unescape($1)
end
+ if uri.query
+ query_params = CGI::parse(uri.query)
+
+ normalized_query_params = Hash[query_params.map { |param, value| [param, value.one? ? value.first : value] }]
+
+ opts[:heartbeat] = normalized_query_params["heartbeat"].to_i
+ opts[:connection_timeout] = normalized_query_params["connection_timeout"].to_i
+ opts[:channel_max] = normalized_query_params["channel_max"].to_i
+ opts[:auth_mechanism] = normalized_query_params["auth_mechanism"]
+
+ %w(cacertfile certfile keyfile).each do |tls_option|
+ if normalized_query_params[tls_option] && uri.scheme == "amqp"
+ raise ArgumentError.new("The option '#{tls_option}' can only be used in URIs that use amqps schema")
+ else
+ opts[tls_option.to_sym] = normalized_query_params[tls_option]
+ end
+ end
+
+ %w(verify fail_if_no_peer_cert).each do |tls_option|
+ if normalized_query_params[tls_option] && uri.scheme == "amqp"
+ raise ArgumentError.new("The option '#{tls_option}' can only be used in URIs that use amqps schema")
+ else
+ opts[tls_option.to_sym] = as_boolean(normalized_query_params[tls_option])
+ end
+ end
+ end
+
opts
end
+
def self.parse_amqp_url(s)
parse(s)
end
+
+ #
+ # Implementation
+ #
+
+ # Normalizes values returned by CGI.parse.
+ # @private
+ def self.as_boolean(val)
+ case val
+ when true then true
+ when false then false
+ when 1 then true
+ when 0 then false
+ when "true" then true
+ when "false" then false
+ else
+ !!val
+ end
+ end
+
+ private_class_method :as_boolean
end
end