# frozen_string_literal: true

require 'socket'
require 'ostruct'
require 'stringio'

module Steam
  module Networking
    # Represents a connection to Steam. Holds the session key and yields
    # packets read from the socket to the caller.
    #
    # @example Creating a Connection
    #   connection = Connection.new(Server.new(ip, port))
    #   connection.each_packet do |packet|
    #   end
    #
    # @see Packet
    class Connection
      # Number of bytes in a packet header: a 4 byte signed size followed by
      # a 4 byte magic string.
      HEADER_SIZE = 8
      private_constant :HEADER_SIZE

      # The internal list of Packet objects read from the TCP socket
      attr_reader :packets

      # @param server [#host, #port] the server to connect to when #open
      #   is called
      # @param socket [#read, #write, #close, #closed?, nil] an already
      #   established socket; when nil, #open must be called before any IO
      def initialize(server, socket = nil)
        @session_key   = nil
        @disconnecting = false
        @packets       = PacketList.new
        @socket        = socket
        @mutex         = Mutex.new
        @server        = server
      end

      # Opens the underlying socket and resets the session state.
      #
      # @return [TCPSocket] the freshly opened socket
      def open
        self.session_key   = nil
        self.disconnecting = false
        self.socket        = TCPSocket.open(@server.host, @server.port)
      end

      # Closes the socket and clears the session key. Safe to call when no
      # socket has been opened yet.
      #
      # @return [nil]
      def disconnect
        self.session_key   = nil
        self.disconnecting = true
        socket&.close
      end

      # Determine if the connection is disconnected. This is determined
      # by the socket being open or closed; a connection that has no socket
      # at all is considered disconnected.
      #
      # @return [Bool]
      def disconnected?
        current = socket
        current.nil? || current.closed?
      end

      # Sets the connection's session key. Used for crypto
      #
      # @param v [Object, nil] the new session key, nil for a plain session
      def session_key=(v)
        @mutex.synchronize { @session_key = v }
      end

      # Send a Message to the Steam server. If we are in an encrypted
      # session the Packet body is encrypted before it is sent.
      #
      # @see Protocol::Message
      # @see Networking::Packet
      #
      # @param msg [Networking::Message] the message to send
      def send_msg(msg)
        key  = session_key
        data = msg.encode
        data = Crypto.encrypt(data, key) if key

        write_socket(Packet.new(data).encode)
      end

      # Yields each Packet from the socket to the calling block.
      #
      # Starts a thread for reading from the socket into a queue. And another
      # thread to consume that queue. Packets that are consumed from the queue
      # will be yielded to the block
      #
      # @see Networking::Packet
      def each_packet(&block)
        start_read_thread
        start_listen_thread(&block)
      end

      private

      # Consumes the packet list, yielding every packet to the block given
      # to the method until the connection is disconnected.
      #
      # @api private
      def start_listen_thread
        @listen_thread = Thread.new do
          until disconnected?
            packet = @packets.pop
            yield packet if packet
          end
        end
        @listen_thread.abort_on_exception = true
      end

      # Reads packets from the socket into the packet list until the
      # connection is disconnected.
      #
      # NOTE(review): when the peer closes the connection the read yields nil
      # while the local socket stays open, so this loop presumably keeps
      # polling -- confirm whether that is intended.
      #
      # @api private
      def start_read_thread
        @read_thread = Thread.new do
          until disconnected?
            packet = recv_packet_from_socet
            @packets.add(packet) if packet
          end
        end
        @read_thread.abort_on_exception = true
      end

      # Reads one complete packet (header, then body) from the socket.
      #
      # @api private
      # @return [Packet, nil] nil when no header or body could be read
      def recv_packet_from_socet
        packet_size = read_packet_header
        read_packet(packet_size) if packet_size
      end

      # Reads and validates the packet header.
      #
      # @api private
      # @return [Integer, nil] the body size, or nil when the socket returned
      #   no data or fewer bytes than a complete header
      # @raise [RuntimeError] if the size is not positive or the magic does
      #   not match Packet::TCP_MAGIC
      def read_packet_header
        data = read_socket(HEADER_SIZE)
        # A nil or short read means the stream ended before a complete header
        # arrived; there is nothing sensible to parse.
        return nil if data.nil? || data.bytesize < HEADER_SIZE

        header = ByteReader.new(StringIO.new(data))
        size   = header.signed_int32
        magic  = header.string(4)

        # The size is read as a signed integer, so negative values must be
        # rejected as well as zero.
        valid = size.positive? && magic == Packet::TCP_MAGIC
        raise "invalid packet: size=#{size}, magic=#{magic}" unless valid

        size
      end

      # Reads a packet body of the given size, decrypting it when a session
      # key is set.
      #
      # @api private
      # @param packet_size [Integer] number of body bytes to read
      # @return [Packet, nil] nil when the socket returned no data
      def read_packet(packet_size)
        data = read_socket(packet_size)
        return nil if data.nil?

        key  = session_key
        data = Crypto.decrypt(data, key) if key
        Packet.new(data)
      end

      # Writes raw data to the socket.
      #
      # @api private
      # @return [Integer] bytes written, 0 when the socket was closed while
      #   disconnecting
      def write_socket(data)
        handle_timeout { socket.write(data) }
      rescue IOError
        # If we are disconnecting, the socket is allowed to be closed
        return 0 if disconnecting

        raise
      end

      # Reads up to len bytes from the socket.
      #
      # @api private
      # @return [String, nil] nil when the socket was closed while
      #   disconnecting, or when the socket itself returned nil
      def read_socket(len)
        handle_timeout { socket.read(len) }
      rescue IOError
        # If we are disconnecting, the socket is allowed to be closed
        return nil if disconnecting

        raise
      end

      # @api private
      def socket
        @mutex.synchronize { @socket }
      end

      # @api private
      def socket=(socket)
        @mutex.synchronize { @socket = socket }
      end

      # @api private
      def disconnecting
        @mutex.synchronize { @disconnecting }
      end

      # @api private
      def disconnecting=(v)
        @mutex.synchronize { @disconnecting = v }
      end

      # @api private
      def session_key
        @mutex.synchronize { @session_key }
      end

      # Runs the block, retrying up to 3 times (sleeping 1 second between
      # tries) when it raises Errno::ETIMEDOUT.
      #
      # @api private
      # @raise [ArgumentError] if no block is given
      def handle_timeout(&block)
        raise ArgumentError, 'requires block' unless block_given?

        Retryable.retryable(tries: 3, sleep: 1, on: [Errno::ETIMEDOUT]) { block.call }
      end
    end
  end
end