lib/isomorfeus/transport.rb in isomorfeus-transport-1.0.0.zeta23 vs lib/isomorfeus/transport.rb in isomorfeus-transport-1.0.0.zeta24
- old
+ new
@@ -2,14 +2,10 @@
module Transport
class << self
if RUBY_ENGINE == 'opal'
attr_accessor :socket
- def delay(ms = 1000, &block)
- `setTimeout(#{block.to_n}, #{ms})`
- end
-
def init
@requests_in_progress = { requests: {}, agent_ids: {} }
@socket = nil
@initialized = false
promise_connect if Isomorfeus.on_browser?
@@ -30,11 +26,11 @@
ws_url = Isomorfeus::TopLevel.transport_ws_url
end
@socket = Isomorfeus::Transport::Websocket.new(ws_url)
@socket.on_error do
@socket.close
- delay 1000 do
+ after 1000 do
Isomorfeus::Transport.promise_connect
end
end
@socket.on_message do |event|
json_hash = `Opal.Hash.$new(JSON.parse(event.data))`
@@ -97,70 +93,60 @@
register_request_in_progress(request, agent.id)
Isomorfeus.raise_error(message: 'No socket!') unless @socket
begin
@socket.send(`JSON.stringify(#{{request: { agent_ids: { agent.id => request }}}.to_n})`)
agent.sent = true
- delay(Isomorfeus.on_ssr? ? 8000 : 20000) do
+ after(Isomorfeus.on_ssr? ? 8000 : 20000) do
unless agent.promise.realized?
agent.promise.reject({agent_response: { error: 'Request timeout!' }, full_response: {}})
end
end
rescue
@socket.close
- delay 5000 do
+ after 5000 do
Isomorfeus::Transport.promise_connect
end
end
end
agent.promise
end
- def send_notification(channel_class, channel, message)
+ def send_message(channel_class, channel, message)
Isomorfeus.raise_error(message: 'No socket!') unless @socket
- @socket.send(`JSON.stringify(#{{notification: { class: channel_class.name, channel: channel, message: message}}.to_n})`)
+ @socket.send(`JSON.stringify(#{{ notification: { class: channel_class.name, channel: channel, message: message }}.to_n})`)
true
end
- def subscribe(channel_class, channel, &block)
- request = { subscribe: true, class: channel_class.name, channel: channel }
+ def promise_subscribe(channel_class_name, channel)
+ request = { subscribe: true, class: channel_class_name, channel: channel }
if request_in_progress?(request)
agent = get_agent_for_request_in_progress(request)
else
agent = Isomorfeus::Transport::RequestAgent.new(request)
register_request_in_progress(request, agent.id)
Isomorfeus.raise_error(message: 'No socket!') unless @socket
- @socket.send(`JSON.stringify(#{{subscribe: { agent_ids: { agent.id => request }}}.to_n})`)
+ @socket.send(`JSON.stringify(#{{ subscribe: { agent_ids: { agent.id => request }}}.to_n})`)
end
result_promise = agent.promise.then do |agent|
agent.response
end
- if block_given?
- result_promise = result_promise.then do |response|
- block.call(response)
- end
- end
result_promise
end
- def unsubscribe(channel_class, channel, &block)
- request = { unsubscribe: true, class: channel_class.name, channel: channel }
+ def promise_unsubscribe(channel_class_name, channel)
+ request = { unsubscribe: true, class: channel_class_name, channel: channel }
if request_in_progress?(request)
agent = get_agent_for_request_in_progress(request)
else
agent = Isomorfeus::Transport::RequestAgent.new(request)
register_request_in_progress(request, agent.id)
Isomorfeus.raise_error(message: 'No socket!') unless @socket
- @socket.send(`JSON.stringify(#{{unsubscribe: { agent_ids: { agent.id => request }}}.to_n})`)
+ @socket.send(`JSON.stringify(#{{ unsubscribe: { agent_ids: { agent.id => request }}}.to_n})`)
end
result_promise = agent.promise.then do |agent|
agent.response
end
- if block_given?
- result_promise = result_promise.then do |response|
- block.call(response)
- end
- end
result_promise
end
def busy?
@requests_in_progress[:requests].size != 0
@@ -187,26 +173,23 @@
def unregister_request_in_progress(agent_id)
request = @requests_in_progress[:agent_ids].delete(agent_id)
@requests_in_progress[:requests].delete(request)
end
else # RUBY_ENGINE
- def send_notification(channel_class, channel, message)
- Thread.current[:isomorfeus_pub_sub_client].publish(Oj.dump({notification: { class: channel_class.name, channel: channel, message: message}}, mode: :strict))
+ def send_message(channel_class, channel, message)
+ channel_class_name = channel_class.name
+ Isomorfeus.pub_sub_client.publish("#{channel_class_name}_#{channel}", Oj.dump({notification: { class: channel_class_name, channel: channel, message: message}}, mode: :strict))
true
end
- def subscribe(channel_class, channel, &block)
- Thread.current[:isomorfeus_pub_sub_client].subscribe(channel)
- result_promise = Promise.new
- result_promise.resolve({ success: channel })
- result_promise
+ def promise_subscribe(channel_class, channel, &block)
+ Isomorfeus.pub_sub_client.subscribe(channel)
+ Promise.new.resolve({ success: channel })
end
- def unsubscribe(channel_class, channel, &block)
- Thread.current[:isomorfeus_pub_sub_client].unsubscribe(channel)
- result_promise = Promise.new
- result_promise.resolve({ success: channel })
- result_promise
+ def promise_unsubscribe(channel_class, channel, &block)
+ Isomorfeus.pub_sub_client.unsubscribe(channel)
+ Promise.new.resolve({ success: channel })
end
end # RUBY_ENGINE
end
end
end