lib/moleculer/transporters/redis.rb in moleculer-0.1.1 vs lib/moleculer/transporters/redis.rb in moleculer-0.2.0

- old
+ new

@@ -1,14 +1,15 @@ require "redis" +require_relative "base" # frozen_string_literal: true module Moleculer module Transporters ## # The Moleculer Redis transporter - class Redis + class Redis < Base ## # @private # Represents the publisher connection class Publisher def initialize(config) @@ -19,11 +20,11 @@ ## # Publishes the packet to the packet's topic def publish(packet) topic = packet.topic - @logger.trace "publishing packet to '#{topic}'", packet.as_json + @logger.debug "publishing packet to '#{topic}'", packet.as_json connection.publish(topic, @serializer.serialize(packet)) end ## # Connects to redis @@ -153,25 +154,38 @@ def initialize(config) @config = config @uri = config.transporter @logger = config.logger.get_child("[REDIS.TRANSPORTER]") @subscriptions = Concurrent::Array.new + @started = false end def subscribe(channel, &block) @logger.debug "subscribing to channel '#{channel}'" @subscriptions << Subscription.new( channel: channel, block: block, config: @config, - ).connect + ) + + @subscriptions.last.connect if started? end def disconnect @logger.debug "disconnecting subscriptions" @subscriptions.each(&:disconnect) end + + def connect + @logger.debug "connecting subscriptions" + @subscriptions.each(&:connect) + @started = true + end + + def started? + @started + end end def initialize(config) @config = config end @@ -182,14 +196,15 @@ def publish(packet) publisher.publish(packet) end - def connect + def start publisher.connect + subscriber.connect end - def disconnect + def stop publisher.disconnect subscriber.disconnect end private