lib/isomorfeus/transport.rb in isomorfeus-transport-1.0.0.zeta23 vs lib/isomorfeus/transport.rb in isomorfeus-transport-1.0.0.zeta24

- old
+ new

@@ -2,14 +2,10 @@ module Transport class << self if RUBY_ENGINE == 'opal' attr_accessor :socket - def delay(ms = 1000, &block) - `setTimeout(#{block.to_n}, #{ms})` - end - def init @requests_in_progress = { requests: {}, agent_ids: {} } @socket = nil @initialized = false promise_connect if Isomorfeus.on_browser? @@ -30,11 +26,11 @@ ws_url = Isomorfeus::TopLevel.transport_ws_url end @socket = Isomorfeus::Transport::Websocket.new(ws_url) @socket.on_error do @socket.close - delay 1000 do + after 1000 do Isomorfeus::Transport.promise_connect end end @socket.on_message do |event| json_hash = `Opal.Hash.$new(JSON.parse(event.data))` @@ -97,70 +93,60 @@ register_request_in_progress(request, agent.id) Isomorfeus.raise_error(message: 'No socket!') unless @socket begin @socket.send(`JSON.stringify(#{{request: { agent_ids: { agent.id => request }}}.to_n})`) agent.sent = true - delay(Isomorfeus.on_ssr? ? 8000 : 20000) do + after(Isomorfeus.on_ssr? ? 8000 : 20000) do unless agent.promise.realized? agent.promise.reject({agent_response: { error: 'Request timeout!' }, full_response: {}}) end end rescue @socket.close - delay 5000 do + after 5000 do Isomorfeus::Transport.promise_connect end end end agent.promise end - def send_notification(channel_class, channel, message) + def send_message(channel_class, channel, message) Isomorfeus.raise_error(message: 'No socket!') unless @socket - @socket.send(`JSON.stringify(#{{notification: { class: channel_class.name, channel: channel, message: message}}.to_n})`) + @socket.send(`JSON.stringify(#{{ notification: { class: channel_class.name, channel: channel, message: message }}.to_n})`) true end - def subscribe(channel_class, channel, &block) - request = { subscribe: true, class: channel_class.name, channel: channel } + def promise_subscribe(channel_class_name, channel) + request = { subscribe: true, class: channel_class_name, channel: channel } if request_in_progress?(request) agent = get_agent_for_request_in_progress(request) else agent = Isomorfeus::Transport::RequestAgent.new(request) register_request_in_progress(request, agent.id) Isomorfeus.raise_error(message: 'No socket!') unless @socket - @socket.send(`JSON.stringify(#{{subscribe: { agent_ids: { agent.id => request }}}.to_n})`) + @socket.send(`JSON.stringify(#{{ subscribe: { agent_ids: { agent.id => request }}}.to_n})`) end result_promise = agent.promise.then do |agent| agent.response end - if block_given? - result_promise = result_promise.then do |response| - block.call(response) - end - end result_promise end - def unsubscribe(channel_class, channel, &block) - request = { unsubscribe: true, class: channel_class.name, channel: channel } + def promise_unsubscribe(channel_class_name, channel) + request = { unsubscribe: true, class: channel_class_name, channel: channel } if request_in_progress?(request) agent = get_agent_for_request_in_progress(request) else agent = Isomorfeus::Transport::RequestAgent.new(request) register_request_in_progress(request, agent.id) Isomorfeus.raise_error(message: 'No socket!') unless @socket - @socket.send(`JSON.stringify(#{{unsubscribe: { agent_ids: { agent.id => request }}}.to_n})`) + @socket.send(`JSON.stringify(#{{ unsubscribe: { agent_ids: { agent.id => request }}}.to_n})`) end result_promise = agent.promise.then do |agent| agent.response end - if block_given? - result_promise = result_promise.then do |response| - block.call(response) - end - end result_promise end def busy? @requests_in_progress[:requests].size != 0 @@ -187,26 +173,23 @@ def unregister_request_in_progress(agent_id) request = @requests_in_progress[:agent_ids].delete(agent_id) @requests_in_progress[:requests].delete(request) end else # RUBY_ENGINE - def send_notification(channel_class, channel, message) - Thread.current[:isomorfeus_pub_sub_client].publish(Oj.dump({notification: { class: channel_class.name, channel: channel, message: message}}, mode: :strict)) + def send_message(channel_class, channel, message) + channel_class_name = channel_class.name + Isomorfeus.pub_sub_client.publish("#{channel_class_name}_#{channel}", Oj.dump({notification: { class: channel_class_name, channel: channel, message: message}}, mode: :strict)) true end - def subscribe(channel_class, channel, &block) - Thread.current[:isomorfeus_pub_sub_client].subscribe(channel) - result_promise = Promise.new - result_promise.resolve({ success: channel }) - result_promise + def promise_subscribe(channel_class, channel, &block) + Isomorfeus.pub_sub_client.subscribe(channel) + Promise.new.resolve({ success: channel }) end - def unsubscribe(channel_class, channel, &block) - Thread.current[:isomorfeus_pub_sub_client].unsubscribe(channel) - result_promise = Promise.new - result_promise.resolve({ success: channel }) - result_promise + def promise_unsubscribe(channel_class, channel, &block) + Isomorfeus.pub_sub_client.unsubscribe(channel) + Promise.new.resolve({ success: channel }) end end # RUBY_ENGINE end end end