# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.

require 'thread'
require 'set'
require 'securerandom'
require_relative 'listener'
require_relative 'work_queue'

module Qpid::Proton
  public

  # An AMQP container manages a set of {Listener}s and {Connection}s which
  # contain {Sender} and {Receiver} links to transfer messages. Usually, each
  # AMQP client or server process has a single container for all of its
  # connections and links.
  #
  # One or more threads can call {#run}; events generated by all the listeners
  # and connections are dispatched in the {#run} threads.
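  #
  # The following is an illustrative sketch only: +ExampleHandler+ and the URL
  # are hypothetical, not part of this library.
  #
  # @example Minimal single-threaded client
  #   class ExampleHandler < Qpid::Proton::MessagingHandler
  #     def on_connection_open(connection)
  #       connection.close        # Just open the connection, then close it
  #     end
  #   end
  #
  #   container = Qpid::Proton::Container.new(ExampleHandler.new)
  #   container.connect("amqp://broker.example.com")
  #   container.run               # Returns once the connection closes ({#auto_stop} is true by default)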
  class Container
    include TimeCompare

    # Error raised if the container is used after {#stop} has been called.
    class StoppedError < RuntimeError
      def initialize(*args) super("container has been stopped"); end
    end

    # Create a new Container
    # @overload initialize(id=nil)
    #   @param id [String,Symbol] A unique ID for this container; a random UUID is used if nil.
    #
    # @overload initialize(handler=nil, id=nil)
    #   @param id [String,Symbol] A unique ID for this container; a random UUID is used if nil.
    #   @param handler [MessagingHandler] Optional default handler for connections
    #     that do not have their own handler (see {#connect} and {#listen})
    #
    #     *Note*: For multi-threaded code, it is recommended to use a separate
    #     handler instance for each connection, as a shared handler may be called
    #     concurrently.
    #
    def initialize(*args)
      @handler, @id, @panic = nil
      case args.size
      when 2 then @handler, @id = args
      when 1 then
        @id = String.try_convert(args[0]) || (args[0].to_s if args[0].is_a? Symbol)
        @handler = args[0] unless @id
      when 0 then
      else raise ArgumentError, "wrong number of arguments (given #{args.size}, expected 0..2)"
      end
      # Use an empty messaging adapter to give default behaviour if there's no global handler.
      @adapter = Handler::Adapter.adapt(@handler) || Handler::MessagingAdapter.new(nil)
      @id = (@id || SecureRandom.uuid).freeze

      # Implementation note:
      #
      # - #run threads take work items from @work, process them, and re-arm them for select
      # - work items are: ConnectionTask, ListenTask, :start, :select, :schedule
      # - nil on the @work queue makes a #run thread exit
      @work = Queue.new
      @work << :start
      @work << :select
      @wake = SelectWaker.new   # Wakes a #run thread blocked in IO.select
      @auto_stop = true         # Stop when @active drops to 0
      @schedule = Schedule.new
      @schedule_working = false # True if :schedule is on the work queue

      # The following instance variables are protected by @lock
      @lock = Mutex.new
      @active = 0               # All active tasks, in @selectable, @work or being processed
      @selectable = Set.new     # Tasks ready to block in IO.select
      @running = 0              # Count of #run threads
      @stopped = false          # #stop called
      @stop_err = nil           # Optional error to pass to tasks, from #stop
    end

    # @return [MessagingHandler] The container-wide handler
    attr_reader :handler

    # @return [String] Unique identifier for this container
    attr_reader :id

    # Auto-stop flag.
    #
    # True (the default) means that the container will stop automatically, as if {#stop}
    # had been called, when the last listener or connection closes.
    #
    # False means {#run} will not return unless {#stop} is called.
    #
    # @return [Bool] auto-stop state
    attr_accessor :auto_stop

    # True if the container has been stopped and can no longer be used.
    # @return [Bool] stopped state
    attr_accessor :stopped

    # Number of threads in {#run}
    # @return [Integer] {#run} thread count
    def running() @lock.synchronize { @running }; end

    # Open an AMQP connection.
    #
    # @param url [String, URI] Open a {TCPSocket} to url.host, url.port.
    #   url.scheme must be "amqp" or "amqps"; a nil url.scheme is treated as "amqp".
    #   url.user and url.password are used as defaults if opts[:user], opts[:password] are nil.
    # @option (see Connection#open)
    # @return [Connection] The new AMQP connection
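    #
    # An illustrative sketch (the broker address, credentials, and +ExampleHandler+
    # are hypothetical):
    # @example
    #   connection = container.connect("amqps://user:secret@broker.example.com",
    #                                  handler: ExampleHandler.new)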
    def connect(url, opts=nil)
      not_stopped
      url = Qpid::Proton::uri url
      opts ||= {}
      if url.user || url.password
        opts[:user] ||= url.user
        opts[:password] ||= url.password
      end
      opts[:ssl_domain] ||= SSLDomain.new(SSLDomain::MODE_CLIENT) if url.scheme == "amqps"
      connect_io(TCPSocket.new(url.host, url.port), opts)
    end

    # Open an AMQP protocol connection on an existing {IO} object
    # @param io [IO] An existing {IO} object, e.g. a {TCPSocket}
    # @option (see Connection#open)
    def connect_io(io, opts=nil)
      not_stopped
      cd = connection_driver(io, opts)
      cd.connection.open()
      add(cd)
      cd.connection
    end

    # Listen for incoming AMQP connections.
    #
    # @param url [String,URI] Listen on host:port of the AMQP URL
    # @param handler [Listener::Handler] A {Listener::Handler} object that will be called
    #   with events for this listener and can generate a new set of options for each connection.
    # @return [Listener] The AMQP listener.
    #
    def listen(url, handler=Listener::Handler.new)
      not_stopped
      url = Qpid::Proton::uri url
      # TODO aconway 2017-11-01: amqps, SSL
      listen_io(TCPServer.new(url.host, url.port), handler)
    end

    # Listen for incoming AMQP connections on an existing server socket.
    # @param io A server socket, for example a {TCPServer}
    # @param handler [Listener::Handler] Handler for events from this listener
    #
    def listen_io(io, handler=Listener::Handler.new)
      not_stopped
      l = ListenTask.new(io, handler, self)
      add(l)
      l
    end

    # Run the container: wait for IO activity, dispatch events to handlers.
    #
    # *Multi-threading*: More than one thread can call {#run} concurrently; the
    # container will use all {#run} threads as a thread pool. Calls to
    # {MessagingHandler} or {Listener::Handler} methods are serialized for each
    # connection or listener. See {WorkQueue} for coordinating with other
    # threads.
    #
    # *Exceptions*: If any handler method raises an exception it will stop the
    # container, and the exception will be raised by all calls to {#run}. For
    # single-threaded code this is often desirable. Multi-threaded server
    # applications should normally rescue exceptions in the handler and deal
    # with them in another way: logging, closing the connection with an error
    # condition, signalling another thread etc.
    #
    # @return [void] Returns when the container stops, see {#stop} and {#auto_stop}
    #
    # @raise [StoppedError] If the container has already been stopped when {#run} was called.
    #
    # @raise [Exception] If any {MessagingHandler} or {Listener::Handler} managed by
    #   the container raises an exception, that exception will be raised by {#run}
    #
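    # A sketch of thread-pool style use (the thread count is arbitrary):
    # @example Run the container in several threads
    #   threads = Array.new(4) { Thread.new { container.run } }
    #   # ... open connections or listeners, or call container.stop, from other threads ...
    #   threads.each(&:join)
    #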
    def run
      @lock.synchronize do
        @running += 1        # Note: the ensure clause below will decrement @running
        raise StoppedError if @stopped
      end
      while task = @work.pop
        run_one(task, Time.now)
      end
      raise @panic if @panic
    ensure
      @lock.synchronize do
        if (@running -= 1) > 0
          work_wake nil         # Signal the next thread
        else
          @adapter.on_container_stop(self) if @adapter.respond_to? :on_container_stop
        end
      end
    end

    # Stop the container.
    #
    # Close all listeners and abort all connections without doing an AMQP protocol close.
    #
    # {#stop} returns immediately; calls to {#run} will return when all activity
    # is finished.
    #
    # The container can no longer be used: attempting to use a stopped container
    # raises {StoppedError}. Create a new container if you want to resume activity.
    #
    # @param error [Condition] Optional error condition passed to
    #   {MessagingHandler#on_transport_error} for each connection and
    #   {Listener::Handler#on_error} for each listener.
    #
    # @param panic [Exception] Optional exception raised by all concurrent calls
    #   to {#run}
    #
    def stop(error=nil, panic=nil)
      @lock.synchronize do
        return if @stopped
        @stop_err = Condition.convert(error)
        @panic = panic
        @stopped = true
        check_stop_lh
        # NOTE: @stopped =>
        # - no new run threads can join
        # - no more select calls after the next wakeup
        # - once @active == 0, all threads will be stopped with nil
      end
      @wake.wake
    end

    # Schedule code to be executed after a delay.
    # @param delay [Numeric] delay in seconds, must be >= 0
    # @param non_block [Boolean] if true, raise {ThreadError} rather than block
    #   when the operation would otherwise block
    # @yield [ ] the block is invoked with no parameters in a {#run} thread after +delay+ has elapsed
    # @return [void]
    # @raise [ThreadError] if +non_block+ is true and the operation would block
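    # @example Print a message after a five-second delay (an illustrative sketch)
    #   container.schedule(5) { puts "five seconds later, in a #run thread" }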
    def schedule(delay, non_block=false, &block)
      not_stopped
      @lock.synchronize { @active += 1 } if @schedule.add(Time.now + delay, non_block, &block)
      @wake.wake
    end

    private

    def wake() @wake.wake; end

    # Container driver applies options and adds container context to events
    class ConnectionTask < Qpid::Proton::HandlerDriver
      include TimeCompare

      def initialize container, io, opts, server=false
        super io, opts[:handler]
        transport.set_server if server
        transport.apply opts
        connection.apply opts
        @work_queue = WorkQueue.new container
        connection.instance_variable_set(:@work_queue, @work_queue)
      end
      def next_tick() earliest(super, @work_queue.send(:next_tick)); end
      def process(now) @work_queue.send(:process, now); super(); end
      def dispatch          # Intercept dispatch to close the work_queue
        super
        @work_queue.send(:close) if read_closed? && write_closed?
      end
    end

    class ListenTask < Listener

      def initialize(io, handler, container)
        super
        @closing = @closed = nil
        env = ENV['PN_TRACE_EVT']
        if env && ["true", "1", "yes", "on"].include?(env.downcase)
          @log_prefix = "[0x#{object_id.to_s(16)}](PN_LISTENER_"
        else
          @log_prefix = nil
        end
        dispatch(:on_open)
      end

      def process
        return if @closed
        unless @closing
          begin
            return @io.accept, dispatch(:on_accept)
          rescue IO::WaitReadable, Errno::EINTR
          rescue IOError, SystemCallError => e
            close e
          end
        end
      ensure
        if @closing
          @io.close rescue nil
          @closed = true
          dispatch(:on_error, @condition) if @condition
          dispatch(:on_close)
        end
      end

      def can_read?() !finished?; end
      def can_write?() false; end
      def finished?() @closed; end

      def dispatch(method, *args)
        # TODO aconway 2017-11-27: better logging
        STDERR.puts "#{@log_prefix}#{([method[3..-1].upcase]+args).join ', '})" if @log_prefix
        @handler.__send__(method, self, *args) if @handler && @handler.respond_to?(method)
      end

      def next_tick() nil; end
    end

    # Selectable object that can be used to wake IO.select from another thread
    class SelectWaker
      def initialize
        @rd, @wr = IO.pipe
        @lock = Mutex.new
        @set = false
      end

      def to_io() @rd; end

      def wake
        @lock.synchronize do
          return if @set        # Don't write if the pipe already has data
          @set = true
          begin @wr.write_nonblock('x') rescue IO::WaitWritable end
        end
      end

      def reset
        @lock.synchronize do
          return unless @set
          begin @rd.read_nonblock(1) rescue IO::WaitReadable end
          @set = false
        end
      end

      def close
        @rd.close
        @wr.close
      end
    end

    # Handle a single item from the @work queue; this is the heart of the #run loop.
    def run_one(task, now)
      case task

      when :start
        @adapter.on_container_start(self) if @adapter.respond_to? :on_container_start

      when :select
        # Compute read/write select sets and the minimum next_tick for the select timeout
        r, w = [@wake], []
        next_tick = @schedule.next_tick
        @lock.synchronize do
          @selectable.each do |s|
            r << s if s.send :can_read?
            w << s if s.send :can_write?
            next_tick = earliest(s.next_tick, next_tick)
          end
        end
        timeout = ((next_tick > now) ? next_tick - now : 0) if next_tick
        r, w = IO.select(r, w, nil, timeout)
        now = Time.now unless timeout == 0
        @wake.reset if r && r.delete(@wake)

        # selected is a Set to eliminate duplicates between r, w and next_tick due.
        selected = Set.new
        selected.merge(r) if r
        selected.merge(w) if w
        @lock.synchronize do
          if @stopped           # close everything
            @selectable.each { |s| s.close @stop_err; @work << s }
            @selectable.clear
            @wake.close
            return
          end
          if !@schedule_working && before_eq(@schedule.next_tick, now)
            @schedule_working = true
            @work << :schedule
          end
          selected.merge(@selectable.select { |s| before_eq(s.next_tick, now) })
          @selectable -= selected # Remove selected tasks from @selectable
        end
        selected.each { |s| @work << s } # Queue up tasks needing #process
        @work << :select                 # Enable the next select

      when ConnectionTask then
        maybe_panic { task.process now }
        rearm task

      when ListenTask then
        io, opts = maybe_panic { task.process }
        add(connection_driver(io, opts, true)) if io
        rearm task

      when :schedule then
        if maybe_panic { @schedule.process now }
          @lock.synchronize { @active -= 1; check_stop_lh }
        else
          @lock.synchronize { @schedule_working = false }
        end
      end
    end

    def do_select
      # Compute the sets to select for read and write, and the minimum next_tick for the timeout
      r, w = [@wake], []
      next_tick = nil
      @lock.synchronize do
        @selectable.each do |s|
          r << s if s.can_read?
          w << s if s.can_write?
          next_tick = earliest(s.next_tick, next_tick)
        end
      end
      next_tick = earliest(@schedule.next_tick, next_tick)

      # Do the select and queue up all resulting work
      now = Time.now
      timeout = next_tick - now if next_tick
      r, w = (timeout.nil? || timeout > 0) && IO.select(r, w, nil, timeout)
      @wake.reset
      selected = Set.new
      @lock.synchronize do
        if @stopped
          @selectable.each { |s| s.close @stop_err; @work << s }
          @wake.close
          return
        end
        # Check if the schedule has items due and is not already working
        if !@schedule_working && before_eq(@schedule.next_tick, now)
          @work << :schedule
          @schedule_working = true
        end
        # Eliminate duplicates between r, w and next_tick due.
        selected.merge(r) if r
        selected.delete(@wake)
        selected.merge(w) if w
        @selectable -= selected
        selected.merge(@selectable.select { |s| before_eq(s.next_tick, now) })
        @selectable -= selected
      end
      selected.each { |s| @work << s } # Queue up tasks needing #process
      @work << :select
    end

    # Rescue any exception raised by the block and stop the container.
    def maybe_panic
      begin
        yield
      rescue Exception => e
        stop(nil, e)
        nil
      end
    end

    # Normally if we add work we need to set a wakeup to ensure a single #run
    # thread doesn't get stuck in select while there is other work on the queue.
    def work_wake(task)
      @work << task
      @wake.wake
    end

    def connection_driver(io, opts=nil, server=false)
      opts ||= {}
      opts[:container] = self
      opts[:handler] ||= @adapter
      ConnectionTask.new(self, io, opts, server)
    end

    # All new tasks are added here
    def add task
      @lock.synchronize do
        @active += 1
        task.close @stop_err if @stopped
      end
      work_wake task
    end

    def rearm task
      @lock.synchronize do
        if task.finished?
          @active -= 1
          check_stop_lh
        elsif @stopped
          task.close @stop_err
          work_wake task
        else
          @selectable << task
        end
      end
      @wake.wake
    end

    def check_stop_lh
      if @active.zero? && (@auto_stop || @stopped)
        @stopped = true
        work_wake nil           # Signal threads to stop
        true
      end
    end

    def not_stopped() raise StoppedError if @lock.synchronize { @stopped }; end

  end
end