Sha256: 02f29ce36df5a1c399a83cfbae9b15f98d40b14c3347f7b99a1c3c5231b45c76

Contents?: true

Size: 1.99 KB

Versions: 12

Compression:

Stored size: 1.99 KB

Contents

require 'celluloid/io'

module Listen
  module TCP
    class Broadcaster
      include Celluloid::IO

      finalizer :finalize

      # Initializes a Celluloid::IO-powered TCP-broadcaster
      #
      # @param [String] host to broadcast on
      # @param [String] port to broadcast on
      #
      # Note: Listens on all addresses when host is nil
      #
      def initialize(host, port)
        @sockets = []
        _log :debug, format('Broadcaster: tcp server listening on: %s:%s',
                            host, port)
        @server = TCPServer.new(host, port)
      rescue
        _log :error, format('Broadcaster.initialize: %s:%s', $ERROR_INFO,
                            $ERROR_POSITION * "\n")
        raise
      end

      # Asynchronously start accepting connections
      def start
        async.run
      end

      # Cleans up sockets and server
      def finalize
        @sockets.map(&:close) if @sockets
        @sockets = nil

        return unless @server
        @server.close
        @server = nil
      end

      # Broadcasts given payload to all connected sockets
      def broadcast(payload)
        active_sockets = @sockets.select do |socket|
          _unicast(socket, payload)
        end
        @sockets.replace(active_sockets)
      end

      # Continuously accept and handle incoming connections
      def run
        while (socket = @server.accept)
          @sockets << socket
        end
      rescue Celluloid::Task::TerminatedError
        _log :debug, "TCP adapter was terminated: #{$ERROR_INFO}"
      rescue
        _log :error, format('Broadcaster.run: %s:%s', $ERROR_INFO,
                            $ERROR_POSITION * "\n")
        raise
      end

      private

      def _log(type, message)
        Celluloid::Logger.send(type, message)
      end

      def _unicast(socket, payload)
        socket.write(payload)
        true
      rescue IOError, Errno::ECONNRESET, Errno::EPIPE
        _log :debug, "Broadcaster failed: #{socket.inspect}"
        false
      end
    end
  end
end

Version data entries

12 entries across 12 versions & 2 rubygems

Version Path
listen-2.10.1 lib/listen/tcp/broadcaster.rb
vagrant-cloudstack-1.1.0 vendor/bundle/gems/listen-2.8.6/lib/listen/tcp/broadcaster.rb
listen-2.10.0 lib/listen/tcp/broadcaster.rb
listen-2.9.0 lib/listen/tcp/broadcaster.rb
listen-2.8.6 lib/listen/tcp/broadcaster.rb
listen-2.8.5 lib/listen/tcp/broadcaster.rb
listen-2.8.4 lib/listen/tcp/broadcaster.rb
listen-2.8.3 lib/listen/tcp/broadcaster.rb
listen-2.8.2 lib/listen/tcp/broadcaster.rb
listen-2.8.1 lib/listen/tcp/broadcaster.rb
listen-2.8.0 lib/listen/tcp/broadcaster.rb
listen-2.7.12 lib/listen/tcp/broadcaster.rb