Sha256: 054f9d531911563ecdcf0b52b05c41c0ce15b0e150664d0420d88759f342e685
Contents?: true
Size: 811 Bytes
Versions: 1
Compression:
Stored size: 811 Bytes
Contents
module Kafkr class MessageBroker attr_accessor :last_sent, :subscribers def initialize @subscribers = [] @last_sent = {} end def add_subscriber(socket) @subscribers << socket @last_sent[socket] = nil end def broadcast(message) encrypted_message = Kafkr::Encryptor.new.encrypt(message) @subscribers.each do |subscriber| if !subscriber.closed? subscriber.puts(encrypted_message) @last_sent[subscriber] = encrypted_message end rescue Errno::EPIPE # Optionally, handle broken pipe error rescue IOError begin @subscribers.delete(subscriber) @last_sent.delete(subscriber) rescue puts "clean up subscribers" end end end end end
Version data entries
1 entries across 1 versions & 1 rubygems
Version | Path |
---|---|
kafkr-0.18.0 | lib/kafkr/message_broker.rb |