Sha256: 054f9d531911563ecdcf0b52b05c41c0ce15b0e150664d0420d88759f342e685

Contents?: true

Size: 811 Bytes

Versions: 1

Compression:

Stored size: 811 Bytes

Contents

module Kafkr
  # Fans an encrypted message out to every registered subscriber and remembers
  # the last payload written to each one.
  #
  # Subscribers are duck-typed: anything responding to +closed?+ and +puts+
  # works. No synchronization is performed in this class.
  class MessageBroker
    # last_sent   - Hash mapping each subscriber to the last encrypted payload
    #               written to it (nil until the first successful delivery).
    # subscribers - Array of registered subscribers, in registration order.
    attr_accessor :last_sent, :subscribers

    def initialize
      @subscribers = []
      @last_sent = {}
    end

    # Registers a subscriber and resets its last-sent record to nil.
    #
    # @param socket [#closed?, #puts] destination for future broadcasts
    # @return [nil] the value just stored in +last_sent+
    def add_subscriber(socket)
      @subscribers << socket
      @last_sent[socket] = nil
    end

    # Encrypts +message+ once and writes it to every open subscriber.
    #
    # Subscribers that are already closed, or whose write fails with
    # Errno::EPIPE or IOError, are dropped from both +subscribers+ and
    # +last_sent+ so dead connections do not accumulate. Any other exception
    # propagates to the caller.
    #
    # @param message [Object] payload handed to Kafkr::Encryptor#encrypt
    #   (Encryptor is defined elsewhere; assumed to return something +puts+
    #   can write -- confirm against its implementation)
    # @return [Array] the remaining subscribers
    def broadcast(message)
      encrypted_message = Kafkr::Encryptor.new.encrypt(message)

      # Walk a snapshot: dead subscribers are removed inside the loop, and
      # deleting from an Array while #each is running over it makes the
      # iteration skip the element that follows the removed one.
      @subscribers.dup.each do |subscriber|
        if subscriber.closed?
          remove_subscriber(subscriber)
          next
        end

        subscriber.puts(encrypted_message)
        @last_sent[subscriber] = encrypted_message
      rescue Errno::EPIPE, IOError
        # Errno::EPIPE is a SystemCallError, not an IOError, so it has to be
        # listed explicitly to get the same cleanup as a closed stream.
        remove_subscriber(subscriber)
      end

      @subscribers
    end

    private

    # Forgets a subscriber and its last-sent record.
    def remove_subscriber(subscriber)
      @subscribers.delete(subscriber)
      @last_sent.delete(subscriber)
    end
  end
end

Version data entries

1 entry across 1 version & 1 rubygem

Version Path
kafkr-0.18.0 lib/kafkr/message_broker.rb