Sha256: cfdd8a2d8885275cc26671e26c4813e7e0236dbcdc1d78506d8dc8161092ee5b
Contents?: true
Size: 1.98 KB
Versions: 4
Compression:
Stored size: 1.98 KB
Contents
# -*- encoding: utf-8 -*- # An IO processor that does its work on its own thread. class OnStomp::Components::ThreadedProcessor # Creates a new processor for the {OnStomp::Client client} # @param [OnStomp::Client] client def initialize client @client = client @run_thread = nil @closing = false end # Returns true if its IO thread has been created and is alive, otherwise # false. # @return [true,false] def running? @run_thread && @run_thread.alive? end # Starts the processor by creating a new thread that continually invokes # {OnStomp::Connections::Base#io_process} while the client is # {OnStomp::Client#connected? connected}. # @return [self] def start @run_thread = Thread.new do begin while @client.connected? @client.connection.io_process Thread.stop if @closing end rescue OnStomp::StopReceiver rescue Exception raise end end self end # Prepares the conneciton for closing by flushing its write buffer. def prepare_to_close if running? @closing = true Thread.pass until @run_thread.stop? @client.connection.flush_write_buffer @closing = false @run_thread.wakeup end end # Causes the thread this method was invoked in to `pass` until the # processor is no longer {#running? running}. # @return [self] def join Thread.pass while running? @run_thread && @run_thread.join self end # Forcefully stops the processor and joins its IO thread to the # callee's thread. # @return [self] # @raise [IOError, SystemCallError] if either were raised in the IO thread # and the {OnStomp::Client client} is still # {OnStomp::Client#connected? connected} after the thread is joined. def stop begin @run_thread.raise OnStomp::StopReceiver if @run_thread.alive? @run_thread.join rescue IOError, SystemCallError raise if @client.connected? end @run_thread = nil self end end
Version data entries
4 entries across 4 versions & 1 rubygems