Sha256: 6ab0d8a8359dcde5679b24da9dcd0f6fff241365747d3e2b87f96adc4c2ba4ca
Contents?: true
Size: 1.67 KB
Versions: 1
Compression:
Stored size: 1.67 KB
Contents
# frozen_string_literal: true

require 'grpc_kit/session/io'
require 'grpc_kit/sessions/server_session'

module GrpcKit
  # Keeps the table of registered RPCs and the list of sessions that are
  # currently being served, and routes incoming streams to the matching RPC.
  class Server
    # @param interceptors [Array] handed to every RPC description when a
    #   handler is registered through {#handle}
    def initialize(interceptors: [])
      @sessions = []
      @rpc_descs = {}
      @interceptors = interceptors
      @mutex = Mutex.new # guards @sessions

      GrpcKit.logger.debug("Launched grpc_kit(v#{GrpcKit::VERSION})")
    end

    # Registers every RPC listed in +handler.class.rpc_descs+.
    #
    # Registration is all-or-nothing: entries are collected in a temporary
    # hash and merged only after every path has passed the duplicate check,
    # so a failure never leaves the handler partially registered.
    #
    # @param handler [Object] service instance whose class responds to +rpc_descs+
    # @raise [RuntimeError] when one of the handler's paths is already registered
    # @return [Object] the +rpc_descs+ collection of the handler's class
    def handle(handler)
      descs = handler.class.rpc_descs
      built = {}

      descs.each do |path, rpc_desc|
        if @rpc_descs[path]
          raise "Duplicated method registered #{path}, class: #{handler.class}"
        end

        built[path] = rpc_desc.build_server(handler, interceptors: @interceptors)
      end

      @rpc_descs.merge!(built)
      descs
    end

    # Serves one connection: wraps it in a server session, submits an empty
    # settings list and starts the session. The session is tracked by this
    # server while the block runs.
    #
    # @param conn [Object] connection object passed to GrpcKit::Session::IO.new
    def run(conn)
      establish_session(conn) do |s|
        s.submit_settings([])
        s.start
      end
    end

    # Calls +finish+ on every session currently tracked by this server.
    def shutdown
      GrpcKit.logger.debug('Shutdown grpc_kit')
      @mutex.synchronize do
        @sessions.each(&:finish)
      end
    end

    # Calls +drain+ on every session currently tracked by this server.
    def graceful_shutdown
      @mutex.synchronize do
        @sessions.each(&:drain)
      end
    end

    # Invokes the RPC registered for +path+ on +stream+. When no RPC is
    # registered, an Unimplemented status is sent on the stream instead.
    #
    # @param path [String]
    # @param stream [GrpcKit::Streams::ServerStream]
    def dispatch(path, stream)
      rpc = @rpc_descs[path]
      unless rpc
        e = GrpcKit::Errors::Unimplemented.new(path)
        stream.send_status(status: e.code, msg: e.message)
        return
      end

      stream.invoke(rpc)
    end

    private

    # Creates a server session for +conn+, tracks it in @sessions while the
    # given block runs, and always removes it afterwards (even on error).
    #
    # @param conn [Object] connection object passed to GrpcKit::Session::IO.new
    # @yieldparam session [GrpcKit::Sessions::ServerSession]
    def establish_session(conn)
      session = GrpcKit::Sessions::ServerSession.new(GrpcKit::Session::IO.new(conn), self)
      begin
        @mutex.synchronize { @sessions << session }
        yield(session)
      ensure
        @mutex.synchronize { @sessions.delete(session) }
      end
    end
  end
end
Version data entries
1 entries across 1 versions & 1 rubygems
Version | Path |
---|---|
grpc_kit-0.1.4 | lib/grpc_kit/server.rb |