lib/angelo/stash.rb in angelo-0.1.14 vs lib/angelo/stash.rb in angelo-0.1.15
- old
+ new
@@ -1,65 +1,78 @@
module Angelo
# utility class for stashing connected websockets in arbitrary contexts
#
- class Stash
+ module Stash
include Celluloid::Logger
- # hold the peeraddr info for use after the socket is closed (logging)
- #
- @@peeraddrs = {}
+ module ClassMethods
- # the underlying arrays of websockets, by context
- #
- @@stashes = {}
+ # the underlying arrays of websockets, by context
+ #
+ def stashes
+ @stashes ||= {}
+ end
+ # hold the peeraddr info for use after the socket is closed (logging)
+ #
+ def peeraddrs
+ @peeraddrs ||= {}
+ end
+
+ end
+
+ def stashes; self.class.stashes; end
+ def peeraddrs; self.class.peeraddrs; end
+
# create a new instance with a context, creating the array if needed
#
def initialize server, context = :default
raise ArgumentError.new "symbol required" unless Symbol === context
@context, @server = context, server
- @@stashes[@context] ||= []
+ stashes[@context] ||= []
end
# add a websocket to this context's stash, save peeraddr info, start
# server handle_websocket task to read from the socket and fire events
# as needed
#
- def << ws
- @@peeraddrs[ws] = ws.peeraddr
- @server.async.handle_websocket ws
- @@stashes[@context] << ws
+ def << s
+ peeraddrs[s] = s.peeraddr
+ yield if block_given?
+ stashes[@context] << s
end
# access the underlying array of this context
#
def stash
- @@stashes[@context]
+ stashes[@context]
end
# iterate on each connected websocket in this context, handling errors
# as needed
#
def each &block
- stash.each do |ws|
+ stash.each do |s|
begin
- yield ws
+ yield s
rescue Reel::SocketError, IOError, SystemCallError => e
debug e.message
- remove_socket ws
+ remove_socket s
end
end
+ nil
end
- # remove a websocket from the stash, warn user, drop peeraddr info
+ # remove a socket from the stash, warn user, drop peeraddr info
#
- def remove_socket ws
- if stash.include? ws
- warn "removing socket from context ':#{@context}' (#{@@peeraddrs[ws][2]})"
- stash.delete ws
- @@peeraddrs.delete ws
+ def remove_socket s
+ s.close unless s.closed?
+ if stash.include? s
+ warn "removing socket from context ':#{@context}' (#{peeraddrs[s][2]})"
+ stash.delete s
+ peeraddrs.delete s
end
end
# utility method to create a new instance with a different context
#
@@ -69,16 +82,16 @@
# iterate on *every* connected websocket in all contexts, mostly used for
# ping_websockets task
#
def all_each
- @@stashes.values.flatten.each do |ws|
+ stashes.values.flatten.each do |s|
begin
- yield ws
+ yield s
rescue Reel::SocketError, IOError, SystemCallError => e
debug e.message
- remove_socket ws
+ remove_socket s
end
end
end
# pass the given block to the underlying stashed array's reject! method
@@ -87,18 +100,48 @@
stash.reject! &block
end
# access the peeraddr info for a given websocket
#
- def peeraddr ws
- @@peeraddrs[ws]
+ def peeraddr s
+ peeraddrs[s]
end
# return the number of websockets in this context (some are potentially
# disconnected)
#
def length
stash.length
end
+ class Websocket
+ extend Stash::ClassMethods
+ include Stash
+
+ def << ws
+ super do
+ @server.async.handle_websocket ws
+ end
+ end
+
+ end
+
+ class SSE
+ extend Stash::ClassMethods
+ include Stash
+
+ def event data
+ raise ArgumentError.new 'use #message method for "messages"' if @context == :default
+ each {|s| s.write Angelo::Base.sse_event(@context, data)}
+ nil
+ end
+
+ def message data
+ each {|s| s.write Angelo::Base.sse_message(data)}
+ nil
+ end
+
+ end
+
end
+
end