lib/tamashii/agent/connection.rb in tamashii-agent-0.2.8 vs lib/tamashii/agent/connection.rb in tamashii-agent-0.3.0

- old
+ new

@@ -1,69 +1,36 @@ -require 'socket' -require 'websocket/driver' require 'aasm' -require 'openssl' require 'json' require 'concurrent' -require 'nio' require 'tamashii/common' require 'tamashii/agent/config' require 'tamashii/agent/event' require 'tamashii/agent/component' require 'tamashii/agent/handler' +require 'tamashii/client' + module Tamashii module Agent class Connection < Component + autoload :RequestObserver, 'tamashii/agent/connection/request_observer' + class RequestTimeoutError < RuntimeError; end - class RequestObserver - include Common::Loggable - def initialize(connection, id, ev_type, ev_body, future) - @connection = connection - @id = id - @ev_type = ev_type - @ev_body = ev_body - @future = future - end - - def update(time, ev_data, reason) - if @future.fulfilled? - res_ev_type = ev_data[:ev_type] - res_ev_body = ev_data[:ev_body] - case res_ev_type - when Type::RFID_RESPONSE_JSON - logger.debug "Handled: #{res_ev_type}: #{res_ev_body}" - @connection.handle_card_result(JSON.parse(res_ev_body)) - else - logger.warn "Unhandled packet result: #{res_ev_type}: #{res_ev_body}" - end - else - logger.error "#{@id} Failed with #{reason}" - @connection.on_request_timeout(@ev_type, @ev_body) - end - end - end - include AASM aasm do state :init, initial: true - state :connecting state :auth_pending state :ready - event :connect do - transitions from: :init, to: :connecting, after: Proc.new { logger.info "Start connecting" } - end - event :auth_request do - transitions from: :connecting, to: :auth_pending, after: Proc.new { logger.info "Sending authentication request" } + transitions from: :init, to: :auth_pending, after: Proc.new { logger.info "Sending authentication request" } end event :auth_success do transitions from: :auth_pending, to: :ready, after: Proc.new { logger.info "Authentication finished. Tag = #{@tag}" } end @@ -76,28 +43,23 @@ attr_reader :url attr_reader :master def initialize(master) super - @host = @master.host - @port = @master.port - @url = "#{Config.use_ssl ? "wss" : "ws"}://#{@host}:#{@port}/#{Config.entry_point}" + self.reset + @client = Tamashii::Client::Base.new @tag = 0 @future_ivar_pool = Concurrent::Map.new - @driver_lock = Mutex.new @last_error_report_time = Time.now + setup_callbacks setup_resolver end - def create_selector - @selector = NIO::Selector.new - end - def setup_resolver env_data = {connection: self} Resolver.config do [Type::REBOOT, Type::POWEROFF, Type::RESTART, Type::UPDATE].each do |type| handle type, Handler::System, env_data @@ -125,129 +87,57 @@ end end def try_send_request(ev_type, ev_body) if self.ready? - @driver_lock.synchronize do - @driver.binary(Packet.new(ev_type, @tag, ev_body).dump) - end + @client.transmit(Packet.new(ev_type, @tag, ev_body).dump) true else false end end def stop_threads super - @websocket_thr.exit if @websocket_thr - @websocket_thr = nil + @client.close end - def run - super - @websocket_thr = Thread.start { run_websocket_loop } - end - def run_websocket_loop - create_selector - loop do - ready = @selector.select(1) - ready.each { |m| m.value.call } if ready - if @io.nil? - @io = try_create_socket - if @io - # socket io opened - register_socket_io - # start ws - start_web_driver - end - end - end - end - def send_auth_request # TODO: other types of auth - @driver.binary(Packet.new(Type::AUTH_TOKEN, 0, [Type::CLIENT[:agent], @master.serial_number,Config.token].join(",")).dump) + if @client.transmit(Packet.new(Type::AUTH_TOKEN, 0, [Type::CLIENT[:agent], @master.serial_number,Config.token].join(",")).dump) + logger.debug "Auth sent!" + else + logger.error "Cannot sent auth request!" + end end - def start_web_driver - # TODO: Improve below code - @driver = WebSocket::Driver.client(self) - @driver.on :open, proc { |e| + def setup_callbacks + @client.on :open, proc { logger.info "Server opened" self.auth_request send_auth_request } - @driver.on :close, proc { |e| - logger.info "Server closed" - close_socket_io + @client.on :close, proc { + # Note: this only called when normally receive the WS close message + logger.info "Server closed normally" + } + @client.on :socket_closed, proc { + # Note: called when low-level IO is closed + logger.info "Server socket closed" self.reset } - @driver.on :message, proc { |e| - pkt = Packet.load(e.data) + @client.on :message, proc { |data| + pkt = Packet.load(data) process_packet(pkt) if pkt } - @driver.on :error, proc { |e| + @client.on :error, proc { |e| logger.error("#{e.message}") } - @driver.start - self.connect end - def register_socket_io - _monitor = @selector.register(@io, :r) - _monitor.value = proc do - begin - msg = @io.read_nonblock(4096, exception: false) - next if msg == :wait_readable - if msg.nil? - # socket closed - logger.info "No message received from server. Connection reset" - close_socket_io - self.reset - sleep 1 - else - @driver.parse(msg) - end - rescue => e - logger.error "#{e.message}" - logger.debug "Backtrace:" - e.backtrace.each {|msg| logger.debug msg} - end - end - end - def try_create_socket - logger.info "try to open socket..." - if Time.now - @last_error_report_time > 5.0 - @master.send_event(Event.new(Event::LCD_MESSAGE, "Initializing\nConnection...")) - @last_error_report_time = Time.now - end - if Config.use_ssl - OpenSSL::SSL::SSLSocket.new(TCPSocket.new(@host, @port)).connect - else - TCPSocket.new(@host, @port) - end - rescue - nil - end - - def close_socket_io - logger.info "Socket IO Closed and Deregistered" - @selector.deregister(@io) - @io.close - @io = nil - end - - def write(string) - @io.write(string) - rescue - logger.error "Write Error" - close_socket_io - self.reset - end - def process_packet(pkt) if self.auth_pending? if pkt.type == Type::AUTH_RESPONSE if pkt.body == Packet::STRING_TRUE @tag = pkt.tag @@ -341,19 +231,9 @@ create_request_async(id, ev_type, ev_body) logger.debug "Request created: #{id}" else logger.warn "Duplicated id: #{id}, ignored" end - end - - def clean_up - super - if @io - @driver.close - close_socket_io - end - rescue => e - logger.warn "Error occured when clean up: #{e.to_s}" end # When data is back def handle_remote_response(ev_type, wrapped_ev_body) logger.debug "Remote packet back: #{ev_type} #{wrapped_ev_body}"