Sha256: a08905108129d62d24ff19836a84876e65aa5d789baf7969ad643feff37e8d88

Contents?: true

Size: 1.69 KB

Versions: 2

Compression:

Stored size: 1.69 KB

Contents

require 'rflow/daemon_process'
require 'rflow/shard'
require 'rflow/broker'

class RFlow
  # The master/watchdog process for RFlow. Mostly exists to receive +SIGCHLD+ from subprocesses
  # so it can kill them all with +SIGQUIT+ and get restarted.
  class Master < DaemonProcess
    # The {Shard}s being managed by the {Master}.
    # @return [Array<Shard>]
    attr_reader :shards

    # The {Broker}s being managed by the {Master}.
    # @return [Array<Broker>]
    attr_reader :brokers

    # Builds the master process along with one {Shard} per configured shard and
    # one {Broker} per broker declared on the configured connections.
    # @param config [Object] the application configuration; it must respond to
    #   +[]+ (for the +rflow.application_name+ and +rflow.pid_file_path+ settings),
    #   +shards+ and +connections+
    def initialize(config)
      super(config['rflow.application_name'], 'Master', pid_file_path: config['rflow.pid_file_path'])

      # Block parameters are deliberately named differently from +config+ so the
      # method argument is never shadowed.
      @shards = config.shards.map { |shard_config| Shard.new(shard_config) }

      # Set the logger's context width to the length of the longest worker name.
      # This is nil when there are no workers at all.
      worker_names = @shards.flat_map(&:workers).map(&:name)
      RFlow.logger.context_width = worker_names.map(&:length).max

      @brokers = config.connections.flat_map(&:brokers).map { |broker_config| Broker.build(broker_config) }
    end

    # Override of {spawn_subprocesses} that actually spawns them:
    # calls +spawn!+ on each {Broker}, then calls {Shard#run!} on each {Shard}.
    # Broker start-up is only logged when there is at least one broker.
    # @return [void]
    def spawn_subprocesses
      unless brokers.empty?
        RFlow.logger.debug "Running #{brokers.count} brokers"
        brokers.each(&:spawn!)
        # Logged after spawning, once each broker's pid can be read.
        RFlow.logger.debug "#{brokers.count} brokers started: #{brokers.map { |broker| "#{broker.name} (#{broker.pid})" }.join(', ')}"
      end

      shards.each(&:run!)
    end

    # Override of {subprocesses} that includes the {Broker}s and
    # every {Shard::Worker} of every {Shard}.
    # @return [Array<ChildProcess>] the brokers first, followed by the workers
    def subprocesses
      brokers + shards.flat_map(&:workers)
    end

    # Override that starts EventMachine and waits until it gets stopped.
    # The reactor block is intentionally empty for now.
    # @return [void]
    def run_process
      EM.run do
        # TODO: Monitor the workers
      end
    end
  end
end

Version data entries

2 entries across 2 versions & 1 rubygems

Version Path
rflow-1.3.2 lib/rflow/master.rb
rflow-1.3.1 lib/rflow/master.rb