lib/cosmos/streams/tcpip_socket_stream.rb in cosmos-3.5.1 vs lib/cosmos/streams/tcpip_socket_stream.rb in cosmos-3.5.2

- old
+ new

@@ -1,143 +1,143 @@ -# encoding: ascii-8bit - -# Copyright 2014 Ball Aerospace & Technologies Corp. -# All Rights Reserved. -# -# This program is free software; you can modify and/or redistribute it -# under the terms of the GNU General Public License -# as published by the Free Software Foundation; version 3 with -# attribution addendums as found in the LICENSE.txt - -require 'socket' -require 'thread' # For Mutex -require 'timeout' # For Timeout::Error -require 'cosmos/streams/stream' -require 'cosmos/config/config_parser' - -module Cosmos - - # Data {Stream} which reads and writes from Tcpip Sockets. - class TcpipSocketStream < Stream - attr_reader :write_socket - - # @param write_socket [Socket] Socket to write - # @param read_socket [Socket] Socket to read - # @param write_timeout [Float|nil] Number of seconds to wait for the write - # to complete or nil to block until the socket is ready to write. - # @param read_timeout [Float|nil] Number of seconds to wait for the read - # to complete or nil to block until the socket is ready to read. - def initialize(write_socket, read_socket, write_timeout, read_timeout) - super() - - @write_socket = write_socket - @read_socket = read_socket - @write_timeout = ConfigParser.handle_nil(write_timeout) - @write_timeout = @write_timeout.to_f if @write_timeout - @read_timeout = ConfigParser.handle_nil(read_timeout) - @read_timeout = @read_timeout.to_f if @read_timeout - - # Mutex on write is needed to protect from commands coming in from more - # than one tool - @write_mutex = Mutex.new - @connected = false - end - - # @return [String] Returns a binary string of data from the socket - def read - raise "Attempt to read from write only stream" unless @read_socket - - # No read mutex is needed because there is only one stream procesor - # reading - begin - data = @read_socket.recv_nonblock(65535) - @raw_logger_pair.read_logger.write(data) if @raw_logger_pair - rescue IO::WaitReadable - # Wait for the socket to be ready for reading or for the timeout - begin - result = IO.fast_select([@read_socket], nil, nil, @read_timeout) - - # If select returns something it means the socket is now available for - # reading so retry the read. If it returns nil it means we timed out. - if result - retry - else - raise Timeout::Error, "Read Timeout" - end - rescue IOError, Errno::ENOTSOCK - # These can happen with the socket being closed while waiting on select - data = '' - end - rescue Errno::ECONNRESET, Errno::ECONNABORTED, IOError, Errno::ENOTSOCK - data = '' - end - - data - end - - # @return [String] Returns a binary string of data from the socket. Always returns immediately - def read_nonblock - # No read mutex is needed because there is only one stream procesor - # reading - begin - data = @read_socket.recv_nonblock(65535) - @raw_logger_pair.read_logger.write(data) if @raw_logger_pair - rescue Errno::EAGAIN, Errno::EWOULDBLOCK, Errno::ECONNRESET, Errno::ECONNABORTED - data = '' - end - - data - end - - # @param data [String] A binary string of data to write to the socket - def write(data) - raise "Attempt to write to read only stream" unless @write_socket - - @write_mutex.synchronize do - num_bytes_to_send = data.length - total_bytes_sent = 0 - bytes_sent = 0 - data_to_send = data - - loop do - begin - bytes_sent = @write_socket.write_nonblock(data_to_send) - @raw_logger_pair.write_logger.write(data_to_send[0..(bytes_sent - 1)]) if @raw_logger_pair and bytes_sent > 0 - rescue Errno::EAGAIN, Errno::EWOULDBLOCK - # Wait for the socket to be ready for writing or for the timeout - result = IO.fast_select(nil, [@write_socket], nil, @write_timeout) - # If select returns something it means the socket is now available for - # writing so retry the write. If it returns nil it means we timed out. - if result - retry - else - raise Timeout::Error, "Write Timeout" - end - end - total_bytes_sent += bytes_sent - break if total_bytes_sent >= num_bytes_to_send - data_to_send = data[total_bytes_sent..-1] - end - end - end - - # Connect the stream - def connect - # If called directly this class is acting as a server and does not need to connect the sockets - @connected = true - end - - # @return [Boolean] Whether the sockets are connected - def connected? - @connected - end - - # Disconnect by closing the sockets - def disconnect - Cosmos.close_socket(@write_socket) - Cosmos.close_socket(@read_socket) - @connected = false - end - - end # class TcpipSocketStream - -end # module Cosmos +# encoding: ascii-8bit + +# Copyright 2014 Ball Aerospace & Technologies Corp. +# All Rights Reserved. +# +# This program is free software; you can modify and/or redistribute it +# under the terms of the GNU General Public License +# as published by the Free Software Foundation; version 3 with +# attribution addendums as found in the LICENSE.txt + +require 'socket' +require 'thread' # For Mutex +require 'timeout' # For Timeout::Error +require 'cosmos/streams/stream' +require 'cosmos/config/config_parser' + +module Cosmos + + # Data {Stream} which reads and writes from Tcpip Sockets. + class TcpipSocketStream < Stream + attr_reader :write_socket + + # @param write_socket [Socket] Socket to write + # @param read_socket [Socket] Socket to read + # @param write_timeout [Float|nil] Number of seconds to wait for the write + # to complete or nil to block until the socket is ready to write. + # @param read_timeout [Float|nil] Number of seconds to wait for the read + # to complete or nil to block until the socket is ready to read. + def initialize(write_socket, read_socket, write_timeout, read_timeout) + super() + + @write_socket = write_socket + @read_socket = read_socket + @write_timeout = ConfigParser.handle_nil(write_timeout) + @write_timeout = @write_timeout.to_f if @write_timeout + @read_timeout = ConfigParser.handle_nil(read_timeout) + @read_timeout = @read_timeout.to_f if @read_timeout + + # Mutex on write is needed to protect from commands coming in from more + # than one tool + @write_mutex = Mutex.new + @connected = false + end + + # @return [String] Returns a binary string of data from the socket + def read + raise "Attempt to read from write only stream" unless @read_socket + + # No read mutex is needed because there is only one stream procesor + # reading + begin + data = @read_socket.recv_nonblock(65535) + @raw_logger_pair.read_logger.write(data) if @raw_logger_pair + rescue IO::WaitReadable + # Wait for the socket to be ready for reading or for the timeout + begin + result = IO.fast_select([@read_socket], nil, nil, @read_timeout) + + # If select returns something it means the socket is now available for + # reading so retry the read. If it returns nil it means we timed out. + if result + retry + else + raise Timeout::Error, "Read Timeout" + end + rescue IOError, Errno::ENOTSOCK + # These can happen with the socket being closed while waiting on select + data = '' + end + rescue Errno::ECONNRESET, Errno::ECONNABORTED, IOError, Errno::ENOTSOCK + data = '' + end + + data + end + + # @return [String] Returns a binary string of data from the socket. Always returns immediately + def read_nonblock + # No read mutex is needed because there is only one stream procesor + # reading + begin + data = @read_socket.recv_nonblock(65535) + @raw_logger_pair.read_logger.write(data) if @raw_logger_pair + rescue Errno::EAGAIN, Errno::EWOULDBLOCK, Errno::ECONNRESET, Errno::ECONNABORTED + data = '' + end + + data + end + + # @param data [String] A binary string of data to write to the socket + def write(data) + raise "Attempt to write to read only stream" unless @write_socket + + @write_mutex.synchronize do + num_bytes_to_send = data.length + total_bytes_sent = 0 + bytes_sent = 0 + data_to_send = data + + loop do + begin + bytes_sent = @write_socket.write_nonblock(data_to_send) + @raw_logger_pair.write_logger.write(data_to_send[0..(bytes_sent - 1)]) if @raw_logger_pair and bytes_sent > 0 + rescue Errno::EAGAIN, Errno::EWOULDBLOCK + # Wait for the socket to be ready for writing or for the timeout + result = IO.fast_select(nil, [@write_socket], nil, @write_timeout) + # If select returns something it means the socket is now available for + # writing so retry the write. If it returns nil it means we timed out. + if result + retry + else + raise Timeout::Error, "Write Timeout" + end + end + total_bytes_sent += bytes_sent + break if total_bytes_sent >= num_bytes_to_send + data_to_send = data[total_bytes_sent..-1] + end + end + end + + # Connect the stream + def connect + # If called directly this class is acting as a server and does not need to connect the sockets + @connected = true + end + + # @return [Boolean] Whether the sockets are connected + def connected? + @connected + end + + # Disconnect by closing the sockets + def disconnect + Cosmos.close_socket(@write_socket) + Cosmos.close_socket(@read_socket) + @connected = false + end + + end # class TcpipSocketStream + +end # module Cosmos