Sha256: 6922d344761292fe44d2f5986ff9e0d2c6ea98cfbd6bda2206d9cf737354d8df

Contents?: true

Size: 1.96 KB

Versions: 11

Compression:

Stored size: 1.96 KB

Contents

# frozen_string_literal: true

require_relative '../../spec_helper'
require 'message_bus'

describe PUB_SUB_CLASS do
  def self.error!
    @error = true
  end

  def self.error?
    defined?(@error)
  end

  def new_bus
    PUB_SUB_CLASS.new(MESSAGE_BUS_CONFIG.merge(db: 10))
  end

  def work_it
    bus = new_bus
    $stdout.reopen("/dev/null", "w")
    $stderr.reopen("/dev/null", "w")
    # subscribe blocks, so we need a new bus to transmit
    new_bus.subscribe("/echo", 0) do |msg|
      bus.publish("/response", "#{msg.data}-#{Process.pid.to_s}")
    end
  ensure
    exit!(0)
  end

  def spawn_child
    r = fork
    if r.nil?
      work_it
    else
      r
    end
  end

  n = ENV['MULTI_PROCESS_TIMES'].to_i
  n = 1 if n < 1
  n.times do
    it 'gets every response from child processes' do
      test_never :memory
      skip("previous error") if self.class.error?
      GC.start
      new_bus.reset!
      begin
        pids = (1..10).map { spawn_child }
        expected_responses = pids.map { |x| (0...10).map { |i| "0#{i}-#{x}" } }.flatten
        unexpected_responses = []
        bus = new_bus
        t = Thread.new do
          bus.subscribe("/response", 0) do |msg|
            if expected_responses.include?(msg.data)
              expected_responses.delete(msg.data)
            else
              unexpected_responses << msg.data
            end
          end
        end
        10.times { |i| bus.publish("/echo", "0#{i}") }
        wait_for 4000 do
          expected_responses.empty?
        end
        bus.global_unsubscribe
        t.join

        expected_responses.must_be :empty?
        unexpected_responses.must_be :empty?
      rescue Exception
        self.class.error!
        raise
      ensure
        if pids
          pids.each do |pid|
            begin
              Process.kill("KILL", pid)
            rescue SystemCallError
            end
            Process.wait(pid)
          end
        end
        bus.global_unsubscribe
      end
    end
  end
end

Version data entries

11 entries across 11 versions & 1 rubygems

Version Path
message_bus-3.3.6 spec/lib/message_bus/multi_process_spec.rb
message_bus-3.3.5 spec/lib/message_bus/multi_process_spec.rb
message_bus-3.3.4 spec/lib/message_bus/multi_process_spec.rb
message_bus-3.3.3 spec/lib/message_bus/multi_process_spec.rb
message_bus-3.3.2 spec/lib/message_bus/multi_process_spec.rb
message_bus-3.3.1 spec/lib/message_bus/multi_process_spec.rb
message_bus-3.3.0 spec/lib/message_bus/multi_process_spec.rb
message_bus-3.2.0 spec/lib/message_bus/multi_process_spec.rb
message_bus-3.1.0 spec/lib/message_bus/multi_process_spec.rb
message_bus-3.0.0 spec/lib/message_bus/multi_process_spec.rb
message_bus-2.2.4 spec/lib/message_bus/multi_process_spec.rb