require 'concurrent-ruby'

module Legion
  module Transport
    # Module-level holder for the AMQP session and the per-thread channel
    # used by Legion::Transport. All methods are singleton methods; state is
    # kept in the module's own instance variables:
    #
    # * @session        - a Concurrent::Atom wrapping the connection object
    # * @channel_thread - a Concurrent::ThreadLocalVar holding the channel
    #                     created for the calling thread
    module Connection
      class << self
        # @return [Class] the connector class configured as
        #   Legion::Transport::CONNECTOR
        def connector
          Legion::Transport::CONNECTOR
        end

        # Creates the session if one does not exist yet and registers the
        # blocked / unblocked / recovery logging callbacks on it (only those
        # the session responds to).
        #
        # @return [true]
        def setup # rubocop:disable Metrics/AbcSize,Metrics/CyclomaticComplexity,Metrics/PerceivedComplexity,Metrics/MethodLength
          Legion::Logging.info("Using transport connector: #{Legion::Transport::CONNECTOR}")

          if @session.respond_to?(:value) && session.respond_to?(:closed?) && session.closed?
            # A session exists but reports closed: only the thread-local
            # channel holder is reset here.
            # NOTE(review): the session itself is not restarted in this
            # branch - confirm that is intended.
            @channel_thread = Concurrent::ThreadLocalVar.new(nil)
          elsif @session.respond_to?(:value) && session.respond_to?(:closed?) && session.open?
            # A session exists and is open: nothing to create.
            nil
          elsif Legion::Transport::TYPE == 'march_hare'
            settings = Legion::Settings[:transport][:connection]
            @session ||= Concurrent::Atom.new(
              MarchHare.connect(host:     settings[:host],
                                vhost:    settings[:vhost],
                                user:     settings[:user],
                                password: settings[:password],
                                port:     settings[:port])
            )
            @channel_thread = Concurrent::ThreadLocalVar.new(nil)
            session.start
            # NOTE(review): this channel is not stored anywhere; presumably it
            # exists only to apply the QoS setting - confirm.
            session.create_channel.basic_qos(1)
          else
            @session ||= Concurrent::Atom.new(
              connector.new(
                Legion::Settings[:transport][:connection],
                logger:    Legion::Logging::Logger.new(level: 'warn'),
                log_level: :info
              )
            )
            @channel_thread = Concurrent::ThreadLocalVar.new(nil)
            session.start
            session.create_channel.basic_qos(20, true)
          end

          if session.respond_to? :on_blocked
            session.on_blocked { Legion::Logging.warn('Legion::Transport is being blocked by RabbitMQ!') }
          end

          if session.respond_to? :on_unblocked
            session.on_unblocked { Legion::Logging.info('Legion::Transport is no longer being blocked by RabbitMQ') }
          end

          if session.respond_to? :after_recovery_completed
            session.after_recovery_completed { Legion::Logging.info('Legion::Transport has completed recovery') }
          end

          true
        end

        # Returns the calling thread's channel, creating a new one from the
        # session (and applying the configured prefetch) when the thread has
        # no channel yet or its channel is no longer open.
        #
        # Requires #setup to have been called first, since it reads
        # @channel_thread and the session.
        #
        # @return [Object] the channel stored in the thread-local holder
        def channel # rubocop:disable Metrics/AbcSize
          current = @channel_thread.value
          return current if !current.nil? && current.open?

          @channel_thread.value = session.create_channel
          prefetch = Legion::Settings[:transport][:prefetch]
          if Legion::Transport::TYPE == 'march_hare'
            @channel_thread.value.basic_qos(prefetch)
          else
            @channel_thread.value.prefetch(prefetch)
          end
          @channel_thread.value
        end

        # @return [Object, nil] the connection object held in the @session
        #   atom, or nil when no session has been created yet
        def session
          # The guard must actually return: a bare `nil if ...` expression is
          # discarded and would fall through to calling #value on nil.
          return if @session.nil?

          @session.value
        end

        # Alias-style accessor; delegates to #channel.
        def channel_thread
          channel
        end

        # @return [Boolean] result of calling open? on the current channel
        def channel_open?
          channel.open?
        end

        # @return [Boolean] result of calling open? on the current session
        def session_open?
          session.open?
        end

        # Closes the current session.
        def shutdown
          session.close
        end
      end
    end
  end
end