require 'protobuf/rpc/servers/zmq/util' require 'protobuf/rpc/servers/zmq/worker' require 'protobuf/rpc/servers/zmq/broker' require 'protobuf/rpc/dynamic_discovery.pb' require 'securerandom' require 'thread' module Protobuf module Rpc module Zmq class Server include ::Protobuf::Rpc::Zmq::Util DEFAULT_OPTIONS = { :beacon_interval => 5, :broadcast_beacons => false, :broadcast_busy => false, :zmq_inproc => true, } attr_accessor :options, :workers attr_reader :zmq_context def initialize(options) @options = DEFAULT_OPTIONS.merge(options) @workers = [] init_zmq_context init_beacon_socket if broadcast_beacons? init_shutdown_pipe rescue teardown raise end def add_worker @total_workers = total_workers + 1 end def all_workers_busy? workers.all? { |thread| !!thread[:busy] } end def backend_port options[:worker_port] || frontend_port + 1 end def backend_uri if inproc? "inproc://#{backend_ip}:#{backend_port}" else "tcp://#{backend_ip}:#{backend_port}" end end def beacon_interval [options[:beacon_interval].to_i, 1].max end def beacon_ip "255.255.255.255" end def beacon_port @beacon_port ||= options.fetch( :beacon_port, ::Protobuf::Rpc::ServiceDirectory.port, ).to_i end def beacon_uri "udp://#{beacon_ip}:#{beacon_port}" end def broadcast_beacons? !brokerless? && options[:broadcast_beacons] end def broadcast_busy? broadcast_beacons? && options[:broadcast_busy] end def broadcast_flatline flatline = ::Protobuf::Rpc::DynamicDiscovery::Beacon.new( :beacon_type => ::Protobuf::Rpc::DynamicDiscovery::BeaconType::FLATLINE, :server => to_proto, ) @beacon_socket.send(flatline.encode, 0) end def broadcast_heartbeat @last_beacon = Time.now.to_i heartbeat = ::Protobuf::Rpc::DynamicDiscovery::Beacon.new( :beacon_type => ::Protobuf::Rpc::DynamicDiscovery::BeaconType::HEARTBEAT, :server => to_proto, ) @beacon_socket.send(heartbeat.encode, 0) logger.debug { sign_message("sent heartbeat to #{beacon_uri}") } end def broadcast_heartbeat? Time.now.to_i >= next_beacon && broadcast_beacons? end def brokerless? !!options[:workers_only] end def busy_worker_count workers.count { |thread| !!thread[:busy] } end def frontend_ip @frontend_ip ||= resolve_ip(options[:host]) end alias_method :backend_ip, :frontend_ip def frontend_port options[:port] end def frontend_uri "tcp://#{frontend_ip}:#{frontend_port}" end def inproc? !!options[:zmq_inproc] end def maintenance_timeout next_maintenance - Time.now.to_i end def next_maintenance cycles = [next_reaping] cycles << next_beacon if broadcast_beacons? cycles.min end def minimum_timeout 0.1 end def next_beacon if @last_beacon.nil? 0 else @last_beacon + beacon_interval end end def next_reaping if @last_reaping.nil? 0 else @last_reaping + reaping_interval end end def reap_dead_workers @last_reaping = Time.now.to_i @workers.keep_if do |worker| worker.alive? || worker.join && false end end def reap_dead_workers? Time.now.to_i >= next_reaping end def reaping_interval 5 end def run @running = true yield if block_given? # runs on startup wait_for_shutdown_signal broadcast_flatline if broadcast_beacons? Thread.pass until reap_dead_workers.empty? @broker_thread.join unless brokerless? ensure @running = false teardown end def running? !!@running end def start_missing_workers missing_workers = total_workers - @workers.size if missing_workers > 0 missing_workers.times { start_worker } logger.debug { sign_message("#{total_workers} workers started") } end end def stop @running = false @shutdown_w.write('.') end def teardown @shutdown_r.try(:close) @shutdown_w.try(:close) @beacon_socket.try(:close) @zmq_context.try(:terminate) @last_reaping = @last_beacon = @timeout = nil end def timeout if @timeout.nil? @timeout = 0 else @timeout = [minimum_timeout, maintenance_timeout].max end end def total_workers @total_workers ||= [@options[:threads].to_i, 1].max end def to_proto @proto ||= ::Protobuf::Rpc::DynamicDiscovery::Server.new( :uuid => uuid, :address => frontend_ip, :port => frontend_port.to_s, :ttl => (beacon_interval * 1.5).ceil, :services => ::Protobuf::Rpc::Service.implemented_services, ) end def uuid @uuid ||= SecureRandom.uuid end def wait_for_shutdown_signal loop do break if IO.select([@shutdown_r], nil, nil, timeout) start_broker unless brokerless? reap_dead_workers if reap_dead_workers? start_missing_workers next unless broadcast_heartbeat? if broadcast_busy? && all_workers_busy? broadcast_flatline else broadcast_heartbeat end end end private def init_beacon_socket @beacon_socket = UDPSocket.new @beacon_socket.setsockopt(::Socket::SOL_SOCKET, ::Socket::SO_BROADCAST, true) @beacon_socket.setsockopt(::Socket::SOL_SOCKET, ::Socket::SO_REUSEADDR, true) if defined?(::Socket::SO_REUSEPORT) @beacon_socket.setsockopt(::Socket::SOL_SOCKET, ::Socket::SO_REUSEPORT, true) end @beacon_socket.bind(frontend_ip, beacon_port) @beacon_socket.connect(beacon_ip, beacon_port) end def init_shutdown_pipe @shutdown_r, @shutdown_w = IO.pipe end def init_zmq_context @zmq_context = ZMQ::Context.new end def start_broker return if @broker && @broker.running? && !@broker_thread.stop? if @broker && !@broker.running? broadcast_flatline if broadcast_busy? @broker_thread.join if @broker_thread init_zmq_context # need a new context to restart the broker end @broker = ::Protobuf::Rpc::Zmq::Broker.new(self) @broker_thread = Thread.new(@broker) do |broker| begin broker.run rescue => e message = "Broker failed: #{e.inspect}\n #{e.backtrace.join($INPUT_RECORD_SEPARATOR)}" $stderr.puts(message) logger.error { message } end end end def start_worker @workers << Thread.new(self, @broker) do |server, broker| begin ::Protobuf::Rpc::Zmq::Worker.new(server, broker).run rescue => e message = "Worker failed: #{e.inspect}\n #{e.backtrace.join($INPUT_RECORD_SEPARATOR)}" $stderr.puts(message) logger.error { message } end end end end end end end