Sha256: d0a52ff4f6ad617d6b3ea3180d49ddd4a03449c605205ed925e4b94d0ac5c6f0

Contents?: true

Size: 1.69 KB

Versions: 1

Compression:

Stored size: 1.69 KB

Contents

module Pubsubstub
  # Redis-backed channel. Live delivery goes through Redis pub/sub, and a
  # trimmed, expiring sorted set per channel keeps a scrollback of recent
  # events so that late subscribers can catch up.
  #
  # Redis keys used for a channel named "name":
  #   "name.pubsub"     - pub/sub channel carrying event JSON
  #   "name.scrollback" - sorted set of event JSON, scored by event id
  class RedisPubSub
    # Maximum number of events kept in a channel's scrollback sorted set.
    EVENT_SCORE_THRESHOLD = 1000
    # Scrollback time-to-live in seconds (24 hours), reset on every publish.
    EXPIRE_THRESHOLD = 24 * 60 * 60

    # @param channel_name [String] prefix for this channel's Redis keys
    def initialize(channel_name)
      @channel_name = channel_name
    end

    # Registers +callback+ on the shared pub/sub connection for this channel.
    #
    # @param callback [Proc] handler passed through to the pub/sub client
    def subscribe(callback)
      self.class.sub.subscribe(key('pubsub'), callback)
    end

    # Removes a handler previously registered with #subscribe.
    #
    # @param callback [Proc] the same proc object that was subscribed
    def unsubscribe(callback)
      self.class.sub.unsubscribe_proc(key('pubsub'), callback)
    end

    # Publishes +event+ on this channel and records it in the scrollback.
    #
    # @param event [#id, #to_json] event to broadcast
    def publish(event)
      self.class.publish(@channel_name, event)
    end

    # Yields every scrollback event whose id is strictly greater than
    # +since_event_id+ (coerced with #to_i).
    #
    # While the EventMachine reactor is running the query goes through the
    # non-blocking client and events are yielded from its reply callback;
    # otherwise the blocking client is queried and events are yielded before
    # this method returns.
    #
    # @param since_event_id [#to_i] exclusive lower bound on event ids
    # @yieldparam event [Pubsubstub::Event]
    def scrollback(since_event_id)
      scrollback_key = key('scrollback')
      lower_bound = "(#{since_event_id.to_i}" # "(" makes the bound exclusive

      deliver = lambda do |events|
        events.each { |json| yield Pubsubstub::Event.from_json(json) }
      end

      if EventMachine.reactor_running?
        self.class.nonblocking_redis.zrangebyscore(scrollback_key, lower_bound, '+inf') do |events|
          deliver.call(events)
        end
      else
        # The blocking client returns the reply instead of yielding it, so a
        # block passed here would be silently ignored; consume the return value.
        deliver.call(self.class.blocking_redis.zrangebyscore(scrollback_key, lower_bound, '+inf'))
      end
    end

    private

    # Builds the Redis key for +purpose+ ("pubsub" or "scrollback").
    def key(purpose)
      [@channel_name, purpose].join(".")
    end

    class << self
      # Broadcasts +event+ on +channel_name+ and appends it to the channel's
      # scrollback, trimming the scrollback to EVENT_SCORE_THRESHOLD entries
      # and refreshing its expiry, all in a single pipeline.
      #
      # @param channel_name [String]
      # @param event [#id, #to_json]
      def publish(channel_name, event)
        scrollback = "#{channel_name}.scrollback"
        payload = event.to_json # serialize once, reuse for publish and zadd

        # Issue the commands on the yielded pipeline object rather than on the
        # outer client, which newer redis-rb releases require for pipelining.
        blocking_redis.pipelined do |pipeline|
          pipeline.publish("#{channel_name}.pubsub", payload)
          pipeline.zadd(scrollback, event.id, payload)
          # Rank -1 is the newest entry, so removing 0..-(N + 1) keeps exactly
          # the N highest-scored events.
          pipeline.zremrangebyrank(scrollback, 0, -(EVENT_SCORE_THRESHOLD + 1))
          pipeline.expire(scrollback, EXPIRE_THRESHOLD)
        end
      end

      # Memoized pub/sub handle obtained from the non-blocking client.
      def sub
        @sub ||= nonblocking_redis.pubsub
      end

      # Memoized synchronous Redis client.
      def blocking_redis
        @blocking_redis ||= Redis.new(url: redis_url)
      end

      # Memoized EventMachine-based Redis client.
      def nonblocking_redis
        @nonblocking_redis ||= EM::Hiredis.connect(redis_url)
      end

      # Redis URL taken from REDIS_URL, defaulting to a local server.
      def redis_url
        ENV.fetch('REDIS_URL', "redis://localhost:6379")
      end
    end
  end
end

Version data entries

1 entry across 1 version & 1 rubygem

Version Path
pubsubstub-0.0.11 lib/pubsubstub/redis_pub_sub.rb