Sha256: c628dfcf27e1c341baf2cd5b86050bcc91bb18a9384948d4ec4a66f33a050303
Contents?: true
Size: 1.58 KB
Versions: 1
Compression:
Stored size: 1.58 KB
Contents
require 'redis' require 'volt/server/message_bus/base_message_bus' module Volt module MessageBus class Redis < BaseMessageBus class Subscription def initialize(redis) @redis = redis end def remove @redis.unsubscribe end end def initialize(volt_app) @redis = new_connection end # Subscribe should return an object that you can call .remove on to stop # the subscription. def subscribe(channel_name, &block) sub_redis = new_connection Thread.new do # Since the Redis driver does not have a connection pool, we create a # new connection each time we subscribe. # Note: internally volt does only 1 subscription. sub_redis.subscribe(channel_name.to_sym) do |on| on.message do |channel_name, message| block.call(message) end end end Subscription.new(sub_redis) end # publish should push out to all subscribed within the volt cluster. def publish(channel_name, message) @redis.publish(channel_name.to_sym, message) end def new_connection msg_bus = Volt.config.message_bus if msg_bus && (opts = msg_bus.connect_options) ::Redis.new(opts) elsif ENV['REDIS_URL'] ::Redis.new(url: ENV["REDIS_URL"]) else ::Redis.new end end # waits for all messages to be flushed and closes connections def disconnect! raise "Not implemented" end end end end
Version data entries
1 entries across 1 versions & 1 rubygems
Version | Path |
---|---|
volt-redis_message_bus-0.1.1 | lib/volt/message_bus/redis_message_bus.rb |