lib/angelo/stash.rb in angelo-0.1.14 vs lib/angelo/stash.rb in angelo-0.1.15

- old
+ new

@@ -1,65 +1,78 @@ module Angelo # utility class for stashing connected websockets in arbitrary contexts # - class Stash + module Stash include Celluloid::Logger - # hold the peeraddr info for use after the socket is closed (logging) - # - @@peeraddrs = {} + module ClassMethods - # the underlying arrays of websockets, by context - # - @@stashes = {} + # the underlying arrays of websockets, by context + # + def stashes + @stashes ||= {} + end + # hold the peeraddr info for use after the socket is closed (logging) + # + def peeraddrs + @peeraddrs ||= {} + end + + end + + def stashes; self.class.stashes; end + def peeraddrs; self.class.peeraddrs; end + # create a new instance with a context, creating the array if needed # def initialize server, context = :default raise ArgumentError.new "symbol required" unless Symbol === context @context, @server = context, server - @@stashes[@context] ||= [] + stashes[@context] ||= [] end # add a websocket to this context's stash, save peeraddr info, start # server handle_websocket task to read from the socket and fire events # as needed # - def << ws - @@peeraddrs[ws] = ws.peeraddr - @server.async.handle_websocket ws - @@stashes[@context] << ws + def << s + peeraddrs[s] = s.peeraddr + yield if block_given? + stashes[@context] << s end # access the underlying array of this context # def stash - @@stashes[@context] + stashes[@context] end # iterate on each connected websocket in this context, handling errors # as needed # def each &block - stash.each do |ws| + stash.each do |s| begin - yield ws + yield s rescue Reel::SocketError, IOError, SystemCallError => e debug e.message - remove_socket ws + remove_socket s end end + nil end - # remove a websocket from the stash, warn user, drop peeraddr info + # remove a socket from the stash, warn user, drop peeraddr info # - def remove_socket ws - if stash.include? ws - warn "removing socket from context ':#{@context}' (#{@@peeraddrs[ws][2]})" - stash.delete ws - @@peeraddrs.delete ws + def remove_socket s + s.close unless s.closed? + if stash.include? s + warn "removing socket from context ':#{@context}' (#{peeraddrs[s][2]})" + stash.delete s + peeraddrs.delete s end end # utility method to create a new instance with a different context # @@ -69,16 +82,16 @@ # iterate on *every* connected websocket in all contexts, mostly used for # ping_websockets task # def all_each - @@stashes.values.flatten.each do |ws| + stashes.values.flatten.each do |s| begin - yield ws + yield s rescue Reel::SocketError, IOError, SystemCallError => e debug e.message - remove_socket ws + remove_socket s end end end # pass the given block to the underlying stashed array's reject! method @@ -87,18 +100,48 @@ stash.reject! &block end # access the peeraddr info for a given websocket # - def peeraddr ws - @@peeraddrs[ws] + def peeraddr s + peeraddrs[s] end # return the number of websockets in this context (some are potentially # disconnected) # def length stash.length end + class Websocket + extend Stash::ClassMethods + include Stash + + def << ws + super do + @server.async.handle_websocket ws + end + end + + end + + class SSE + extend Stash::ClassMethods + include Stash + + def event data + raise ArgumentError.new 'use #message method for "messages"' if @context == :default + each {|s| s.write Angelo::Base.sse_event(@context, data)} + nil + end + + def message data + each {|s| s.write Angelo::Base.sse_message(data)} + nil + end + + end + end + end