module Steep
  module Server
    # Coordinates an LSP client with an optional interaction worker and a set
    # of type-check workers. Messages from every source are funnelled into a
    # single job queue which is drained by #start.
    class Master
      LSP = LanguageServer::Protocol

      # The set of paths covered by one type-check run, identified by a GUID,
      # together with the subset of those paths already reported as checked.
      class TypeCheckRequest
        attr_reader :guid
        attr_reader :library_paths
        attr_reader :signature_paths
        attr_reader :code_paths
        attr_reader :priority_paths
        attr_reader :checked_paths

        # @param guid [Object] identifier of this request; also used as the
        #   progress token by Master
        def initialize(guid:)
          @guid = guid
          @library_paths = Set[]
          @signature_paths = Set[]
          @code_paths = Set[]
          @priority_paths = Set[]
          @checked_paths = Set[]
        end

        # Parses the string form of +path+ as a URI and forces its scheme to
        # "file".
        #
        # @param path [#to_s]
        # @return [URI::Generic]
        def uri(path)
          URI.parse(path.to_s).tap do |uri|
            uri.scheme = "file"
          end
        end

        # Builds the hash sent to a worker. Library, signature and code paths
        # are filtered with +assignment+ through Set#grep (i.e. +===+);
        # priority paths are passed unfiltered.
        #
        # @param assignment [#===] presumably a Services::PathAssignment that
        #   selects the paths owned by one worker -- confirm against caller
        # @return [Hash]
        def as_json(assignment:)
          {
            guid: guid,
            library_uris: library_paths.grep(assignment).map {|path| uri(path).to_s },
            signature_uris: signature_paths.grep(assignment).map {|path| uri(path).to_s },
            code_uris: code_paths.grep(assignment).map {|path| uri(path).to_s },
            priority_uris: priority_paths.map {|path| uri(path).to_s }
          }
        end

        # @return [Integer] sum of the sizes of the three path sets
        def total
          library_paths.size + signature_paths.size + code_paths.size
        end

        # Integer percentage of checked paths relative to #total.
        #
        # A request without any path has nothing left to check, so it is
        # reported as 100% rather than dividing by zero.
        #
        # @return [Integer]
        def percentage
          count = total
          return 100 if count.zero?

          checked_paths.size * 100 / count
        end

        # @return [Set] union of library, signature and code paths
        def all_paths
          library_paths + signature_paths + code_paths
        end

        # @return [Boolean] true when +path+ belongs to any of the three sets
        def checking_path?(path)
          library_paths.include?(path) ||
            signature_paths.include?(path) ||
            code_paths.include?(path)
        end

        # Records +path+ as checked.
        #
        # @raise [RuntimeError] when +path+ is not part of this request
        def checked(path)
          raise unless checking_path?(path)
          checked_paths << path
        end

        # @return [Boolean] true when no path remains unchecked
        def finished?
          unchecked_paths.empty?
        end

        def unchecked_paths
          all_paths - checked_paths
        end

        def unchecked_code_paths
          code_paths - checked_paths
        end

        def unchecked_library_paths
          library_paths - checked_paths
        end

        def unchecked_signature_paths
          signature_paths - checked_paths
        end
      end

      # Tracks which paths belong to each project target, which of them have
      # changed since the last request, and which are prioritised.
      class TypeCheckController
        attr_reader :project
        attr_reader :priority_paths
        attr_reader :changed_paths
        attr_reader :target_paths

        # Code, signature and library paths known for a single target.
        class TargetPaths
          attr_reader :project
          attr_reader :target
          attr_reader :code_paths
          attr_reader :signature_paths
          attr_reader :library_paths

          def initialize(project:, target:)
            @project = project
            @target = target
            @code_paths = Set[]
            @signature_paths = Set[]
            @library_paths = Set[]
          end

          # @return [Set] union of code, signature and library paths
          def all_paths
            code_paths + signature_paths + library_paths
          end

          def library_path?(path)
            library_paths.include?(path)
          end

          def signature_path?(path)
            signature_paths.include?(path)
          end

          def code_path?(path)
            code_paths.include?(path)
          end

          # Registers +path+ unless it is already known. The path relative to
          # the project is matched against the target's source pattern first,
          # then its signature pattern; anything else is stored as a library
          # path.
          def add(path)
            return if library_path?(path) || signature_path?(path) || code_path?(path)

            relative_path = project.relative_path(path)

            case
            when target.source_pattern =~ relative_path
              code_paths << path
            when target.signature_pattern =~ relative_path
              signature_paths << path
            else
              library_paths << path
            end
          end

          alias << add
        end

        # @param project [Object] must respond to +targets+; one TargetPaths is
        #   created per target
        def initialize(project:)
          @project = project
          @priority_paths = Set[]
          @changed_paths = Set[]
          @target_paths = project.targets.each.map {|target| TargetPaths.new(project: project, target: target) }
        end

        # Populates every TargetPaths from the signature service and the file
        # loader, and marks all discovered paths as changed.
        #
        # @param command_line_args [Array] forwarded to the source-pattern scan
        def load(command_line_args:)
          loader = Services::FileLoader.new(base_dir: project.base_dir)

          target_paths.each do |paths|
            target = paths.target

            signature_service = Services::SignatureService.load_from(target.new_env_loader(project: project))
            paths.library_paths.merge(signature_service.env_rbs_paths)

            loader.each_path_in_patterns(target.source_pattern, command_line_args) do |path|
              paths.code_paths << project.absolute_path(path)
            end
            loader.each_path_in_patterns(target.signature_pattern) do |path|
              paths.signature_paths << project.absolute_path(path)
            end

            changed_paths.merge(paths.all_paths)
          end
        end

        # Registers an edited +path+. Paths already known as library paths are
        # ignored; the path is recorded as changed only if some target treats
        # it as code or signature after registration.
        def push_changes(path)
          return if target_paths.any? {|paths| paths.library_path?(path) }

          target_paths.each {|paths| paths << path }

          if target_paths.any? {|paths| paths.code_path?(path) || paths.signature_path?(path) }
            changed_paths << path
          end
        end

        # Registers the given path with every target, then adds it to (open:)
        # or removes it from (close:) the priority set.
        #
        # @param open [Object, nil] path that was opened
        # @param close [Object, nil] path that was closed
        def update_priority(open: nil, close: nil)
          path = open || close

          target_paths.each {|paths| paths << path }

          case
          when open
            priority_paths << path
          when close
            priority_paths.delete path
          end
        end

        # Builds a TypeCheckRequest and clears the changed-path set.
        #
        # @param guid [Object] identifier for the new request
        # @param last_request [TypeCheckRequest, nil] its unchecked paths are
        #   carried over into the new request
        # @param include_unchanged [Boolean] when true, every known path is
        #   included regardless of changes
        # @return [TypeCheckRequest, nil] nil when nothing changed and
        #   +include_unchanged+ is false
        def make_request(guid: SecureRandom.uuid, last_request: nil, include_unchanged: false)
          return if changed_paths.empty? && !include_unchanged

          TypeCheckRequest.new(guid: guid).tap do |request|
            if last_request
              request.library_paths.merge(last_request.unchecked_library_paths)
              request.signature_paths.merge(last_request.unchecked_signature_paths)
              request.code_paths.merge(last_request.unchecked_code_paths)
            end

            if include_unchanged
              target_paths.each do |paths|
                request.signature_paths.merge(paths.signature_paths)
                request.library_paths.merge(paths.library_paths)
                request.code_paths.merge(paths.code_paths)
              end
            else
              updated_paths = target_paths.select {|paths| changed_paths.intersect?(paths.all_paths) }

              updated_paths.each do |paths|
                case
                when paths.signature_paths.intersect?(changed_paths)
                  # A signature change pulls in everything of that target.
                  request.signature_paths.merge(paths.signature_paths)
                  request.library_paths.merge(paths.library_paths)
                  request.code_paths.merge(paths.code_paths)
                when paths.code_paths.intersect?(changed_paths)
                  # A code-only change pulls in just the changed code files.
                  request.code_paths.merge(paths.code_paths & changed_paths)
                end
              end
            end

            request.priority_paths.merge(priority_paths)

            changed_paths.clear
          end
        end
      end

      # Waits for the response to a single request message.
      class ResultHandler
        attr_reader :request
        attr_reader :completion_handler
        attr_reader :response

        # @param request [Hash] the request message; its +:id+ is matched
        #   against incoming responses
        def initialize(request:)
          @request = request
          @response = nil
          @completion_handler = nil
          # NOTE: not read anywhere in this class; #completed? relies on @response.
          @completed = false
        end

        # Accepts +message+ if its +:id+ equals the request's. The completion
        # handler (if any) is invoked before the response is stored.
        #
        # @return [Boolean] whether the message was consumed
        def process_response(message)
          if request[:id] == message[:id]
            completion_handler&.call(message)
            @response = message
            true
          else
            false
          end
        end

        # @return [Object, nil] the +:result+ entry of the response, if any
        def result
          response&.dig(:result)
        end

        def completed?
          !!@response
        end

        def on_completion(&block)
          @completion_handler = block
        end
      end

      # Waits for the responses of several ResultHandlers, keyed by request id.
      class GroupHandler
        attr_reader :request
        attr_reader :handlers
        attr_reader :completion_handler

        def initialize
          @handlers = {}
          # NOTE: not read anywhere in this class.
          @waiting_handlers = Set[]
          @completion_handler = nil
        end

        # Dispatches +message+ to the handler registered under its +:id+ and
        # fires the group completion handler with all handlers once every one
        # of them is completed.
        #
        # @return [Boolean] whether the message was consumed
        def process_response(message)
          if handler = handlers[message[:id]]
            handler.process_response(message)

            if completed?
              completion_handler&.call(handlers.values)
            end

            true
          else
            false
          end
        end

        def completed?
          handlers.each_value.all? {|handler| handler.completed? }
        end

        def <<(handler)
          handlers[handler.request[:id]] = handler
        end

        def on_completion(&block)
          @completion_handler = block
        end
      end

      # Holds pending handlers and routes responses to them.
      class ResultController
        attr_reader :handlers

        def initialize
          @handlers = []
        end

        def <<(handler)
          @handlers << handler
        end

        # Yields a new GroupHandler and returns it. The group is not added to
        # #handlers by this method.
        def request_group
          group = GroupHandler.new
          yield group
          group
        end

        # Offers +message+ to each handler in order until one consumes it.
        # Completed handlers are dropped afterwards in every case.
        #
        # @return [Boolean] whether any handler consumed the message
        def process_response(message)
          handlers.each do |handler|
            return true if handler.process_response(message)
          end
          false
        ensure
          handlers.reject!(&:completed?)
        end
      end

      # A message received from +source+ (the symbol :client or a worker).
      ReceiveMessageJob = Struct.new(:source, :message, keyword_init: true) do
        # A response carries an +:id+ but no +:method+.
        def response?
          message.key?(:id) && !message.key?(:method)
        end
      end

      # A message to be delivered to +dest+ (the symbol :client or a worker).
      SendMessageJob = Struct.new(:dest, :message, keyword_init: true) do
        def self.to_worker(worker, message:)
          new(dest: worker, message: message)
        end

        def self.to_client(message:)
          new(dest: :client, message: message)
        end
      end

      attr_reader :steepfile
      attr_reader :project
      attr_reader :reader, :writer
      attr_reader :commandline_args

      attr_reader :interaction_worker
      attr_reader :typecheck_workers

      attr_reader :job_queue

      attr_reader :current_type_check_request
      attr_reader :controller
      attr_reader :result_controller

      attr_reader :initialize_params
      attr_accessor :typecheck_automatically

      # @param project [Object] project handed to TypeCheckController
      # @param reader [#read] source of client messages
      # @param writer [#write] sink for messages to the client
      # @param interaction_worker [Object, nil] worker used for hover/completion
      # @param typecheck_workers [Array] workers receiving type-check requests
      # @param queue [Queue] job queue drained by #start
      def initialize(project:, reader:, writer:, interaction_worker:, typecheck_workers:, queue: Queue.new)
        @project = project
        @reader = reader
        @writer = writer
        @interaction_worker = interaction_worker
        @typecheck_workers = typecheck_workers
        @current_type_check_request = nil
        @typecheck_automatically = true
        @commandline_args = []
        @job_queue = queue

        @controller = TypeCheckController.new(project: project)
        @result_controller = ResultController.new
      end

      # Runs the main loop: one reader thread per worker and one for the
      # client push ReceiveMessageJobs onto the queue; this thread drains the
      # queue until it is closed (after the client's "exit"), then joins the
      # reader threads.
      def start
        Steep.logger.tagged "master" do
          tags = Steep.logger.formatter.current_tags.dup

          worker_threads = []

          if interaction_worker
            worker_threads << Thread.new do
              Steep.logger.formatter.push_tags(*tags, "from-worker@interaction")
              interaction_worker.reader.read do |message|
                job_queue << ReceiveMessageJob.new(source: interaction_worker, message: message)
              end
            end
          end

          typecheck_workers.each do |worker|
            worker_threads << Thread.new do
              Steep.logger.formatter.push_tags(*tags, "from-worker@#{worker.name}")
              worker.reader.read do |message|
                job_queue << ReceiveMessageJob.new(source: worker, message: message)
              end
            end
          end

          read_client_thread = Thread.new do
            reader.read do |message|
              job_queue << ReceiveMessageJob.new(source: :client, message: message)
              break if message[:method] == "exit"
            end
          end

          Steep.logger.tagged "main" do
            # Queue#deq returns nil once the queue is closed and empty.
            while job = job_queue.deq
              case job
              when ReceiveMessageJob
                src = if job.source == :client
                        :client
                      else
                        job.source.name
                      end
                Steep.logger.tagged("ReceiveMessageJob(#{src}/#{job.message[:method]}/#{job.message[:id]})") do
                  if job.response? && result_controller.process_response(job.message)
                    # nop
                    Steep.logger.info { "Processed by ResultController" }
                  else
                    case job.source
                    when :client
                      process_message_from_client(job.message)

                      if job.message[:method] == "exit"
                        job_queue.close
                      end
                    when WorkerProcess
                      process_message_from_worker(job.message, worker: job.source)
                    end
                  end
                end
              when SendMessageJob
                case job.dest
                when :client
                  Steep.logger.info { "Processing SendMessageJob: dest=client, method=#{job.message[:method] || "-"}, id=#{job.message[:id] || "-"}" }
                  writer.write job.message
                when WorkerProcess
                  Steep.logger.info { "Processing SendMessageJob: dest=#{job.dest.name}, method=#{job.message[:method] || "-"}, id=#{job.message[:id] || "-"}" }
                  job.dest << job.message
                end
              end
            end

            read_client_thread.join
            worker_threads.each do |thread|
              thread.join
            end
          end
        end
      end

      # Yields the interaction worker (when present) followed by every
      # type-check worker. Without a block, returns an Enumerator.
      def each_worker(&block)
        if block_given?
          yield interaction_worker if interaction_worker
          typecheck_workers.each(&block)
        else
          enum_for :each_worker
        end
      end

      # @param uri [String]
      # @return [Pathname] the path component of +uri+
      def pathname(uri)
        Pathname(URI.parse(uri).path)
      end

      # @return [Object, nil] the client's window.workDoneProgress capability
      #   from the stored "initialize" params
      def work_done_progress_supported?
        initialize_params&.dig(:capabilities, :window, :workDoneProgress)
      end

      # Handles a request or notification received from the LSP client.
      #
      # @param message [Hash] decoded message with +:method+, optional +:id+
      #   and +:params+
      def process_message_from_client(message)
        Steep.logger.info "Processing message from client: method=#{message[:method]}, id=#{message[:id]}"
        id = message[:id]

        case message[:method]
        when "initialize"
          @initialize_params = message[:params]
          result_controller << group_request do |group|
            each_worker do |worker|
              group << send_request(method: "initialize", params: message[:params], worker: worker)
            end

            # Reply to the client only after every worker has answered.
            group.on_completion do
              controller.load(command_line_args: commandline_args)

              job_queue << SendMessageJob.to_client(
                message: {
                  id: id,
                  result: LSP::Interface::InitializeResult.new(
                    capabilities: LSP::Interface::ServerCapabilities.new(
                      text_document_sync: LSP::Interface::TextDocumentSyncOptions.new(
                        change: LSP::Constant::TextDocumentSyncKind::INCREMENTAL,
                        save: LSP::Interface::SaveOptions.new(include_text: false),
                        open_close: true
                      ),
                      hover_provider: {
                        workDoneProgress: true,
                        partialResults: true,
                        partialResult: true
                      },
                      completion_provider: LSP::Interface::CompletionOptions.new(
                        trigger_characters: [".", "@"],
                        work_done_progress: true
                      ),
                      workspace_symbol_provider: true
                    )
                  )
                }
              )
            end
          end

        when "initialized"
          if typecheck_automatically
            request = controller.make_request(include_unchanged: true)
            start_type_check(request, last_request: nil, start_progress: request.total > 10)
          end

        when "textDocument/didChange"
          broadcast_notification(message)
          path = pathname(message[:params][:textDocument][:uri])
          controller.push_changes(path)

        when "textDocument/didSave"
          if typecheck_automatically
            # make_request returns nil when no path changed since the last
            # request; there is nothing to type check in that case.
            if request = controller.make_request(last_request: current_type_check_request)
              start_type_check(
                request,
                last_request: current_type_check_request,
                start_progress: request.total > 10
              )
            end
          end

        when "textDocument/didOpen"
          path = pathname(message[:params][:textDocument][:uri])
          controller.update_priority(open: path)

        when "textDocument/didClose"
          path = pathname(message[:params][:textDocument][:uri])
          controller.update_priority(close: path)

        when "textDocument/hover", "textDocument/completion"
          if interaction_worker
            result_controller << send_request(method: message[:method], params: message[:params], worker: interaction_worker) do |handler|
              handler.on_completion do |response|
                job_queue << SendMessageJob.to_client(
                  message: {
                    id: message[:id],
                    result: response[:result]
                  }
                )
              end
            end
          end

        when "workspace/symbol"
          result_controller << group_request do |group|
            typecheck_workers.each do |worker|
              group << send_request(method: "workspace/symbol", params: message[:params], worker: worker)
            end

            group.on_completion do |handlers|
              result = handlers.flat_map(&:result)
              job_queue << SendMessageJob.to_client(message: { id: message[:id], result: result })
            end
          end

        when "workspace/executeCommand"
          case message[:params][:command]
          when "steep/stats"
            result_controller << group_request do |group|
              typecheck_workers.each do |worker|
                group << send_request(method: "workspace/executeCommand", params: message[:params], worker: worker)
              end

              group.on_completion do |handlers|
                stats = handlers.flat_map(&:result)
                job_queue << SendMessageJob.to_client(
                  message: {
                    id: message[:id],
                    result: stats
                  }
                )
              end
            end
          end

        when "$/typecheck"
          request = controller.make_request(
            guid: message[:params][:guid],
            last_request: current_type_check_request,
            include_unchanged: true
          )
          start_type_check(
            request,
            last_request: current_type_check_request,
            start_progress: true
          )

        when "shutdown"
          result_controller << group_request do |group|
            each_worker do |worker|
              group << send_request(method: "shutdown", worker: worker)
            end

            group.on_completion do
              job_queue << SendMessageJob.to_client(message: { id: message[:id], result: nil })
            end
          end

        when "exit"
          broadcast_notification(message)
        end
      end

      # Handles a message from a worker that no pending handler consumed.
      # Unmatched responses are logged as errors, "$/typecheck/progress"
      # notifications update the current request, and every other
      # notification is forwarded to the client.
      def process_message_from_worker(message, worker:)
        Steep.logger.tagged "#process_message_from_worker (worker=#{worker.name})" do
          Steep.logger.info { "Processing message from worker: method=#{message[:method] || "-"}, id=#{message[:id] || "*"}" }

          case
          when message.key?(:id) && !message.key?(:method)
            Steep.logger.tagged "response(id=#{message[:id]})" do
              Steep.logger.error { "Received unexpected response" }
              Steep.logger.debug { "result = #{message[:result].inspect}" }
            end
          when message.key?(:method) && !message.key?(:id)
            case message[:method]
            when "$/typecheck/progress"
              on_type_check_update(
                guid: message[:params][:guid],
                path: Pathname(message[:params][:path])
              )
            else
              # Forward other notifications
              job_queue << SendMessageJob.to_client(message: message)
            end
          end
        end
      end

      # Ends the progress of +last_request+ (if any), optionally begins a
      # progress for +request+, and sends "$/typecheck/start" to every
      # type-check worker.
      #
      # @param request [TypeCheckRequest, nil] nothing is started when nil
      # @param last_request [TypeCheckRequest, nil]
      # @param start_progress [Boolean] whether to report progress to the
      #   client and track +request+ as the current request
      def start_type_check(request, last_request:, start_progress:)
        # Safe navigation keeps the nil-request guard below reachable.
        Steep.logger.tagged "#start_type_check(#{request&.guid}, #{last_request&.guid}" do
          unless request
            Steep.logger.info "Skip start type checking"
            return
          end

          if last_request
            Steep.logger.info "Cancelling last request"

            job_queue << SendMessageJob.to_client(
              message: {
                method: "$/progress",
                params: {
                  token: last_request.guid,
                  value: { kind: "end" }
                }
              }
            )
          end

          if start_progress
            Steep.logger.info "Starting new progress..."

            @current_type_check_request = request

            if work_done_progress_supported?
              job_queue << SendMessageJob.to_client(
                message: {
                  id: fresh_request_id,
                  method: "window/workDoneProgress/create",
                  params: { token: request.guid }
                }
              )
            end

            job_queue << SendMessageJob.to_client(
              message: {
                method: "$/progress",
                params: {
                  token: request.guid,
                  value: { kind: "begin", title: "Type checking", percentage: 0 }
                }
              }
            )

            # An already-finished request will never get a progress update,
            # so its progress is closed right away.
            if request.finished?
              job_queue << SendMessageJob.to_client(
                message: {
                  method: "$/progress",
                  params: { token: request.guid, value: { kind: "end" } }
                }
              )
            end
          else
            @current_type_check_request = nil
          end

          Steep.logger.info "Sending $/typecheck/start notifications"
          typecheck_workers.each do |worker|
            assignment = Services::PathAssignment.new(max_index: typecheck_workers.size, index: worker.index)

            job_queue << SendMessageJob.to_worker(
              worker,
              message: {
                method: "$/typecheck/start",
                params: request.as_json(assignment: assignment)
              }
            )
          end
        end
      end

      # Marks +path+ as checked on the current request when +guid+ matches it,
      # reports progress to the client, and clears the current request once
      # it is finished.
      def on_type_check_update(guid:, path:)
        if current = current_type_check_request
          if current.guid == guid
            current.checked(path)
            Steep.logger.info { "Request updated: checked=#{path}, unchecked=#{current.unchecked_paths.size}" }
            percentage = current.percentage
            value = if percentage == 100
                      { kind: "end" }
                    else
                      # 20-cell bar, one cell per 5%.
                      progress_string = ("▮"*(percentage/5)) + ("▯"*(20 - percentage/5))
                      { kind: "report", percentage: percentage, message: "#{progress_string} (#{percentage}%)" }
                    end

            job_queue << SendMessageJob.to_client(
              message: {
                method: "$/progress",
                params: { token: current.guid, value: value }
              }
            )

            @current_type_check_request = nil if current.finished?
          end
        end
      end

      # Queues +message+ for every worker (see #each_worker).
      def broadcast_notification(message)
        Steep.logger.info "Broadcasting notification #{message[:method]}"
        each_worker do |worker|
          job_queue << SendMessageJob.new(dest: worker, message: message)
        end
      end

      # Queues +message+ for a single worker.
      def send_notification(message, worker:)
        Steep.logger.info "Sending notification #{message[:method]} to #{worker.name}"
        job_queue << SendMessageJob.new(dest: worker, message: message)
      end

      # @return [String] random 10-character alphanumeric id
      def fresh_request_id
        SecureRandom.alphanumeric(10)
      end

      # Queues a request for +worker+ and returns the handler awaiting its
      # response. The handler is yielded before the message is queued so that
      # callbacks can be registered first.
      #
      # @return [ResultHandler]
      def send_request(method:, id: fresh_request_id(), params: nil, worker:, &block)
        Steep.logger.info "Sending request #{method}(#{id}) to #{worker.name}"

        message = { method: method, id: id, params: params }
        ResultHandler.new(request: message).tap do |handler|
          yield handler if block_given?
          job_queue << SendMessageJob.to_worker(worker, message: message)
        end
      end

      # Yields a new GroupHandler for configuration and returns it.
      #
      # @return [GroupHandler]
      def group_request
        GroupHandler.new.tap do |group|
          yield group
        end
      end

      # Calls +kill+ on every worker.
      def kill
        each_worker do |worker|
          worker.kill
        end
      end
    end
  end
end