Sha256: 1e336d22a5270a158bcf2ca796b2aec68dd350f131d22c521101625d53f30943
Contents?: true
Size: 1.17 KB
Versions: 2
Compression:
Stored size: 1.17 KB
Contents
require 'redis' require 'concurrent' module Pakyow module Realtime # Manages channel subscriptions for this application instance's WebSockets. # # @api private class RedisSubscription include Concurrent::Async def initialize @redis = ::Redis.new(Config.realtime.redis) @channels = [] ObjectSpace.define_finalizer(self, self.class.finalize) end def self.finalize -> { unsubscribe @redis.quit } end def subscribe(channels) return if channels.empty? @channels = channels run end def unsubscribe return if @channels.empty? @redis.unsubscribe(*@channels) end private def run @redis.subscribe(*@channels) do |on| on.message do |channel, msg| msg = JSON.parse(msg) if msg.is_a?(Hash) msg[:__propagated] = true elsif msg.is_a?(Array) msg << :__propagated end context = Pakyow::Realtime::Context.new(Pakyow.app) context.push(msg, channel) end end end end end end
Version data entries
2 entries across 2 versions & 1 rubygems
Version | Path |
---|---|
pakyow-realtime-0.10.1 | pakyow-realtime/lib/pakyow-realtime/redis_subscription.rb |
pakyow-realtime-0.10.0 | pakyow-realtime/lib/pakyow-realtime/redis_subscription.rb |