module Steep module Server class Master LSP = LanguageServer::Protocol attr_reader :steepfile attr_reader :project attr_reader :reader, :writer attr_reader :queue attr_reader :worker_count attr_reader :worker_to_paths attr_reader :interaction_worker attr_reader :signature_worker attr_reader :code_workers include Utils def initialize(project:, reader:, writer:, interaction_worker:, signature_worker:, code_workers:, queue: Queue.new) @project = project @reader = reader @writer = writer @queue = queue @interaction_worker = interaction_worker @signature_worker = signature_worker @code_workers = code_workers @worker_to_paths = {} @shutdown_request_id = nil end def start source_paths = project.all_source_files bin_size = (source_paths.size / code_workers.size) + 1 source_paths.each_slice(bin_size).with_index do |paths, index| register_code_to_worker(paths, worker: code_workers[index]) end tags = Steep.logger.formatter.current_tags.dup tags << "master" Thread.new do Steep.logger.formatter.push_tags(*tags, "from-worker@interaction") interaction_worker.reader.read do |message| process_message_from_worker(message) end end Thread.new do Steep.logger.formatter.push_tags(*tags, "from-worker@signature") signature_worker.reader.read do |message| process_message_from_worker(message) end end code_workers.each do |worker| Thread.new do Steep.logger.formatter.push_tags(*tags, "from-worker@#{worker.name}") worker.reader.read do |message| process_message_from_worker(message) end end end Thread.new do Steep.logger.formatter.push_tags(*tags, "from-client") reader.read do |request| process_message_from_client(request) end end while job = queue.pop if @shutdown_request_id if job[:id] == @shutdown_request_id writer.write(job) break end else writer.write(job) end end writer.io.close each_worker do |w| w.shutdown() end end def each_worker(&block) if block_given? yield interaction_worker yield signature_worker code_workers.each &block else enum_for :each_worker end end def process_message_from_client(message) id = message[:id] case message[:method] when "initialize" queue << { id: id, result: LSP::Interface::InitializeResult.new( capabilities: LSP::Interface::ServerCapabilities.new( text_document_sync: LSP::Interface::TextDocumentSyncOptions.new( change: LSP::Constant::TextDocumentSyncKind::INCREMENTAL ), hover_provider: true, completion_provider: LSP::Interface::CompletionOptions.new( trigger_characters: [".", "@"] ) ) ) } each_worker do |worker| worker << message end when "textDocument/didChange" update_source(message) uri = URI.parse(message[:params][:textDocument][:uri]) path = project.relative_path(Pathname(uri.path)) unless registered_path?(path) register_code_to_worker [path], worker: least_busy_worker() end each_worker do |worker| worker << message end when "textDocument/hover" interaction_worker << message when "textDocument/completion" interaction_worker << message when "textDocument/open" # Ignores open notification when "shutdown" queue << { id: id, result: nil } @shutdown_request_id = id when "exit" queue << nil end end def process_message_from_worker(message) queue << message end def paths_for(worker) worker_to_paths[worker] ||= Set[] end def least_busy_worker code_workers.min_by do |w| paths_for(w).size end end def registered_path?(path) worker_to_paths.each_value.any? {|set| set.include?(path) } end def register_code_to_worker(paths, worker:) paths_for(worker).merge(paths) worker << { method: "workspace/executeCommand", params: LSP::Interface::ExecuteCommandParams.new( command: "steep/registerSourceToWorker", arguments: paths.map do |path| "file://#{project.absolute_path(path)}" end ) } end end end end