lib/swarmclient/communication.rb in swarmclient-0.1.3 vs lib/swarmclient/communication.rb in swarmclient-0.1.4

- old
+ new

@@ -1,163 +1,105 @@ -require 'google/protobuf' require 'base64' -require 'websocket-client-simple' -require 'eventmachine' require 'json' -require_relative './protobuf/bluzelle_pb' -require_relative './protobuf/database_pb' +require_relative './constants' +require_relative './proto_serializer' +require_relative './connection' -DEFAULT_UUID = '8c073d96-7291-11e8-adc0-fa7ae01bbebc' -DEFAULT_IP = '13.78.131.94' # '127.0.0.1' -DEFAULT_PORT = 51010 # 8100 - module Swarmclient class Communication + include Constants + include ProtoSerializer + include Connection - attr_accessor :transaction_id_limit, :ws_set_timeout - - @transaction_id_limit = 100 - @ws_set_timeout = 5 - def initialize endpoint: DEFAULT_IP, port: DEFAULT_PORT, uuid: DEFAULT_UUID, secure: false @_endpoint = endpoint @_port = port @_uuid = uuid @_protocol_prefix = secure ? 'wss://' : 'ws://' + @_redirect_attempt = 0 end def create key, value - send cmd: 'create', data: { key: key, value: value.to_s } + send_request cmd: 'create', data: { key: key, value: value.to_s } end def read key - send cmd: 'read', data: { key: key } + send_request cmd: 'read', data: { key: key } end def update key, value - send cmd: 'update', data: { key: key, value: value.to_s } + send_request cmd: 'update', data: { key: key, value: value.to_s } end def remove key - send cmd: 'delete', data: { key: key } + send_request cmd: 'delete', data: { key: key } end def has key - send cmd: 'has', data: { key: key } + send_request cmd: 'has', data: { key: key } end def keys - send cmd: 'keys', data: nil + send_request cmd: 'keys', data: nil end def size - send cmd: 'size', data: nil + send_request cmd: 'size', data: nil end private - def encoded_protobuf_msg cmd:, protobuf_cmd_data: - db_msg = Database_msg.new - db_msg.header = Database_header.new db_uuid: @_uuid, transaction_id: rand(@transaction_id_limit).to_i - db_msg[cmd] = protobuf_cmd_data - bzn_msg = Bzn_msg.new db: db_msg - Bzn_msg.encode bzn_msg - end - - def generate_req cmd:, data: - protobuf_cmd = cmd_to_protobuf cmd - protobuf_cmd_msg = data.nil? ? protobuf_cmd.new : protobuf_cmd.new(data) - - encoded_msg = encoded_protobuf_msg cmd: cmd, protobuf_cmd_data: protobuf_cmd_msg + def generate_req **options + db_msg = generate_db_msg options.merge db_uuid: @_uuid + encoded_msg = encode_msg db_msg encoded64_msg = Base64.strict_encode64 encoded_msg {"bzn-api": "database","msg": encoded64_msg}.to_json end - def cmd_to_protobuf cmd - processed_cmd = - case cmd - when 'keys', 'size' then 'empty' - else cmd - end - - Object.const_get "Database_#{processed_cmd}" - end - def generate_endpoint [@_protocol_prefix, @_endpoint, ':', @_port.to_s].join('') end - def send cmd:, data: - endpoint, req = [ - generate_endpoint, - generate_req({ cmd: cmd, data: data }) - ] + def send_request **options + raise StandardError.new "Max Leader redirect attempt reached" if @_redirect_attempt >= MAX_REDIRECT_ATTEMPT - err, res = get req: req, endpoint: endpoint - return err if err - raise 'No Response' if res.nil? + endpoint = generate_endpoint + req = generate_req options - db_response = Database_response.decode res + err, res = connect_and_send req: req, endpoint: endpoint + raise err unless err.nil? + db_response = decode_res res + if db_response.redirect puts 'Switching leader_host: ' + db_response.redirect.leader_name + @_redirect_attempt += 1 @_endpoint, @_port = [ db_response.redirect.leader_host, - db_response.redirect.leader_port + db_response.redirect.leader_port, ] - return send cmd: cmd, data: data + return send_request options - elsif !db_response.resp.nil? && !db_response.resp.error.empty? - return db_response.resp.error + elsif !(db_response.resp.nil? || db_response.resp.error.empty?) + raise db_response.resp.error else - case cmd - when 'create', 'update', 'delete' then nil - when 'read' then db_response.resp.value - else db_response.resp[cmd] + @_redirect_attempt = 0 + case options[:cmd] + when 'create', 'update', 'delete' then true + when 'read' then db_response.resp.value + else db_response.resp[options[:cmd]] end - end - end - - def get req:, endpoint: - res, err = [nil, nil] - - begin - EventMachine.run do - ws = WebSocket::Client::Simple.connect endpoint - - ws.on :message do |msg| - res = msg.data - EventMachine::stop_event_loop - end - - ws.on :open do - ws.send req - end - - ws.on :close do |e| - EventMachine::stop_event_loop - end - - ws.on :error do |e| - err ||= e - EventMachine::stop_event_loop - end - - EventMachine::Timer.new(5) { ws.close } - end - rescue => e - err = e - end - - return [err, res] + rescue => e + @_redirect_attempt = 0 + e.message end end end