Sha256: 1e336d22a5270a158bcf2ca796b2aec68dd350f131d22c521101625d53f30943

Contents?: true

Size: 1.17 KB

Versions: 2

Compression:

Stored size: 1.17 KB

Contents

require 'redis'
require 'concurrent'

module Pakyow
  module Realtime
    # Manages channel subscriptions for this application instance's WebSockets.
    #
    # Messages received on the subscribed Redis channels are parsed as JSON,
    # tagged as propagated, and pushed through a `Pakyow::Realtime::Context`.
    #
    # @api private
    class RedisSubscription
      include Concurrent::Async

      # Opens a Redis connection using `Config.realtime.redis` and registers a
      # finalizer that releases the connection when this object is collected.
      def initialize
        @redis = ::Redis.new(Config.realtime.redis)
        @channels = []

        # The finalizer must not reference `self` (that would keep the object
        # alive and it cannot read our ivars anyway), so hand it the state it
        # needs explicitly. `@channels` is mutated in place by #subscribe so
        # the finalizer always sees the current channel list.
        ObjectSpace.define_finalizer(self, self.class.finalize(@redis, @channels))
      end

      # Builds the finalizer for an instance.
      #
      # @param redis [Redis, nil] the connection to clean up; when nil the
      #   returned proc does nothing.
      # @param channels [Array] the (live) list of subscribed channel names.
      # @return [Proc] a proc suitable for `ObjectSpace.define_finalizer`; it
      #   accepts the object id Ruby passes to finalizers.
      def self.finalize(redis = nil, channels = [])
        proc do |_object_id|
          next if redis.nil?

          begin
            redis.unsubscribe(*channels) unless channels.empty?
          ensure
            # Always close the connection, even if unsubscribing raised.
            redis.quit
          end
        end
      end

      # Subscribes to the given channels and starts processing messages.
      # Does nothing when `channels` is empty.
      #
      # @param channels [Array] channel names to subscribe to.
      def subscribe(channels)
        return if channels.empty?

        # Update in place (rather than rebinding) so the finalizer registered
        # in #initialize, which holds this same array, stays in sync.
        @channels.replace([*channels])

        run
      end

      # Unsubscribes from every channel recorded by the last #subscribe call.
      # Does nothing when there are no recorded channels.
      def unsubscribe
        return if @channels.empty?
        @redis.unsubscribe(*@channels)
      end

      private

      # Subscribes on the Redis connection and handles each incoming message:
      # the payload is parsed as JSON, marked with `__propagated`, and pushed
      # to the channel it arrived on.
      def run
        @redis.subscribe(*@channels) do |on|
          on.message do |channel, msg|
            msg = JSON.parse(msg)

            # Tag the payload as propagated; presumably this lets the receiver
            # tell it apart from a locally originated push (confirm in
            # Context#push).
            if msg.is_a?(Hash)
              msg[:__propagated] = true
            elsif msg.is_a?(Array)
              msg << :__propagated
            end

            context = Pakyow::Realtime::Context.new(Pakyow.app)
            context.push(msg, channel)
          end
        end
      end
    end
  end
end

Version data entries

2 entries across 2 versions & 1 rubygems

Version Path
pakyow-realtime-0.10.1 pakyow-realtime/lib/pakyow-realtime/redis_subscription.rb
pakyow-realtime-0.10.0 pakyow-realtime/lib/pakyow-realtime/redis_subscription.rb