lib/pubsubstub/channel.rb in pubsubstub-0.0.15 vs lib/pubsubstub/channel.rb in pubsubstub-0.1.0.beta1

- old
+ new

@@ -1,54 +1,40 @@ module Pubsubstub class Channel - attr_reader :name, :pubsub - - def initialize(name) - @name = name - @pubsub = RedisPubSub.new(name) - @connections = [] + class << self + def name_from_pubsub_key(key) + key.sub(/\.pubsub$/, '') + end end - def subscribe(connection, options = {}) - listen if @connections.empty? - @connections << connection - scrollback(connection, options[:last_event_id]) - end + attr_reader :name - def subscribed?(connection) - @connections.include?(connection) + def initialize(name) + @name = name.to_s end - def unsubscribe(connection) - @connections.delete(connection) - stop_listening if @connections.empty? - end - def publish(event) - pubsub.publish(event) + redis.pipelined do + redis.zadd(scrollback_key, event.id, event.to_json) + redis.zremrangebyrank(scrollback_key, 0, -Pubsubstub.channels_scrollback_size) + redis.expire(scrollback_key, Pubsubstub.channels_scrollback_ttl) + redis.publish(pubsub_key, event.to_json) + end end - def scrollback(connection, last_event_id) - return unless last_event_id - pubsub.scrollback(last_event_id) do |event| - connection << event.to_message - end + def scrollback(since: ) + redis.zrangebyscore(scrollback_key, Integer(since) + 1, '+inf').map(&Event.method(:from_json)) end - private - - def broadcast(json) - string = Event.from_json(json).to_message - @connections.each do |connection| - connection << string - end + def scrollback_key + "#{name}.scrollback" end - def listen - pubsub.subscribe(method(:broadcast)) + def pubsub_key + "#{name}.pubsub" end - def stop_listening - pubsub.unsubscribe(method(:broadcast)) + def redis + Pubsubstub.redis end end end