java_import java.lang.System java_import java.nio.channels.Selector java_import java.util.concurrent.ConcurrentLinkedQueue require 'zmachine/hashed_wheel' require 'zmachine/connection_manager' module ZMachine class NotReactorOwner < Exception; end class NoReactorError < Exception; end class Reactor @mutex = Mutex.new def self.register_reactor(reactor) @mutex.synchronize do @reactors ||= [] @reactors << reactor end end def self.terminate_all_reactors @mutex.synchronize do @reactors.each(&:stop_event_loop) @reactors.clear end end def self.unregister_reactor(reactor) @mutex.synchronize do @reactors.delete(reactor) end end def initialize @heartbeat_interval = ZMachine.heartbeat_interval || 0.5 # coarse grained by default @next_tick_queue = ConcurrentLinkedQueue.new @running = false @shutdown_hooks = [] # a 10 ms tick wheel with 512 slots => ~5s for a round @wheel = HashedWheel.new(512, 10) end def add_shutdown_hook(&block) @shutdown_hooks << block end def add_timer(*args, &block) check_reactor_thread interval = args.shift callback = args.shift || block ZMachine.logger.debug("zmachine:reactor:#{__method__}", interval: interval, callback: callback) if ZMachine.debug return unless callback @wheel.add((interval * 1000).to_i, &callback) end def bind(server, port_or_type=nil, handler=nil, *args, &block) ZMachine.logger.debug("zmachine:reactor:#{__method__}", server: server, port_or_type: port_or_type) if ZMachine.debug check_reactor_thread @connection_manager.bind(server, port_or_type, handler, *args, &block) end def close_connection(connection, after_writing = false, reason = nil) return true unless @connection_manager @connection_manager.close_connection(connection, after_writing, reason) end def connect(server, port_or_type=nil, handler=nil, *args, &block) ZMachine.logger.debug("zmachine:reactor:#{__method__}", server: server, port_or_type: port_or_type) if ZMachine.debug check_reactor_thread @connection_manager.connect(server, port_or_type, handler, *args, &block) end def connections @connection_manager.connections end def heartbeat_interval @heartbeat_interval end def heartbeat_interval=(value) value = 0.01 if value < 0.01 @heartbeat_interval = value end def next_tick(callback=nil, &block) @next_tick_queue << (callback || block) wakeup if running? end def reconnect(server, port_or_type, handler) return handler if handler && handler.channel.is_a?(ZMQChannel) ZMachine.logger.debug("zmachine:reactor:#{__method__}", server: server, port_or_type: port_or_type) if ZMachine.debug connect(server, port_or_type, handler) end def run(callback=nil, shutdown_hook=nil, &block) ZMachine.logger.debug("zmachine:reactor:#{__method__}") if ZMachine.debug add_shutdown_hook(shutdown_hook) if shutdown_hook begin Reactor.register_reactor(self) @running = true if callback = (callback || block) add_timer(0) { callback.call(self) } end @selector = Selector.open @connection_manager = ConnectionManager.new(@selector) @run_reactor = true run_reactor while @run_reactor ensure ZMachine.logger.debug("zmachine:reactor:#{__method__}", stop: :selector) if ZMachine.debug @selector.close rescue nil @selector = nil ZMachine.logger.debug("zmachine:reactor:#{__method__}", stop: :connections) if ZMachine.debug @connection_manager.shutdown ZMachine.logger.debug("zmachine:reactor:#{__method__}", stop: :shutdown_hooks) if ZMachine.debug @shutdown_hooks.pop.call until @shutdown_hooks.empty? @next_tick_queue = ConcurrentLinkedQueue.new @running = false Reactor.unregister_reactor(self) ZMachine.logger.debug("zmachine:reactor:#{__method__}", stop: :zcontext) if ZMachine.debug ZMachine.reactor = nil end end def run_reactor ZMachine.logger.debug("zmachine:reactor:#{__method__}") if ZMachine.debug run_deferred_callbacks return unless @run_reactor @wheel.advance return unless @run_reactor @connection_manager.cleanup if @connection_manager.idle? ZMachine.logger.debug("zmachine:reactor:#{__method__}", select: @heartbeat_interval) if ZMachine.debug @selector.select(@heartbeat_interval * 1000) else ZMachine.logger.debug("zmachine:reactor:#{__method__}", select: :now) if ZMachine.debug @selector.select_now end @connection_manager.process end def running? @running end def stop_event_loop @run_reactor = false @connection_manager.shutdown wakeup end def stop_server(channel) channel.close end private def check_reactor_thread raise NoReactorError if !Thread.current[:reactor] raise NotReactorOwner if Thread.current[:reactor] != self end def run_deferred_callbacks ZMachine.logger.debug("zmachine:reactor:#{__method__}") if ZMachine.debug while callback = @next_tick_queue.poll callback.call end end def wakeup ZMachine.logger.debug("zmachine:reactor:#{__method__}") if ZMachine.debug @selector.wakeup if @selector end end end