lib/isomorfeus/transport.rb in isomorfeus-transport-1.0.0.zeta25 vs lib/isomorfeus/transport.rb in isomorfeus-transport-2.0.0.rc1
- old
+ new
@@ -1,196 +1,200 @@
-module Isomorfeus
- module Transport
- class << self
- if RUBY_ENGINE == 'opal'
- attr_accessor :socket
-
- def init
- @requests_in_progress = { requests: {}, agent_ids: {} }
- @socket = nil
- @initialized = false
- promise_connect if Isomorfeus.on_browser?
- true
- end
-
- def promise_connect
- promise = Promise.new
- if @socket && @socket.ready_state < 2
- promise.resolve(true)
- return promise
- end
- if Isomorfeus.on_browser?
- window_protocol = `window.location.protocol`
- ws_protocol = window_protocol == 'https:' ? 'wss:' : 'ws:'
- ws_url = "#{ws_protocol}//#{`window.location.host`}#{Isomorfeus.api_websocket_path}"
- else
- ws_url = Isomorfeus::TopLevel.transport_ws_url
- end
- @socket = Isomorfeus::Transport::Websocket.new(ws_url)
- @socket.on_error do
- @socket.close
- after 1000 do
- Isomorfeus::Transport.promise_connect
- end
- end
- @socket.on_message do |event|
- json_hash = `Opal.Hash.$new(JSON.parse(event.data))`
- Isomorfeus::Transport::ClientProcessor.process(json_hash)
- end
- @socket.on_open do |event|
- if @initialized
- requests_in_progress[:requests].each_key do |request|
- agent = get_agent_for_request_in_progress(request)
- promise_send_request(request) if agent && !agent.sent
- end
- promise.resolve(true)
- else
- @initialized = true
- init_promises = []
- Isomorfeus.transport_init_class_names.each do |constant|
- result = constant.constantize.send(:init)
- init_promises << result if result.class == Promise
- end
- if init_promises.size > 0
- Promise.when(*init_promises).then { promise.resolve(true) }
- end
- end
- end
- promise
- end
-
- def disconnect
- @socket.close if @socket
- @socket = nil
- end
-
- def promise_send_path(*path, &block)
- request = {}
- inject_path = path[0...-1]
- last_inject_path_el = inject_path.last
- last_path_el = path.last
- inject_path.inject(request) do |memo, key|
- if key == last_inject_path_el
- memo[key] = last_path_el
- else
- memo[key] = {}
- end
- end
- Isomorfeus::Transport.promise_send_request(request, &block)
- end
-
- def promise_send_request(request, &block)
- agent = if request_in_progress?(request)
- get_agent_for_request_in_progress(request)
- else
- Isomorfeus::Transport::RequestAgent.new(request)
- end
- unless agent.sent
- if block_given?
- agent.promise.then do |response|
- block.call(response)
- end
- end
- register_request_in_progress(request, agent.id)
- Isomorfeus.raise_error(message: 'No socket!') unless @socket
- begin
- @socket.send(`JSON.stringify(#{{request: { agent_ids: { agent.id => request }}}.to_n})`)
- agent.sent = true
- after(Isomorfeus.on_ssr? ? 8000 : 20000) do
- unless agent.promise.realized?
- agent.promise.reject({agent_response: { error: 'Request timeout!' }, full_response: {}})
- end
- end
- rescue
- @socket.close
- after 5000 do
- Isomorfeus::Transport.promise_connect
- end
- end
- end
- agent.promise
- end
-
- def send_message(channel_class, channel, message)
- Isomorfeus.raise_error(message: 'No socket!') unless @socket
- @socket.send(`JSON.stringify(#{{ notification: { class: channel_class.name, channel: channel, message: message }}.to_n})`)
- true
- end
-
- def promise_subscribe(channel_class_name, channel)
- request = { subscribe: true, class: channel_class_name, channel: channel }
- if request_in_progress?(request)
- agent = get_agent_for_request_in_progress(request)
- else
- agent = Isomorfeus::Transport::RequestAgent.new(request)
- register_request_in_progress(request, agent.id)
- Isomorfeus.raise_error(message: 'No socket!') unless @socket
- @socket.send(`JSON.stringify(#{{ subscribe: { agent_ids: { agent.id => request }}}.to_n})`)
- end
- result_promise = agent.promise.then do |agent|
- agent.response
- end
- result_promise
- end
-
- def promise_unsubscribe(channel_class_name, channel)
- request = { unsubscribe: true, class: channel_class_name, channel: channel }
- if request_in_progress?(request)
- agent = get_agent_for_request_in_progress(request)
- else
- agent = Isomorfeus::Transport::RequestAgent.new(request)
- register_request_in_progress(request, agent.id)
- Isomorfeus.raise_error(message: 'No socket!') unless @socket
- @socket.send(`JSON.stringify(#{{ unsubscribe: { agent_ids: { agent.id => request }}}.to_n})`)
- end
- result_promise = agent.promise.then do |agent|
- agent.response
- end
- result_promise
- end
-
- def busy?
- @requests_in_progress[:requests].size != 0
- end
-
- def requests_in_progress
- @requests_in_progress
- end
-
- def request_in_progress?(request)
- @requests_in_progress[:requests].key?(request)
- end
-
- def get_agent_for_request_in_progress(request)
- agent_id = @requests_in_progress[:requests][request]
- Isomorfeus::Transport::RequestAgent.get(agent_id)
- end
-
- def register_request_in_progress(request, agent_id)
- @requests_in_progress[:requests][request] = agent_id
- @requests_in_progress[:agent_ids][agent_id] = request
- end
-
- def unregister_request_in_progress(agent_id)
- request = @requests_in_progress[:agent_ids].delete(agent_id)
- @requests_in_progress[:requests].delete(request)
- end
- else # RUBY_ENGINE
- def send_message(channel_class, channel, message)
- channel_class_name = channel_class.name
- Isomorfeus.pub_sub_client.publish("#{channel_class_name}_#{channel}", Oj.dump({notification: { class: channel_class_name, channel: channel, message: message}}, mode: :strict))
- true
- end
-
- def promise_subscribe(channel_class, channel, &block)
- Isomorfeus.pub_sub_client.subscribe(channel)
- Promise.new.resolve({ success: channel })
- end
-
- def promise_unsubscribe(channel_class, channel, &block)
- Isomorfeus.pub_sub_client.unsubscribe(channel)
- Promise.new.resolve({ success: channel })
- end
- end # RUBY_ENGINE
- end
- end
-end
+module Isomorfeus
+ module Transport
+ class << self
+ if RUBY_ENGINE == 'opal'
+ attr_accessor :socket
+
+ def init
+ @socket = nil
+ @initialized = false
+ promise_connect if Isomorfeus.on_browser? || Isomorfeus.on_mobile?
+ true
+ end
+
+ def promise_connect
+ promise = Promise.new
+ if @socket && @socket.ready_state < 2
+ promise.resolve(true)
+ return promise
+ end
+ if Isomorfeus.on_browser?
+ window_protocol = `window.location.protocol`
+ ws_protocol = window_protocol == 'https:' ? 'wss:' : 'ws:'
+ ws_url = "#{ws_protocol}//#{`window.location.host`}#{Isomorfeus.api_websocket_path}"
+ else
+ ws_url = "#{Isomorfeus::TopLevel.transport_ws_url}"
+ end
+ @socket = Isomorfeus::Transport::WebsocketClient.new(ws_url)
+ @socket.on_error do |error|
+ if Isomorfeus.on_browser?
+ `console.warn('Isomorfeus::Transport: Will try again, but so far error connecting:', error)`
+ @socket.close
+ after 1000 do
+ Isomorfeus::Transport.promise_connect
+ end
+ else
+ Isomorfeus.raise_error(message: error.JS[:message], stack: error.JS[:stack])
+ end
+ end
+ @socket.on_message do |event|
+ json_hash = `Opal.Hash.$new(JSON.parse(event.data))`
+ Isomorfeus::Transport::ClientProcessor.process(json_hash)
+ end
+ @socket.on_open do |event|
+ if @initialized
+ requests_in_progress[:requests].each_key do |request|
+ agent = get_agent_for_request_in_progress(request)
+ promise_send_request(request) if agent && !agent.sent
+ end
+ promise.resolve(true)
+ else
+ @initialized = true
+ init_promises = []
+ Isomorfeus.transport_init_class_names.each do |constant|
+ result = constant.constantize.send(:init)
+ init_promises << result if result.class == Promise
+ end
+ if init_promises.size > 0
+ Promise.when(*init_promises).then { promise.resolve(true) }
+ end
+ end
+ end
+ promise
+ end
+
+ def disconnect
+ @socket.close if @socket
+ @socket = nil
+ end
+
+ def promise_send_path(*path, &block)
+ request = {}
+ inject_path = path[0...-1]
+ last_inject_path_el = inject_path.last
+ last_path_el = path.last
+ inject_path.inject(request) do |memo, key|
+ if key == last_inject_path_el
+ memo[key] = last_path_el
+ else
+ memo[key] = {}
+ end
+ end
+ Isomorfeus::Transport.promise_send_request(request, &block)
+ end
+
+ def promise_send_request(request, &block)
+ agent = if request_in_progress?(request)
+ get_agent_for_request_in_progress(request)
+ else
+ Isomorfeus::Transport::RequestAgent.new(request)
+ end
+ unless agent.sent
+ if block_given?
+ agent.promise.then do |response|
+ block.call(response)
+ end
+ end
+ register_request_in_progress(request, agent.id)
+ Isomorfeus.raise_error(message: 'No socket!') unless @socket
+ begin
+ @socket.send(`JSON.stringify(#{{request: { agent_ids: { agent.id => request }}}.to_n})`)
+ agent.sent = true
+ after(Isomorfeus.on_ssr? ? 8000 : 20000) do
+ unless agent.promise.realized?
+ agent.promise.reject({agent_response: { error: 'Request timeout!' }, full_response: {}})
+ end
+ end
+ rescue
+ @socket.close
+ after 5000 do
+ Isomorfeus::Transport.promise_connect
+ end
+ end
+ end
+ agent.promise
+ end
+
+ def send_message(channel_class, channel, message)
+ Isomorfeus.raise_error(message: 'No socket!') unless @socket
+ @socket.send(`JSON.stringify(#{{ notification: { class: channel_class.name, channel: channel, message: message }}.to_n})`)
+ true
+ end
+
+ def promise_subscribe(channel_class_name, channel)
+ request = { subscribe: true, class: channel_class_name, channel: channel }
+ if request_in_progress?(request)
+ agent = get_agent_for_request_in_progress(request)
+ else
+ agent = Isomorfeus::Transport::RequestAgent.new(request)
+ register_request_in_progress(request, agent.id)
+ Isomorfeus.raise_error(message: 'No socket!') unless @socket
+ @socket.send(`JSON.stringify(#{{ subscribe: { agent_ids: { agent.id => request }}}.to_n})`)
+ end
+ result_promise = agent.promise.then do |agent|
+ agent.response
+ end
+ result_promise
+ end
+
+ def promise_unsubscribe(channel_class_name, channel)
+ request = { unsubscribe: true, class: channel_class_name, channel: channel }
+ if request_in_progress?(request)
+ agent = get_agent_for_request_in_progress(request)
+ else
+ agent = Isomorfeus::Transport::RequestAgent.new(request)
+ register_request_in_progress(request, agent.id)
+ Isomorfeus.raise_error(message: 'No socket!') unless @socket
+ @socket.send(`JSON.stringify(#{{ unsubscribe: { agent_ids: { agent.id => request }}}.to_n})`)
+ end
+ result_promise = agent.promise.then do |agent|
+ agent.response
+ end
+ result_promise
+ end
+
+ def busy?
+ requests_in_progress[:requests].size != 0
+ end
+
+ def requests_in_progress
+ @requests_in_progress ||= { requests: {}, agent_ids: {} }
+ end
+
+ def request_in_progress?(request)
+ requests_in_progress[:requests].key?(request)
+ end
+
+ def get_agent_for_request_in_progress(request)
+ agent_id = requests_in_progress[:requests][request]
+ Isomorfeus::Transport::RequestAgent.get(agent_id)
+ end
+
+ def register_request_in_progress(request, agent_id)
+ requests_in_progress[:requests][request] = agent_id
+ requests_in_progress[:agent_ids][agent_id] = request
+ end
+
+ def unregister_request_in_progress(agent_id)
+ request = requests_in_progress[:agent_ids].delete(agent_id)
+ requests_in_progress[:requests].delete(request)
+ end
+ else # RUBY_ENGINE
+ def send_message(channel_class, channel, message)
+ channel_class_name = channel_class.name
+ Isomorfeus.pub_sub_client.publish("#{channel_class_name}_#{channel}", Oj.dump({notification: { class: channel_class_name, channel: channel, message: message}}, mode: :strict))
+ true
+ end
+
+ def promise_subscribe(channel_class, channel, &block)
+ Isomorfeus.pub_sub_client.subscribe(channel)
+ Promise.new.resolve({ success: channel })
+ end
+
+ def promise_unsubscribe(channel_class, channel, &block)
+ Isomorfeus.pub_sub_client.unsubscribe(channel)
+ Promise.new.resolve({ success: channel })
+ end
+ end # RUBY_ENGINE
+ end
+ end
+end