Sha256: c628dfcf27e1c341baf2cd5b86050bcc91bb18a9384948d4ec4a66f33a050303

Contents?: true

Size: 1.58 KB

Versions: 1

Compression:

Stored size: 1.58 KB

Contents

require 'redis'
require 'volt/server/message_bus/base_message_bus'

module Volt
  module MessageBus
    class Redis < BaseMessageBus
      class Subscription
        def initialize(redis)
          @redis = redis
        end

        def remove
          @redis.unsubscribe
        end
      end

      def initialize(volt_app)
        @redis = new_connection
      end

      # Subscribe should return an object that you can call .remove on to stop
      # the subscription.
      def subscribe(channel_name, &block)
        sub_redis = new_connection

        Thread.new do
          # Since the Redis driver does not have a connection pool, we create a
          # new connection each time we subscribe.
          # Note: internally volt does only 1 subscription.
          sub_redis.subscribe(channel_name.to_sym) do |on|
            on.message do |channel_name, message|
              block.call(message)
            end
          end
        end

        Subscription.new(sub_redis)
      end

      # publish should push out to all subscribed within the volt cluster.
      def publish(channel_name, message)
        @redis.publish(channel_name.to_sym, message)
      end

      def new_connection
        msg_bus = Volt.config.message_bus
        if msg_bus && (opts = msg_bus.connect_options)
          ::Redis.new(opts)
        elsif ENV['REDIS_URL']
          ::Redis.new(url: ENV["REDIS_URL"])
        else
          ::Redis.new
        end
      end

      # waits for all messages to be flushed and closes connections
      def disconnect!
        raise "Not implemented"
      end
    end
  end
end

Version data entries

1 entries across 1 versions & 1 rubygems

Version Path
volt-redis_message_bus-0.1.1 lib/volt/message_bus/redis_message_bus.rb