Sha256: 00361ab48607ad031a550858237302de2babc1823e29b72a25fd7e07c012685c
Contents?: true
Size: 1.31 KB
Versions: 1
Compression:
Stored size: 1.31 KB
Contents
module StompServer
  # Queue backend that keeps every frame in process memory.
  #
  # Frames are held in one Array per destination (appended at the tail by
  # #enqueue / #requeue, removed from the head by #dequeue) together with
  # per-destination enqueue/dequeue counters that #monitor reports.
  class MemoryQueue
    # Not read or written anywhere in this class; kept because it is part of
    # the public interface.
    attr_accessor :checkpoint_interval

    # Sets up the class-level logger, the message-id source and empty
    # queue/statistics tables.
    def initialize
      @@log = Logger.new(STDOUT)
      @@log.level = StompServer::LogHelper.get_loglevel()
      # Incremented once per #enqueue and handed to @stompid to build ids.
      @frame_index = 0
      @stompid = StompServer::StompId.new
      # dest => { :enqueued => Integer, :dequeued => Integer }
      @stats = Hash.new
      # dest => Array of frames. The default block deliberately does NOT
      # store the Array it returns, so read-only lookups (#dequeue,
      # #message_for?) on an unknown destination never create an entry that
      # would later show up in #monitor.
      @messages = Hash.new { Array.new }
      @@log.debug "MemoryQueue initialized"
    end

    # Logs the shutdown of the queue for +session_id+. No state is changed.
    def stop(session_id)
      @@log.debug("#{session_id} memory queue shutdown")
    end

    # Builds a statistics snapshot.
    #
    # Returns a Hash mapping each destination that has (or has had) stored
    # frames to a Hash with the keys 'size', 'enqueued' and 'dequeued'.
    def monitor
      report = Hash.new
      @messages.each do |dest, frames|
        # stats_for guards against destinations that were only ever filled
        # through #requeue and therefore have no counters yet.
        counters = stats_for(dest)
        report[dest] = {
          'size'     => frames.size,
          'enqueued' => counters[:enqueued],
          'dequeued' => counters[:dequeued]
        }
      end
      report
    end

    # Removes and returns the oldest frame stored for +dest+.
    #
    # +session_id+ is accepted for interface compatibility and is not used.
    # Returns the frame, or +false+ when nothing is queued for +dest+.
    def dequeue(dest, session_id)
      frame = @messages[dest].shift
      return false unless frame

      stats_for(dest)[:dequeued] += 1
      frame
    end

    # Stamps +frame+ with a fresh 'message-id' header, counts it in the
    # enqueue statistics for +dest+ and appends it to that destination's
    # queue.
    #
    # Returns the destination's queue Array (the result of #requeue).
    def enqueue(dest, frame)
      @frame_index += 1
      stats_for(dest)[:enqueued] += 1
      assign_id(frame, dest)
      requeue(dest, frame)
    end

    # Appends +frame+ to the queue for +dest+ without assigning a new id and
    # without touching the statistics counters.
    #
    # Returns the destination's queue Array.
    def requeue(dest, frame)
      # Append in place rather than rebuilding the Array with `+=`, which
      # copied the whole queue on every call. #fetch is used because the
      # Hash's default block returns an unstored Array.
      queue = @messages.fetch(dest) { @messages[dest] = [] }
      queue << frame
    end

    # Returns true when at least one frame is queued for +dest+.
    # +session_id+ is accepted for interface compatibility and is not used.
    def message_for?(dest, session_id)
      !@messages[dest].empty?
    end

    # Writes the id derived from the current frame counter into the frame's
    # 'message-id' header. +dest+ is not used.
    def assign_id(frame, dest)
      frame.headers['message-id'] = @stompid[@frame_index]
    end

    # Returns the counter Hash for +dest+, creating a zeroed one on first use
    # so callers never dereference nil.
    def stats_for(dest)
      @stats[dest] ||= { :enqueued => 0, :dequeued => 0 }
    end
    private :stats_for
  end
end
Version data entries
1 entries across 1 versions & 1 rubygems
Version | Path |
---|---|
stompserver_ng-1.0.6 | lib/stomp_server_ng/queue/memory_queue.rb |