lib/msgr/client.rb in msgr-0.15.0.1.b136 vs lib/msgr/client.rb in msgr-0.15.0.1.b139
- old
+ new
@@ -1,42 +1,67 @@
# frozen_string_literal: true
+
+require 'uri'
+require 'cgi'
+
module Msgr
# rubocop:disable Metrics/ClassLength
class Client
include Logging
- attr_reader :uri, :config
- # rubocop:disable Metrics/AbcSize
- # rubocop:disable Metrics/MethodLength
+ attr_reader :config
+
+ # rubocop:disable MethodLength
def initialize(config = {})
- @uri = ::URI.parse config[:uri] ? config.delete(:uri) : 'amqp://localhost/'
- config[:pass] ||= @uri.password
+ @config = {
+ host: '127.0.0.1',
+ vhost: '/',
+ max: 2
+ }
- @uri.user = config[:user] ||= @uri.user || 'guest'
- @uri.scheme = (config[:ssl] ||= @uri.scheme.to_s.casecmp('amqps').zero?) ? 'amqps' : 'amqp'
- @uri.host = config[:host] ||= @uri.host || '127.0.0.1'
- @uri.port = config[:port] ||= @uri.port
- @uri.path = config[:vhost] ||= @uri.path.present? ? @uri.path : '/'
- config.reject! {|_, v| v.nil? }
+ @config.merge! parse(config.delete(:uri)) if config.key?(:uri)
+ @config.merge! config.symbolize_keys
- @config = config
- @config[:max] ||= 2
-
@mutex = ::Mutex.new
@routes = Routes.new
@pid ||= ::Process.pid
log(:debug) { "Created new client on process ##{@pid}..." }
end
+ # rubocop:enable all
+ # rubocop:disable AbcSize
+ # rubocop:disable MethodLength
+ # rubocop:disable PerceivedComplexity
+ # rubocop:disable CyclomaticComplexity
+ def uri
+ @uri = begin
+ uri = ::URI.parse('amqp://localhost')
+
+ uri.user = CGI.escape(config[:user]) if config.key?(:user)
+ uri.password = '****' if config.key?(:pass)
+ uri.host = config[:host] if config.key?(:host)
+ uri.port = config[:port] if config.key?(:port)
+ uri.scheme = config[:ssl] ? 'amqps' : 'amqp'
+
+ if config.key?(:vhost) && config[:vhost] != '/'
+ uri.path = "/#{CGI.escape(config[:vhost])}"
+ end
+
+ uri
+ end
+ end
+ # rubocop:enable all
+
def running?
mutex.synchronize do
check_process!
connection.running?
end
end
+ # rubocop:disable AbcSize
def start
mutex.synchronize do
check_process!
return if connection.running?
@@ -45,10 +70,11 @@
@routes << config[:routing_file] if config[:routing_file].present?
@routes.reload
connection.bind(@routes)
end
end
+ # rubocop:enable all
def connect
mutex.synchronize do
check_process!
return if connection.running?
@@ -57,10 +83,11 @@
connection.connect
end
end
+ # rubocop:disable AbcSize
def stop(opts = {})
mutex.synchronize do
check_process!
log(:debug) { "Stop on #{uri}..." }
@@ -71,10 +98,11 @@
dispatcher.shutdown
reset
end
end
+ # rubocop:enable all
def purge(release: false)
mutex.synchronize do
check_process!
@@ -153,7 +181,25 @@
@pool = nil
@channel = nil
@bindings = nil
@dispatcher = nil
end
+
+ # rubocop:disable AbcSize
+ def parse(uri)
+ # Legacy parsing of URI configuration; does not follow usual
+ # AMQP vhost encoding but used regular URL path
+ uri = ::URI.parse(uri)
+
+ config = {}
+ config[:user] ||= uri.user if uri.user
+ config[:pass] ||= uri.password if uri.password
+ config[:host] ||= uri.host if uri.host
+ config[:port] ||= uri.port if uri.port
+ config[:vhost] ||= uri.path if uri.path
+ config[:ssl] ||= uri.scheme.casecmp('amqps').zero?
+
+ config
+ end
+ # rubocop:enable all
end
end