require 'nats/client' require 'google/protobuf' require 'rubypitaya/core/protos/nats_connector_pb' module RubyPitaya class NatsConnector REQUEST_TYPE_SYS = 0 REQUEST_TYPE_USER = 1 MESSAGE_TYPE_REQUEST = 0 MESSAGE_TYPE_NOTIFY = 1 MESSAGE_TYPE_RESPONSE = 2 MESSAGE_TYPE_PUSH = 3 def initialize(nats_address, server_uuid, server_name) @server_uuid = server_uuid @server_name = server_name @nats_address = nats_address @nats = nil @subscribe = nil end def connect self.disconnect @nats = NATS.connect(servers: [@nats_address]) @subscribe = @nats.subscribe(subscribe_topic) loop do message = @subscribe.pending_queue.pop @subscribe.synchronize do @subscribe.pending_size -= message.data.size end request = NatsRequest.decode(message.data).to_h response = yield request nats_response = NatsResponse.new(data: response.force_encoding('ascii-8bit')) nats_response_encoded = NatsResponse.encode(nats_response) @nats.publish(message.reply, nats_response_encoded) end end def disconnect @subscribe.unsubscribe unless @subscribe.nil? @nats.close unless @nats.nil? @nats = nil @subscribe = nil end def push_to_frontend(session, message_route, payload) frontend_topic = get_frontend_topic(session.frontend_id) nats_message = NatsMessage.new( route: message_route, data: payload, reply: @server_uuid, type: MESSAGE_TYPE_REQUEST, ) nats_session = NatsSession.new( id: session.id, uid: session.uid, data: session.data.to_json, ) nats_request = NatsRequest.new( type: REQUEST_TYPE_USER, session: nats_session, msg: nats_message, frontendID: session.frontend_id, metadata: session.metadata.to_json, ) request = NatsRequest.encode(nats_request) nats_response = @nats.request(frontend_topic, request) response = NatsResponse.decode(nats_response.data).to_h response end def push_to_user(uid, message_route, payload) user_topic = get_user_message_topic(uid) nats_push = NatsPush.new( route: message_route, uid: uid, data: payload.to_json, ) @nats.publish(user_topic, nats_push) 0 end def subscribe_topic "pitaya/servers/#{@server_name}/#{@server_uuid}" end def get_frontend_topic(frontend_id) "pitaya/servers/connector/#{frontend_id}" end def get_user_message_topic(uid) "pitaya/connector/user/#{uid}/push" end end end