lib/vines/stream.rb in vines-0.1.0 vs lib/vines/stream.rb in vines-0.1.1

- old
+ new

@@ -8,17 +8,16 @@ include Vines::Log ERROR = 'error'.freeze PAD = 20 + attr_reader :domain attr_accessor :user def post_init router << self - @remote_addr, @local_addr = [get_peername, get_sockname].map do |addr| - addr ? Socket.unpack_sockaddr_in(addr)[0, 2].reverse.join(':') : 'unknown' - end + @remote_addr, @local_addr = addresses @user, @closed, @stanza_size = nil, false, 0 @bucket = TokenBucket.new(100, 10) @store = Store.new @nodes = EM::Queue.new @@ -47,24 +46,14 @@ else error(StreamErrors::PolicyViolation.new('max stanza size reached')) end end - # Send the stanza to all recipients, stamping it with from and - # to addresses first. - def broadcast(stanza, recipients) - stanza['from'] = @user.jid.to_s - recipients.each do |recipient| - stanza['to'] = recipient.user.jid.to_s - recipient.write(stanza) - end - end - # Returns the storage system for the domain. If no domain is given, # the stream's storage mechanism is returned. - def storage(domain=@domain) - @config.vhosts[domain] + def storage(domain=nil) + @config.vhosts[domain || self.domain] end # Reload the user's information into their active connections. Call this # after storage.save_user() to sync the new user state with their other # connections. @@ -112,10 +101,12 @@ log.info { "%s %21s -> %s" % ['Stream disconnected:'.ljust(PAD), @remote_addr, @local_addr] } log.info { "Streams connected: #{router.size}" } end + # Advance the stream's state machine to the new state. XML nodes received + # by the stream will be passed to this state's +node+ method. def advance(state) @state = state end # Stream level errors close the stream while stanza and SASL errors are @@ -124,25 +115,40 @@ def error(e) case e when SaslError, StanzaError write(e.to_xml) when StreamError - write(e.to_xml) + send_stream_error(e) close_stream else log.error(e) - write(StreamErrors::InternalServerError.new.to_xml) + send_stream_error(StreamErrors::InternalServerError.new) close_stream end end def router Router.instance end private + # Return the remote and local socket addresses used by this connection. + def addresses + [get_peername, get_sockname].map do |addr| + addr ? Socket.unpack_sockaddr_in(addr)[0, 2].reverse.join(':') : 'unknown' + end + end + + # Write the StreamError's xml to the stream. Subclasses can override + # this method with custom error writing behavior. + def send_stream_error(e) + write(e.to_xml) + end + + # Write a closing stream tag to the stream then close the stream. Subclasses + # can override this method for custom close behavior. def close_stream write('</stream:stream>') close_connection_after_writing end @@ -168,11 +174,11 @@ @stanza_size = 0 enforce_rate_limit if error?(node) close_stream else - @state.node(node) + state.node(node) end rescue Exception => e error(e) end @@ -189,10 +195,16 @@ label = (direction == :out) ? 'Sent' : 'Received' log.debug("%s %21s -> %s\n%s\n" % ["#{label} stanza:".ljust(PAD), from, to, node]) end + # Returns the current state of the stream's state machine. Provided as a + # method so subclasses can override the behavior. + def state + @state + end + def tls_files - %w[crt key].map {|ext| File.join(VINES_ROOT, 'conf', 'certs', "#{@domain}.#{ext}") } + %w[crt key].map {|ext| File.join(VINES_ROOT, 'conf', 'certs', "#{domain}.#{ext}") } end end end