lib/moleculer/transporters/redis.rb in moleculer-0.1.1 vs lib/moleculer/transporters/redis.rb in moleculer-0.2.0
- old
+ new
@@ -1,14 +1,15 @@
require "redis"
+require_relative "base"
# frozen_string_literal: true
module Moleculer
module Transporters
##
# The Moleculer Redis transporter
- class Redis
+ class Redis < Base
##
# @private
# Represents the publisher connection
class Publisher
def initialize(config)
@@ -19,11 +20,11 @@
##
# Publishes the packet to the packet's topic
def publish(packet)
topic = packet.topic
- @logger.trace "publishing packet to '#{topic}'", packet.as_json
+ @logger.debug "publishing packet to '#{topic}'", packet.as_json
connection.publish(topic, @serializer.serialize(packet))
end
##
# Connects to redis
@@ -153,25 +154,38 @@
def initialize(config)
@config = config
@uri = config.transporter
@logger = config.logger.get_child("[REDIS.TRANSPORTER]")
@subscriptions = Concurrent::Array.new
+ @started = false
end
def subscribe(channel, &block)
@logger.debug "subscribing to channel '#{channel}'"
@subscriptions << Subscription.new(
channel: channel,
block: block,
config: @config,
- ).connect
+ )
+
+ @subscriptions.last.connect if started?
end
def disconnect
@logger.debug "disconnecting subscriptions"
@subscriptions.each(&:disconnect)
end
+
+ def connect
+ @logger.debug "connecting subscriptions"
+ @subscriptions.each(&:connect)
+ @started = true
+ end
+
+ def started?
+ @started
+ end
end
def initialize(config)
@config = config
end
@@ -182,14 +196,15 @@
def publish(packet)
publisher.publish(packet)
end
- def connect
+ def start
publisher.connect
+ subscriber.connect
end
- def disconnect
+ def stop
publisher.disconnect
subscriber.disconnect
end
private