require 'set' require 'protobuf/rpc/servers/socket/worker' module Protobuf module Rpc module Socket class Server include ::Protobuf::Logging AUTO_COLLECT_TIMEOUT = 5 # seconds private attr_accessor :threshold, :host, :port, :backlog attr_writer :running public attr_reader :running alias_method :running?, :running def initialize(options) self.running = false self.host = options.fetch(:host) self.port = options.fetch(:port) self.backlog = options.fetch(:backlog, 100) self.threshold = options.fetch(:threshold, 100) end def threads @threads ||= [] end def working @working ||= Set.new end def cleanup? # every `threshold` connections run a cleanup routine after closing the response threads.size > 0 && threads.size % threshold == 0 end def cleanup_threads logger.debug { sign_message("Thread cleanup - #{threads.size} - start") } threads.delete_if do |hash| unless (thread = hash.fetch(:thread)).alive? thread.join working.delete(hash.fetch(:socket)) end end logger.debug { sign_message("Thread cleanup - #{threads.size} - complete") } end def log_signature @_log_signature ||= "[server-#{self.class.name}]" end def new_worker(socket) Thread.new(socket) do |sock| ::Protobuf::Rpc::Socket::Worker.new(sock) do |s| s.close end end end def run logger.debug { sign_message("Run") } server = ::TCPServer.new(host, port) fail "The server was unable to start properly." if server.closed? begin server.listen(backlog) listen_fds = [server] self.running = true while running? logger.debug { sign_message("Waiting for connections") } ready_cnxns = begin IO.select(listen_fds, [], [], AUTO_COLLECT_TIMEOUT) rescue IOError nil end if ready_cnxns ready_cnxns.first.each do |client| case when !running? # no-op when client == server logger.debug { sign_message("Accepted new connection") } client, _sockaddr = server.accept listen_fds << client else unless working.include?(client) working << listen_fds.delete(client) logger.debug { sign_message("Working") } threads << { :thread => new_worker(client), :socket => client } cleanup_threads if cleanup? end end end else # Run a cleanup if select times out while waiting cleanup_threads if threads.size > 1 end end ensure server.close end end def stop self.running = false end end end end end