lib/angelo/stash.rb in angelo-0.4.1 vs lib/angelo/stash.rb in angelo-0.5.0

- old
+ new

@@ -1,11 +1,11 @@ module Angelo # utility class for stashing connected websockets in arbitrary contexts # module Stash - include Celluloid::Logger + include Celluloid::Internals::Logger module ClassMethods # the underlying arrays of websockets, by context # @@ -27,10 +27,11 @@ # create a new instance with a context, creating the array if needed # def initialize server, context = :default raise ArgumentError.new "symbol required" unless Symbol === context @context, @server = context, server + @mutex = Mutex.new stashes[@context] ||= [] end # add a websocket to this context's stash, save peeraddr info, start # server handle_websocket task to read from the socket and fire events @@ -49,11 +50,12 @@ # iterate on each connected websocket in this context, handling errors # as needed # def each &block - stash.dup.each do |s| + stash_dup = @mutex.synchronize { stash.dup } + stash_dup.each do |s| begin yield s rescue Reel::SocketError, IOError, SystemCallError => e debug "context: #{@context} - #{e.message}" remove_socket s @@ -67,10 +69,11 @@ def remove_socket s s.close unless s.closed? if stash.include? s warn "removing socket from context ':#{@context}' (#{peeraddrs[s][2]})" stash.delete s + peeraddrs.delete s end end # utility method to create a new instance with a different context @@ -81,15 +84,17 @@ # iterate on *every* connected websocket in all contexts, mostly used for # ping_websockets task # def all_each - stashes.values.flatten.each do |s| + all_stashed = @mutex.synchronize { stashes.values.flatten } + all_stashed.each do |s| begin yield s rescue Reel::SocketError, IOError, SystemCallError => e debug "all - #{e.message}" remove_socket s + stashes.values.each {|_s| _s.delete s} end end end # pass the given block to the underlying stashed array's reject! method