lib/msgr/client.rb in msgr-0.15.0.1.b136 vs lib/msgr/client.rb in msgr-0.15.0.1.b139

- old
+ new

@@ -1,42 +1,67 @@ # frozen_string_literal: true + +require 'uri' +require 'cgi' + module Msgr # rubocop:disable Metrics/ClassLength class Client include Logging - attr_reader :uri, :config - # rubocop:disable Metrics/AbcSize - # rubocop:disable Metrics/MethodLength + attr_reader :config + + # rubocop:disable MethodLength def initialize(config = {}) - @uri = ::URI.parse config[:uri] ? config.delete(:uri) : 'amqp://localhost/' - config[:pass] ||= @uri.password + @config = { + host: '127.0.0.1', + vhost: '/', + max: 2 + } - @uri.user = config[:user] ||= @uri.user || 'guest' - @uri.scheme = (config[:ssl] ||= @uri.scheme.to_s.casecmp('amqps').zero?) ? 'amqps' : 'amqp' - @uri.host = config[:host] ||= @uri.host || '127.0.0.1' - @uri.port = config[:port] ||= @uri.port - @uri.path = config[:vhost] ||= @uri.path.present? ? @uri.path : '/' - config.reject! {|_, v| v.nil? } + @config.merge! parse(config.delete(:uri)) if config.key?(:uri) + @config.merge! config.symbolize_keys - @config = config - @config[:max] ||= 2 - @mutex = ::Mutex.new @routes = Routes.new @pid ||= ::Process.pid log(:debug) { "Created new client on process ##{@pid}..." } end + # rubocop:enable all + # rubocop:disable AbcSize + # rubocop:disable MethodLength + # rubocop:disable PerceivedComplexity + # rubocop:disable CyclomaticComplexity + def uri + @uri = begin + uri = ::URI.parse('amqp://localhost') + + uri.user = CGI.escape(config[:user]) if config.key?(:user) + uri.password = '****' if config.key?(:pass) + uri.host = config[:host] if config.key?(:host) + uri.port = config[:port] if config.key?(:port) + uri.scheme = config[:ssl] ? 'amqps' : 'amqp' + + if config.key?(:vhost) && config[:vhost] != '/' + uri.path = "/#{CGI.escape(config[:vhost])}" + end + + uri + end + end + # rubocop:enable all + def running? mutex.synchronize do check_process! connection.running? end end + # rubocop:disable AbcSize def start mutex.synchronize do check_process! return if connection.running? @@ -45,10 +70,11 @@ @routes << config[:routing_file] if config[:routing_file].present? @routes.reload connection.bind(@routes) end end + # rubocop:enable all def connect mutex.synchronize do check_process! return if connection.running? @@ -57,10 +83,11 @@ connection.connect end end + # rubocop:disable AbcSize def stop(opts = {}) mutex.synchronize do check_process! log(:debug) { "Stop on #{uri}..." } @@ -71,10 +98,11 @@ dispatcher.shutdown reset end end + # rubocop:enable all def purge(release: false) mutex.synchronize do check_process! @@ -153,7 +181,25 @@ @pool = nil @channel = nil @bindings = nil @dispatcher = nil end + + # rubocop:disable AbcSize + def parse(uri) + # Legacy parsing of URI configuration; does not follow usual + # AMQP vhost encoding but used regular URL path + uri = ::URI.parse(uri) + + config = {} + config[:user] ||= uri.user if uri.user + config[:pass] ||= uri.password if uri.password + config[:host] ||= uri.host if uri.host + config[:port] ||= uri.port if uri.port + config[:vhost] ||= uri.path if uri.path + config[:ssl] ||= uri.scheme.casecmp('amqps').zero? + + config + end + # rubocop:enable all end end