lib/pubsubstub/channel.rb in pubsubstub-0.0.15 vs lib/pubsubstub/channel.rb in pubsubstub-0.1.0.beta1
- old
+ new
@@ -1,54 +1,40 @@
module Pubsubstub
class Channel
- attr_reader :name, :pubsub
-
- def initialize(name)
- @name = name
- @pubsub = RedisPubSub.new(name)
- @connections = []
+ class << self
+ def name_from_pubsub_key(key)
+ key.sub(/\.pubsub$/, '')
+ end
end
- def subscribe(connection, options = {})
- listen if @connections.empty?
- @connections << connection
- scrollback(connection, options[:last_event_id])
- end
+ attr_reader :name
- def subscribed?(connection)
- @connections.include?(connection)
+ def initialize(name)
+ @name = name.to_s
end
- def unsubscribe(connection)
- @connections.delete(connection)
- stop_listening if @connections.empty?
- end
-
def publish(event)
- pubsub.publish(event)
+ redis.pipelined do
+ redis.zadd(scrollback_key, event.id, event.to_json)
+ redis.zremrangebyrank(scrollback_key, 0, -Pubsubstub.channels_scrollback_size)
+ redis.expire(scrollback_key, Pubsubstub.channels_scrollback_ttl)
+ redis.publish(pubsub_key, event.to_json)
+ end
end
- def scrollback(connection, last_event_id)
- return unless last_event_id
- pubsub.scrollback(last_event_id) do |event|
- connection << event.to_message
- end
+ def scrollback(since: )
+ redis.zrangebyscore(scrollback_key, Integer(since) + 1, '+inf').map(&Event.method(:from_json))
end
- private
-
- def broadcast(json)
- string = Event.from_json(json).to_message
- @connections.each do |connection|
- connection << string
- end
+ def scrollback_key
+ "#{name}.scrollback"
end
- def listen
- pubsub.subscribe(method(:broadcast))
+ def pubsub_key
+ "#{name}.pubsub"
end
- def stop_listening
- pubsub.unsubscribe(method(:broadcast))
+ def redis
+ Pubsubstub.redis
end
end
end