Sha256: 7d93390dc75736ab23610b16bea25937d4c1ab116525a155a55ee414a826b335
Contents?: true
Size: 1.78 KB
Versions: 8
Compression:
Stored size: 1.78 KB
Contents
require 'celluloid/io' module Listen module TCP class Broadcaster include Celluloid::IO finalizer :finalize # Initializes a Celluloid::IO-powered TCP-broadcaster # # @param [String] host to broadcast on # @param [String] port to broadcast on # # Note: Listens on all addresses when host is nil # def initialize(host, port) @sockets = [] @server = TCPServer.new(host, port) rescue _log :error, "Broadcaster.initialize: #{$!.inspect}:#{$@.join("\n")}" raise end # Asynchronously start accepting connections def start async.run end # Cleans up sockets and server def finalize @sockets.map(&:close) if @sockets @sockets = nil return unless @server @server.close @server = nil end # Broadcasts given payload to all connected sockets def broadcast(payload) active_sockets = @sockets.select do |socket| _unicast(socket, payload) end @sockets.replace(active_sockets) end # Continuously accept and handle incoming connections def run while socket = @server.accept @sockets << socket end rescue Celluloid::Task::TerminatedError _log :debug, "TCP adapter was terminated: #{$!.inspect}" rescue _log :error, "Broadcaster.run: #{$!.inspect}:#{$@.join("\n")}" raise end private def _log(type, message) Celluloid.logger.send(type, message) end def _unicast(socket, payload) socket.write(payload) true rescue IOError, Errno::ECONNRESET, Errno::EPIPE _log :debug, "Broadcaster failed: #{socket.inspect}" false end end end end
Version data entries
8 entries across 6 versions & 3 rubygems