lib/angelo/stash.rb in angelo-0.4.1 vs lib/angelo/stash.rb in angelo-0.5.0
- old
+ new
@@ -1,11 +1,11 @@
module Angelo
# utility class for stashing connected websockets in arbitrary contexts
#
module Stash
- include Celluloid::Logger
+ include Celluloid::Internals::Logger
module ClassMethods
# the underlying arrays of websockets, by context
#
@@ -27,10 +27,11 @@
# create a new instance with a context, creating the array if needed
#
def initialize server, context = :default
raise ArgumentError.new "symbol required" unless Symbol === context
@context, @server = context, server
+ @mutex = Mutex.new
stashes[@context] ||= []
end
# add a websocket to this context's stash, save peeraddr info, start
# server handle_websocket task to read from the socket and fire events
@@ -49,11 +50,12 @@
# iterate on each connected websocket in this context, handling errors
# as needed
#
def each &block
- stash.dup.each do |s|
+ stash_dup = @mutex.synchronize { stash.dup }
+ stash_dup.each do |s|
begin
yield s
rescue Reel::SocketError, IOError, SystemCallError => e
debug "context: #{@context} - #{e.message}"
remove_socket s
@@ -67,10 +69,11 @@
def remove_socket s
s.close unless s.closed?
if stash.include? s
warn "removing socket from context ':#{@context}' (#{peeraddrs[s][2]})"
stash.delete s
+
peeraddrs.delete s
end
end
# utility method to create a new instance with a different context
@@ -81,15 +84,17 @@
# iterate on *every* connected websocket in all contexts, mostly used for
# ping_websockets task
#
def all_each
- stashes.values.flatten.each do |s|
+ all_stashed = @mutex.synchronize { stashes.values.flatten }
+ all_stashed.each do |s|
begin
yield s
rescue Reel::SocketError, IOError, SystemCallError => e
debug "all - #{e.message}"
remove_socket s
+ stashes.values.each {|_s| _s.delete s}
end
end
end
# pass the given block to the underlying stashed array's reject! method