lib/vines/stream.rb in vines-0.1.0 vs lib/vines/stream.rb in vines-0.1.1
- old
+ new
@@ -8,17 +8,16 @@
include Vines::Log
ERROR = 'error'.freeze
PAD = 20
+ attr_reader :domain
attr_accessor :user
def post_init
router << self
- @remote_addr, @local_addr = [get_peername, get_sockname].map do |addr|
- addr ? Socket.unpack_sockaddr_in(addr)[0, 2].reverse.join(':') : 'unknown'
- end
+ @remote_addr, @local_addr = addresses
@user, @closed, @stanza_size = nil, false, 0
@bucket = TokenBucket.new(100, 10)
@store = Store.new
@nodes = EM::Queue.new
@@ -47,24 +46,14 @@
else
error(StreamErrors::PolicyViolation.new('max stanza size reached'))
end
end
- # Send the stanza to all recipients, stamping it with from and
- # to addresses first.
- def broadcast(stanza, recipients)
- stanza['from'] = @user.jid.to_s
- recipients.each do |recipient|
- stanza['to'] = recipient.user.jid.to_s
- recipient.write(stanza)
- end
- end
-
# Returns the storage system for the domain. If no domain is given,
# the stream's storage mechanism is returned.
- def storage(domain=@domain)
- @config.vhosts[domain]
+ def storage(domain=nil)
+ @config.vhosts[domain || self.domain]
end
# Reload the user's information into their active connections. Call this
# after storage.save_user() to sync the new user state with their other
# connections.
@@ -112,10 +101,12 @@
log.info { "%s %21s -> %s" %
['Stream disconnected:'.ljust(PAD), @remote_addr, @local_addr] }
log.info { "Streams connected: #{router.size}" }
end
+ # Advance the stream's state machine to the new state. XML nodes received
+ # by the stream will be passed to this state's +node+ method.
def advance(state)
@state = state
end
# Stream level errors close the stream while stanza and SASL errors are
@@ -124,25 +115,40 @@
def error(e)
case e
when SaslError, StanzaError
write(e.to_xml)
when StreamError
- write(e.to_xml)
+ send_stream_error(e)
close_stream
else
log.error(e)
- write(StreamErrors::InternalServerError.new.to_xml)
+ send_stream_error(StreamErrors::InternalServerError.new)
close_stream
end
end
def router
Router.instance
end
private
+ # Return the remote and local socket addresses used by this connection.
+ def addresses
+ [get_peername, get_sockname].map do |addr|
+ addr ? Socket.unpack_sockaddr_in(addr)[0, 2].reverse.join(':') : 'unknown'
+ end
+ end
+
+ # Write the StreamError's xml to the stream. Subclasses can override
+ # this method with custom error writing behavior.
+ def send_stream_error(e)
+ write(e.to_xml)
+ end
+
+ # Write a closing stream tag to the stream then close the stream. Subclasses
+ # can override this method for custom close behavior.
def close_stream
write('</stream:stream>')
close_connection_after_writing
end
@@ -168,11 +174,11 @@
@stanza_size = 0
enforce_rate_limit
if error?(node)
close_stream
else
- @state.node(node)
+ state.node(node)
end
rescue Exception => e
error(e)
end
@@ -189,10 +195,16 @@
label = (direction == :out) ? 'Sent' : 'Received'
log.debug("%s %21s -> %s\n%s\n" %
["#{label} stanza:".ljust(PAD), from, to, node])
end
+ # Returns the current state of the stream's state machine. Provided as a
+ # method so subclasses can override the behavior.
+ def state
+ @state
+ end
+
def tls_files
- %w[crt key].map {|ext| File.join(VINES_ROOT, 'conf', 'certs', "#{@domain}.#{ext}") }
+ %w[crt key].map {|ext| File.join(VINES_ROOT, 'conf', 'certs', "#{domain}.#{ext}") }
end
end
end