module Isomorfeus
  # Transport layer of Isomorfeus.
  #
  # When running under Opal (RUBY_ENGINE == 'opal') this module owns a
  # websocket connection, sends requests / notifications / subscriptions over
  # it and keeps track of the requests that have not been answered yet.
  # On any other engine it publishes and subscribes through
  # Isomorfeus.pub_sub_client instead.
  module Transport
    class << self
      # Builds the nested request hash for a path.
      #
      # All elements but the last become nested hash keys. The last element is
      # the leaf: a Hash is attached as is, anything else becomes
      # `{ element => nil }`.
      #
      #   build_path_request(['A', 'b', 'c'])      # => { 'A' => { 'b' => { 'c' => nil } } }
      #   build_path_request(['A', 'b', { x: 1 }]) # => { 'A' => { 'b' => { x: 1 } } }
      #
      # The hash is folded from the inside out, so the innermost key is found
      # by position and a key that occurs more than once in the path still
      # ends up at the correct nesting level.
      #
      # Defined for both engines because it touches neither the socket nor
      # the pub/sub client.
      #
      # @api private
      # @param path [Array] path elements, leaf last
      # @return [Hash] nested request hash; empty when the path has fewer
      #   than two elements
      def build_path_request(path)
        keys = path[0...-1]
        return {} if keys.empty?

        leaf = path.last
        innermost = leaf.is_a?(Hash) ? leaf : { leaf => nil }
        keys.reverse.inject(innermost) { |inner, key| { key => inner } }
      end

      if RUBY_ENGINE == 'opal'
        # Pre-serialized keep-alive message sent by #keep_session_alive.
        PING = `JSON.stringify({iso_ping: true})`

        attr_accessor :socket

        # Resets the socket and the action queue, then starts connecting.
        #
        # @return [true]
        def init
          @socket = nil
          @actions = []
          @actions_promise = Promise.new
          promise_connect
          true
        end

        # Opens the websocket unless a usable one already exists.
        #
        # The URL is derived from the current window location and
        # Isomorfeus.api_websocket_path. On error the socket is closed and the
        # connection is retried via `after 1000`, reusing the same promise.
        # Once open, the `init` method of every class named in
        # Isomorfeus.transport_init_class_names is called, registered requests
        # that were not sent yet are sent, and the promise is resolved.
        #
        # @param session_id [String, nil] if present, passed as a session cookie header
        # @param promise [Promise, nil] promise to resolve; a new one is created when nil
        # @return [Promise] resolves with true once the connection is usable
        def promise_connect(session_id = nil, promise = nil)
          promise = Promise.new unless promise
          # ready_state below 2 is treated as "connecting or open": nothing to do.
          if @socket && @socket.ready_state < 2
            promise.resolve(true)
            return promise
          end

          window_protocol = `window.location.protocol`
          ws_protocol = window_protocol == 'https:' ? 'wss:' : 'ws:'
          ws_url = "#{ws_protocol}//#{`window.location.host`}#{Isomorfeus.api_websocket_path}"
          headers = (session_id.nil? || session_id.empty?) ? nil : { 'Cookie': "session=#{session_id}" }
          @socket = Isomorfeus::Transport::WebsocketClient.new(ws_url, nil, headers)

          @socket.on_error do |error|
            `console.warn('Isomorfeus::Transport: Will try again, but so far error connecting:', error)`
            # The socket may have been cleared by #disconnect in the meantime.
            @socket.close if @socket
            after 1000 do
              Isomorfeus::Transport.promise_connect(session_id, promise)
            end
          end

          @socket.on_message do |event|
            json_hash = JSON.parse(`event.data`)
            Isomorfeus::Transport::ClientProcessor.process(json_hash)
          end

          @socket.on_open do |event|
            init_promises = []
            Isomorfeus.transport_init_class_names.each do |constant|
              result = constant.constantize.send(:init)
              init_promises << result if result.class == Promise
            end
            open_promise = if init_promises.size > 0
                             Promise.when(*init_promises)
                           else
                             Promise.new.resolve(true)
                           end
            # Send everything that was registered but never made it onto a socket.
            requests_in_progress[:requests].each_key do |request|
              agent = get_agent_for_request_in_progress(request)
              open_promise.then { promise_send_request(request) } if agent && !agent.sent
            end
            open_promise.then { promise.resolve(true) }
            keep_session_alive
          end

          promise
        end

        # Sends PING over the socket on a recurring `after 480000` timer.
        #
        # Only one timer chain is kept alive: #promise_connect calls this on
        # every successful open, so without the guard each reconnect would add
        # another perpetual chain. The chain ends when the socket has been
        # cleared (see #disconnect); the next successful open starts it again.
        def keep_session_alive
          return if @keep_alive_scheduled

          @keep_alive_scheduled = true
          after 480000 do
            @keep_alive_scheduled = false
            if @socket
              @socket.send(PING)
              keep_session_alive
            end
          end
        end

        # Closes the socket, if any, and forgets it.
        def disconnect
          @socket.close if @socket
          @socket = nil
        end

        # Sends a request built from a path, see #build_path_request.
        #
        # @param path [Array] path elements, leaf last
        # @return [Promise] the promise of the request agent
        def promise_send_path(*path)
          promise_send_request(build_path_request(path))
        end

        # Queues an action; all actions queued before the `after(0)` callback
        # runs are sent together in one 'Isomorfeus::Redux::Handler' request.
        #
        # @param action [Object] the action to send
        # @return [Promise] resolved with the agent once the batch containing
        #   the action has been answered
        def promise_send_action(action)
          @actions.push(action)
          # Only the first action of a batch schedules the flush.
          if @actions.length == 1
            after(0) do
              acts = @actions
              @actions = []
              pr = @actions_promise
              @actions_promise = Promise.new
              promise_send_request('Isomorfeus::Redux::Handler' => acts).then do |agnt|
                pr.resolve(agnt)
              end
            end
          end
          @actions_promise
        end

        # Sends a request over the socket, reusing the agent of an identical
        # request that is already in progress.
        #
        # After a successful send the agent promise is rejected with
        # 'Request timeout!' if it is still unrealized when the `after(20000)`
        # callback runs. If sending fails, the socket is closed and a reconnect
        # is scheduled. The agent is marked as no longer queued, so that the
        # resend of unsent requests in #promise_connect is able to send it.
        #
        # @param request [Hash] the request
        # @return [Promise] the promise of the request agent
        def promise_send_request(request)
          agent = if request_in_progress?(request)
                    get_agent_for_request_in_progress(request)
                  else
                    Isomorfeus::Transport::RequestAgent.new(request)
                  end
          unless agent.queued || agent.sent
            agent.queued = true
            register_request_in_progress(request, agent.id)
            begin
              Isomorfeus.raise_error(message: 'No socket!') unless @socket
              @socket.send(JSON.dump({request: { agent_ids: { agent.id => request }}}))
              agent.sent = true
              after(20000) do
                unless agent.promise.realized?
                  agent.promise.reject({agent_response: { error: 'Request timeout!' }, full_response: {}})
                end
              end
            rescue
              # Leaving the agent queued would make every later attempt a no-op
              # and, as no timeout has been scheduled, its promise would never settle.
              agent.queued = false
              # The error may be the missing socket itself.
              @socket.close if @socket
              after(3000) do
                @reconnect = true
                Isomorfeus::Transport.promise_connect
              end
            end
          end
          agent.promise
        end

        # Sends a notification for a channel over the socket.
        #
        # @param channel_class [Class] its name is sent as the channel class
        # @param channel [Object] channel identifier
        # @param message [Object] payload
        # @return [true]
        # @raise via Isomorfeus.raise_error when there is no socket
        def send_message(channel_class, channel, message)
          Isomorfeus.raise_error(message: 'No socket!') unless @socket
          @socket.send(`JSON.stringify(#{{ notification: { class: channel_class.name, channel: channel, message: message }}.to_n})`)
          true
        end

        # Subscribes to a channel.
        #
        # @param channel_class_name [String] name of the channel class
        # @param channel [Object] channel identifier
        # @return [Promise] resolves with the response of the agent
        # @raise via Isomorfeus.raise_error when there is no socket
        def promise_subscribe(channel_class_name, channel)
          request = { subscribe: true, class: channel_class_name, channel: channel }
          promise_channel_request(:subscribe, request)
        end

        # Unsubscribes from a channel.
        #
        # @param channel_class_name [String] name of the channel class
        # @param channel [Object] channel identifier
        # @return [Promise] resolves with the response of the agent
        # @raise via Isomorfeus.raise_error when there is no socket
        def promise_unsubscribe(channel_class_name, channel)
          request = { unsubscribe: true, class: channel_class_name, channel: channel }
          promise_channel_request(:unsubscribe, request)
        end

        # Shared implementation of #promise_subscribe and #promise_unsubscribe.
        #
        # Reuses the agent of an identical request in progress. Otherwise the
        # socket is checked first, so that a missing socket does not leave a
        # registered request behind that can never be answered.
        #
        # @api private
        # @param kind [Symbol] :subscribe or :unsubscribe, top level key of the message
        # @param request [Hash] the request
        # @return [Promise] resolves with the response of the agent
        def promise_channel_request(kind, request)
          agent = if request_in_progress?(request)
                    get_agent_for_request_in_progress(request)
                  else
                    Isomorfeus.raise_error(message: 'No socket!') unless @socket
                    new_agent = Isomorfeus::Transport::RequestAgent.new(request)
                    register_request_in_progress(request, new_agent.id)
                    @socket.send(`JSON.stringify(#{{ kind => { agent_ids: { new_agent.id => request }}}.to_n})`)
                    new_agent
                  end
          agent.promise.then { |answered_agent| answered_agent.response }
        end

        # @return [Boolean] true while at least one request is registered as in progress
        def busy?
          requests_in_progress[:requests].size != 0
        end

        # Registry of requests in progress, indexed both ways:
        # request => agent id and agent id => request.
        #
        # @return [Hash]
        def requests_in_progress
          @requests_in_progress ||= { requests: {}, agent_ids: {} }
        end

        # @param request [Hash]
        # @return [Boolean] true if an identical request is registered
        def request_in_progress?(request)
          requests_in_progress[:requests].key?(request)
        end

        # @param request [Hash]
        # @return the result of RequestAgent.get for the agent id registered for the request
        def get_agent_for_request_in_progress(request)
          agent_id = requests_in_progress[:requests][request]
          Isomorfeus::Transport::RequestAgent.get(agent_id)
        end

        # Registers a request and its agent id in both indexes.
        def register_request_in_progress(request, agent_id)
          requests_in_progress[:requests][request] = agent_id
          requests_in_progress[:agent_ids][agent_id] = request
        end

        # Removes the agent id and the request it belongs to from both indexes.
        def unregister_request_in_progress(agent_id)
          request = requests_in_progress[:agent_ids].delete(agent_id)
          requests_in_progress[:requests].delete(request)
        end
      else
        # RUBY_ENGINE is not 'opal'

        # Publishes a notification to the "<class name>_<channel>" channel of
        # Isomorfeus.pub_sub_client, serialized with Oj in strict mode.
        #
        # @param channel_class [Class] its name is part of the channel name and the payload
        # @param channel [Object] channel identifier
        # @param message [Object] payload
        # @return [true]
        def send_message(channel_class, channel, message)
          channel_class_name = channel_class.name
          Isomorfeus.pub_sub_client.publish("#{channel_class_name}_#{channel}", Oj.dump({notification: { class: channel_class_name, channel: channel, message: message}}, mode: :strict))
          true
        end

        # Subscribes Isomorfeus.pub_sub_client to the channel.
        # channel_class and the block are accepted but not used.
        #
        # NOTE(review): #send_message publishes to "<class name>_<channel>",
        # this subscribes to the bare channel - confirm that is intended.
        #
        # @return [Promise] resolved with { success: channel }
        def promise_subscribe(channel_class, channel, &block)
          Isomorfeus.pub_sub_client.subscribe(channel)
          Promise.new.resolve({ success: channel })
        end

        # Unsubscribes Isomorfeus.pub_sub_client from the channel.
        # channel_class and the block are accepted but not used.
        #
        # @return [Promise] resolved with { success: channel }
        def promise_unsubscribe(channel_class, channel, &block)
          Isomorfeus.pub_sub_client.unsubscribe(channel)
          Promise.new.resolve({ success: channel })
        end
      end
    end
  end
end