Sha256: db3ad076bbc01f50475bb77a57eebeb3e6e0d071010db85ae03cfd4a8fc1109a

Contents?: true

Size: 1.06 KB

Versions: 1

Compression:

Stored size: 1.06 KB

Contents

# frozen_string_literal: true

# Released under the MIT License.
# Copyright, 2018, by Huba Nagy.
# Copyright, 2018-2024, by Samuel Williams.

require_relative "generic"

module Async
	module Redis
		module Context
			class Subscribe < Generic
				MESSAGE = "message"
				
				def initialize(pool, channels)
					super(pool)
					
					subscribe(channels)
				end
				
				def close
					# There is no way to reset subscription state. On Redis v6+ you can use RESET, but this is not supported in <= v6.
					@connection&.close
					
					super
				end
				
				def listen
					while response = @connection.read_response
						return response if response.first == MESSAGE
					end
				end
				
				def each
					return to_enum unless block_given?
					
					while response = self.listen
						yield response
					end
				end
				
				def subscribe(channels)
					@connection.write_request ["SUBSCRIBE", *channels]
					@connection.flush
				end
				
				def unsubscribe(channels)
					@connection.write_request ["UNSUBSCRIBE", *channels]
					@connection.flush
				end
			end
		end
	end
end

Version data entries

1 entries across 1 versions & 1 rubygems

Version Path
async-redis-0.11.0 lib/async/redis/context/subscribe.rb