lib/tamashii/agent/connection.rb in tamashii-agent-0.1.11 vs lib/tamashii/agent/connection.rb in tamashii-agent-0.2.0
- old
+ new
@@ -1,21 +1,55 @@
require 'socket'
require 'websocket/driver'
require 'aasm'
require 'openssl'
+require 'json'
+require 'concurrent'
+require 'nio'
require 'tamashii/common'
require 'tamashii/agent/config'
+require 'tamashii/agent/event'
require 'tamashii/agent/component'
-require 'tamashii/agent/request_pool'
require 'tamashii/agent/handler'
module Tamashii
module Agent
class Connection < Component
+
+ class RequestTimeoutError < RuntimeError; end
+
+ class RequestObserver
+ include Common::Loggable
+ def initialize(connection, id, ev_type, ev_body, future)
+ @connection = connection
+ @id = id
+ @ev_type = ev_type
+ @ev_body = ev_body
+ @future = future
+ end
+
+ def update(time, ev_data, reason)
+ if @future.fulfilled?
+ res_ev_type = ev_data[:ev_type]
+ res_ev_body = ev_data[:ev_body]
+ case res_ev_type
+ when Type::RFID_RESPONSE_JSON
+ logger.debug "Handled: #{res_ev_type}: #{res_ev_body}"
+ @connection.handle_card_result(JSON.parse(res_ev_body))
+ else
+ logger.warn "Unhandled packet result: #{res_ev_type}: #{res_ev_body}"
+ end
+ else
+ logger.error "#{@id} Failed with #{reason}"
+ @connection.on_request_timeout(@ev_type, @ev_body)
+ end
+ end
+ end
+
include AASM
aasm do
state :init, initial: true
state :connecting
@@ -39,11 +73,10 @@
end
end
attr_reader :url
attr_reader :master
- attr_reader :request_pool
def initialize(master, host, port)
super()
@master = master
@url = "#{Config.use_ssl ? "wss" : "ws"}://#{host}:#{port}/#{Config.entry_point}"
@@ -51,61 +84,74 @@
@host = host
@port = port
@tag = 0
- @request_pool = RequestPool.new
- @request_pool.set_handler(:request_timedout, method(:handle_request_timedout))
- @request_pool.set_handler(:request_meet, method(:handle_request_meet))
- @request_pool.set_handler(:send_request, method(:handle_send_request))
+ @future_ivar_pool = Concurrent::Map.new
+ @driver_lock = Mutex.new
+ setup_resolver
+ end
+
+ def create_selector
+ @selector = NIO::Selector.new
+ end
+
+ def setup_resolver
env_data = {connection: self}
Resolver.config do
[Type::REBOOT, Type::POWEROFF, Type::RESTART, Type::UPDATE].each do |type|
handle type, Handler::System, env_data
end
+ [Type::LCD_MESSAGE, Type::LCD_SET_IDLE_TEXT].each do |type|
+ handle type, Handler::LCD, env_data
+ end
handle Type::BUZZER_SOUND, Handler::Buzzer, env_data
- handle Type::RFID_RESPONSE_JSON, Handler::RequestPoolResponse, env_data
+ handle Type::RFID_RESPONSE_JSON, Handler::RemoteResponse, env_data
end
end
- def handle_request_timedout(req)
- @master.send_event(EVENT_CONNECTION_NOT_READY, "Connection not ready for #{req.ev_type}:#{req.ev_body}")
+ def on_request_timeout(ev_type, ev_body)
+ @master.send_event(Event.new(Event::CONNECTION_NOT_READY, "Connection not ready for #{ev_type}:#{ev_body}"))
end
- def handle_request_meet(req, res)
- logger.debug "Got packet: #{res.ev_type}: #{res.ev_body}"
- case res.ev_type
- when Type::RFID_RESPONSE_JSON
- json = JSON.parse(res.ev_body)
- handle_card_result(json)
- else
- logger.warn "Unhandled packet result: #{res.ev_type}: #{res.ev_body}"
- end
- end
-
def handle_card_result(result)
if result["auth"]
- @master.send_event(EVENT_BEEP, "ok")
+ @master.send_event(Event.new(Event::BEEP, "ok"))
else
- @master.send_event(EVENT_BEEP, "no")
+ @master.send_event(Event.new(Event::BEEP, "no"))
end
+ if result["message"]
+ @master.send_event(Event.new(Event::LCD_MESSAGE, result["message"]))
+ end
end
- def handle_send_request(req)
+ def try_send_request(ev_type, ev_body)
if self.ready?
- @driver.binary(Packet.new(req.ev_type, @tag, req.wrap_body).dump)
+ @driver_lock.synchronize do
+ @driver.binary(Packet.new(ev_type, @tag, ev_body).dump)
+ end
true
else
false
end
end
- # override
- def worker_loop
+ def stop_threads
+ super
+ @websocket_thr.exit if @websocket_thr
+ @websocket_thr = nil
+ end
+
+ def run
+ super
+ @websocket_thr = Thread.start { run_websocket_loop }
+ end
+
+ def run_websocket_loop
+ create_selector
loop do
- @request_pool.update
ready = @selector.select(1)
ready.each { |m| m.value.call } if ready
if @io.nil?
@io = try_create_socket
if @io
@@ -216,25 +262,103 @@
logger.debug "Tag mismatch packet: tag: #{pkt.tag}, type: #{pkt.type}"
end
end
end
- def process_event(ev_type, ev_body)
- case ev_type
- when EVENT_CARD_DATA
- req = RequestPool::Request.new(Type::RFID_NUMBER , ev_body, ev_body)
- @request_pool.add_request(req)
+ # override
+ def process_event(event)
+ case event.type
+ when Event::CARD_DATA
+ id = event.body
+ wrapped_body = {
+ id: id,
+ ev_body: event.body
+ }.to_json
+ new_remote_request(id, Type::RFID_NUMBER, wrapped_body)
end
end
+ def schedule_task_runner(id, ev_type, ev_body, start_time, times)
+ logger.debug "Schedule send attemp #{id} : #{times + 1} time(s)"
+ if try_send_request(ev_type, ev_body)
+ # Request sent, do nothing
+ logger.debug "Request sent for id = #{id}"
+ else
+ if Time.now - start_time < Config.connection_timeout
+ # Re-schedule self
+ logger.warn "Reschedule #{id} after 1 sec"
+ schedule_next_task(1, id, ev_type, ev_body, start_time, times + 1)
+ else
+ # This job is expired. Do nothing
+ logger.warn "Abort scheduling #{id}"
+ end
+ end
+ end
+
+ def schedule_next_task(interval, id, ev_type, ev_body, start_time, times)
+ Concurrent::ScheduledTask.execute(interval, args: [id, ev_type, ev_body, start_time, times], &method(:schedule_task_runner))
+ end
+
+ def create_request_scheduler_task(id, ev_type, ev_body)
+ schedule_next_task(0, id, ev_type, ev_body, Time.now, 0)
+ end
+
+ def create_request_async(id, ev_type, ev_body)
+ req = Concurrent::Future.new do
+ # Create IVar for store result
+ ivar = Concurrent::IVar.new
+ @future_ivar_pool[id] = ivar
+ # Schedule to get the result
+ create_request_scheduler_task(id, ev_type, ev_body)
+ # Wait for result
+ if result = ivar.value(Config.connection_timeout)
+ # IVar is already removed from pool
+ result
+ else
+ # Manually remove IVar
+ # Any fulfill at this point is useless
+ logger.error "Timeout when getting IVar for #{id}"
+ @future_ivar_pool.delete(id)
+ raise RequestTimeoutError, "Request Timeout"
+ end
+ end
+ req.add_observer(RequestObserver.new(self, id, ev_type, ev_body, req))
+ req.execute
+ req
+ end
+
+ def new_remote_request(id, ev_type, ev_body)
+ # enqueue if not exists
+ if !@future_ivar_pool[id]
+ create_request_async(id, ev_type, ev_body)
+ logger.debug "Request created: #{id}"
+ else
+ logger.warn "Duplicated id: #{id}, ignored"
+ end
+ end
+
def clean_up
super
if @io
@driver.close
close_socket_io
end
rescue => e
logger.warn "Error occured when clean up: #{e.to_s}"
+ end
+
+ # When data is back
+ def handle_remote_response(ev_type, wrapped_ev_body)
+ logger.debug "Remote packet back: #{ev_type} #{wrapped_ev_body}"
+ result = JSON.parse(wrapped_ev_body)
+ id = result["id"]
+ ev_body = result["ev_body"]
+ # fetch ivar and delete it
+ if ivar = @future_ivar_pool.delete(id)
+ ivar.set(ev_type: ev_type, ev_body: ev_body)
+ else
+ logger.warn "IVar #{id} not in pool"
+ end
end
end
end
end