lib/swarmclient/communication.rb in swarmclient-0.1.1 vs lib/swarmclient/communication.rb in swarmclient-0.1.3

- old
+ new

@@ -1,40 +1,45 @@ +require 'google/protobuf' +require 'base64' require 'websocket-client-simple' require 'eventmachine' require 'json' -DEFAULT_IP = 'ws://127.0.0.1' -DEFAULT_PORT = 8080 +require_relative './protobuf/bluzelle_pb' +require_relative './protobuf/database_pb' -module Swarmclient +DEFAULT_UUID = '8c073d96-7291-11e8-adc0-fa7ae01bbebc' +DEFAULT_IP = '13.78.131.94' # '127.0.0.1' +DEFAULT_PORT = 51010 # 8100 +module Swarmclient class Communication - @req_id_range = 100 + attr_accessor :transaction_id_limit, :ws_set_timeout - def initialize endpoint:, port:, uuid: + @transaction_id_limit = 100 + @ws_set_timeout = 5 - @_endpoint = endpoint || DEFAULT_IP - @_port = port || DEFAULT_PORT + def initialize endpoint: DEFAULT_IP, port: DEFAULT_PORT, uuid: DEFAULT_UUID, secure: false + + @_endpoint = endpoint + @_port = port @_uuid = uuid + @_protocol_prefix = secure ? 'wss://' : 'ws://' end def create key, value - send cmd: 'create', data: { key: key, value: value } + send cmd: 'create', data: { key: key, value: value.to_s } end def read key send cmd: 'read', data: { key: key } end - def read_multiple keys - send_multiple cmd: 'read', keys: keys - end - def update key, value - send cmd: 'update', data: { key: key, value: value } + send cmd: 'update', data: { key: key, value: value.to_s } end def remove key send cmd: 'delete', data: { key: key } end @@ -45,91 +50,114 @@ def keys send cmd: 'keys', data: nil end + def size + send cmd: 'size', data: nil + end + private - def send_multiple cmd:, keys: - keys.map do |key| - res = send cmd: cmd, data: { key: key } - Hash[key, res ? res[:value] : nil] - end + def encoded_protobuf_msg cmd:, protobuf_cmd_data: + db_msg = Database_msg.new + db_msg.header = Database_header.new db_uuid: @_uuid, transaction_id: rand(@transaction_id_limit).to_i + db_msg[cmd] = protobuf_cmd_data + bzn_msg = Bzn_msg.new db: db_msg + Bzn_msg.encode bzn_msg end - def send cmd:, data: + def generate_req cmd:, data: + protobuf_cmd = cmd_to_protobuf cmd + protobuf_cmd_msg = data.nil? ? protobuf_cmd.new : protobuf_cmd.new(data) - endpoint, req = [ - [@_endpoint, ':', @_port.to_s].join(''), - { "bzn-api": "crud", "cmd": cmd, "data": data, "db-uuid": @_uuid, "request-id": rand(@req_id_range) } - ] + encoded_msg = encoded_protobuf_msg cmd: cmd, protobuf_cmd_data: protobuf_cmd_msg + encoded64_msg = Base64.strict_encode64 encoded_msg - raw_data = get endpoint: endpoint, req: req - err, res = raw_data.map { |data| data ? eval(data.gsub(/\s+/, "")) : false } + {"bzn-api": "database","msg": encoded64_msg}.to_json + end - if res - case res[:error] - when 'NOT_THE_LEADER' + def cmd_to_protobuf cmd + processed_cmd = + case cmd + when 'keys', 'size' then 'empty' + else cmd + end - @_endpoint, @_port = [ - "ws://#{res[:data][:'leader-host']}", - res[:data][:'leader-port'] - ] + Object.const_get "Database_#{processed_cmd}" + end - return send cmd: cmd, data: data + def generate_endpoint + [@_protocol_prefix, @_endpoint, ':', @_port.to_s].join('') + end - when "RECORD_EXISTS", "RECORD_NOT_FOUND" + def send cmd:, data: + endpoint, req = [ + generate_endpoint, + generate_req({ cmd: cmd, data: data }) + ] - return res[:error] + err, res = get req: req, endpoint: endpoint + return err if err + raise 'No Response' if res.nil? - when nil + db_response = Database_response.decode res - return res[:data] + if db_response.redirect + puts 'Switching leader_host: ' + db_response.redirect.leader_name + @_endpoint, @_port = [ + db_response.redirect.leader_host, + db_response.redirect.leader_port + ] - else + return send cmd: cmd, data: data - return res[:error] + elsif !db_response.resp.nil? && !db_response.resp.error.empty? + return db_response.resp.error - end else - return err + case cmd + when 'create', 'update', 'delete' then nil + when 'read' then db_response.resp.value + else db_response.resp[cmd] + end + end + end def get req:, endpoint: - res, err = [nil, nil] begin EventMachine.run do - ws = WebSocket::Client::Simple.connect endpoint ws.on :message do |msg| res = msg.data EventMachine::stop_event_loop end ws.on :open do - ws.send req.to_json + ws.send req end ws.on :close do |e| EventMachine::stop_event_loop end ws.on :error do |e| - err = e + err ||= e EventMachine::stop_event_loop end + EventMachine::Timer.new(5) { ws.close } end rescue => e err = e end - [err, res] + return [err, res] end end - end