lib/tamashii/agent/connection.rb in tamashii-agent-0.2.8 vs lib/tamashii/agent/connection.rb in tamashii-agent-0.3.0
- old
+ new
@@ -1,69 +1,36 @@
-require 'socket'
-require 'websocket/driver'
require 'aasm'
-require 'openssl'
require 'json'
require 'concurrent'
-require 'nio'
require 'tamashii/common'
require 'tamashii/agent/config'
require 'tamashii/agent/event'
require 'tamashii/agent/component'
require 'tamashii/agent/handler'
+require 'tamashii/client'
+
module Tamashii
module Agent
class Connection < Component
+ autoload :RequestObserver, 'tamashii/agent/connection/request_observer'
+
class RequestTimeoutError < RuntimeError; end
- class RequestObserver
- include Common::Loggable
- def initialize(connection, id, ev_type, ev_body, future)
- @connection = connection
- @id = id
- @ev_type = ev_type
- @ev_body = ev_body
- @future = future
- end
-
- def update(time, ev_data, reason)
- if @future.fulfilled?
- res_ev_type = ev_data[:ev_type]
- res_ev_body = ev_data[:ev_body]
- case res_ev_type
- when Type::RFID_RESPONSE_JSON
- logger.debug "Handled: #{res_ev_type}: #{res_ev_body}"
- @connection.handle_card_result(JSON.parse(res_ev_body))
- else
- logger.warn "Unhandled packet result: #{res_ev_type}: #{res_ev_body}"
- end
- else
- logger.error "#{@id} Failed with #{reason}"
- @connection.on_request_timeout(@ev_type, @ev_body)
- end
- end
- end
-
include AASM
aasm do
state :init, initial: true
- state :connecting
state :auth_pending
state :ready
- event :connect do
- transitions from: :init, to: :connecting, after: Proc.new { logger.info "Start connecting" }
- end
-
event :auth_request do
- transitions from: :connecting, to: :auth_pending, after: Proc.new { logger.info "Sending authentication request" }
+ transitions from: :init, to: :auth_pending, after: Proc.new { logger.info "Sending authentication request" }
end
event :auth_success do
transitions from: :auth_pending, to: :ready, after: Proc.new { logger.info "Authentication finished. Tag = #{@tag}" }
end
@@ -76,28 +43,23 @@
attr_reader :url
attr_reader :master
def initialize(master)
super
- @host = @master.host
- @port = @master.port
- @url = "#{Config.use_ssl ? "wss" : "ws"}://#{@host}:#{@port}/#{Config.entry_point}"
+
self.reset
+ @client = Tamashii::Client::Base.new
@tag = 0
@future_ivar_pool = Concurrent::Map.new
- @driver_lock = Mutex.new
@last_error_report_time = Time.now
+ setup_callbacks
setup_resolver
end
- def create_selector
- @selector = NIO::Selector.new
- end
-
def setup_resolver
env_data = {connection: self}
Resolver.config do
[Type::REBOOT, Type::POWEROFF, Type::RESTART, Type::UPDATE].each do |type|
handle type, Handler::System, env_data
@@ -125,129 +87,57 @@
end
end
def try_send_request(ev_type, ev_body)
if self.ready?
- @driver_lock.synchronize do
- @driver.binary(Packet.new(ev_type, @tag, ev_body).dump)
- end
+ @client.transmit(Packet.new(ev_type, @tag, ev_body).dump)
true
else
false
end
end
def stop_threads
super
- @websocket_thr.exit if @websocket_thr
- @websocket_thr = nil
+ @client.close
end
- def run
- super
- @websocket_thr = Thread.start { run_websocket_loop }
- end
- def run_websocket_loop
- create_selector
- loop do
- ready = @selector.select(1)
- ready.each { |m| m.value.call } if ready
- if @io.nil?
- @io = try_create_socket
- if @io
- # socket io opened
- register_socket_io
- # start ws
- start_web_driver
- end
- end
- end
- end
-
def send_auth_request
# TODO: other types of auth
- @driver.binary(Packet.new(Type::AUTH_TOKEN, 0, [Type::CLIENT[:agent], @master.serial_number,Config.token].join(",")).dump)
+ if @client.transmit(Packet.new(Type::AUTH_TOKEN, 0, [Type::CLIENT[:agent], @master.serial_number,Config.token].join(",")).dump)
+ logger.debug "Auth sent!"
+ else
+ logger.error "Cannot sent auth request!"
+ end
end
- def start_web_driver
- # TODO: Improve below code
- @driver = WebSocket::Driver.client(self)
- @driver.on :open, proc { |e|
+ def setup_callbacks
+ @client.on :open, proc {
logger.info "Server opened"
self.auth_request
send_auth_request
}
- @driver.on :close, proc { |e|
- logger.info "Server closed"
- close_socket_io
+ @client.on :close, proc {
+ # Note: this only called when normally receive the WS close message
+ logger.info "Server closed normally"
+ }
+ @client.on :socket_closed, proc {
+ # Note: called when low-level IO is closed
+ logger.info "Server socket closed"
self.reset
}
- @driver.on :message, proc { |e|
- pkt = Packet.load(e.data)
+ @client.on :message, proc { |data|
+ pkt = Packet.load(data)
process_packet(pkt) if pkt
}
- @driver.on :error, proc { |e|
+ @client.on :error, proc { |e|
logger.error("#{e.message}")
}
- @driver.start
- self.connect
end
- def register_socket_io
- _monitor = @selector.register(@io, :r)
- _monitor.value = proc do
- begin
- msg = @io.read_nonblock(4096, exception: false)
- next if msg == :wait_readable
- if msg.nil?
- # socket closed
- logger.info "No message received from server. Connection reset"
- close_socket_io
- self.reset
- sleep 1
- else
- @driver.parse(msg)
- end
- rescue => e
- logger.error "#{e.message}"
- logger.debug "Backtrace:"
- e.backtrace.each {|msg| logger.debug msg}
- end
- end
- end
- def try_create_socket
- logger.info "try to open socket..."
- if Time.now - @last_error_report_time > 5.0
- @master.send_event(Event.new(Event::LCD_MESSAGE, "Initializing\nConnection..."))
- @last_error_report_time = Time.now
- end
- if Config.use_ssl
- OpenSSL::SSL::SSLSocket.new(TCPSocket.new(@host, @port)).connect
- else
- TCPSocket.new(@host, @port)
- end
- rescue
- nil
- end
-
- def close_socket_io
- logger.info "Socket IO Closed and Deregistered"
- @selector.deregister(@io)
- @io.close
- @io = nil
- end
-
- def write(string)
- @io.write(string)
- rescue
- logger.error "Write Error"
- close_socket_io
- self.reset
- end
-
def process_packet(pkt)
if self.auth_pending?
if pkt.type == Type::AUTH_RESPONSE
if pkt.body == Packet::STRING_TRUE
@tag = pkt.tag
@@ -341,19 +231,9 @@
create_request_async(id, ev_type, ev_body)
logger.debug "Request created: #{id}"
else
logger.warn "Duplicated id: #{id}, ignored"
end
- end
-
- def clean_up
- super
- if @io
- @driver.close
- close_socket_io
- end
- rescue => e
- logger.warn "Error occured when clean up: #{e.to_s}"
end
# When data is back
def handle_remote_response(ev_type, wrapped_ev_body)
logger.debug "Remote packet back: #{ev_type} #{wrapped_ev_body}"