Sha256: 4d6413ff0f2aa2c1cd17f6bd3d06f3a4f9d47dfed6d92ea5153b6638ff63b895

Contents?: true

Size: 1.19 KB

Versions: 26

Compression:

Stored size: 1.19 KB

Contents

$:.unshift File.dirname(__FILE__) + '/../lib'
require 'mq'

MAX = 5000

def EM.fork &blk
  raise if reactor_running?

  unless @forks
    at_exit{
      @forks.each{ |pid| Process.kill('KILL', pid) }
    }
  end

  (@forks ||= []) << Kernel.fork do
    EM.run(&blk)
  end
end

def log *args
  p args
end

# MQ.logging = true

# worker

  workers = ARGV[0] ? (Integer(ARGV[0]) rescue 2) : 2

  workers.times do
    EM.fork{
      log "prime checker", Process.pid, :started

      class Fixnum
        def prime?
          ('1' * self) !~ /^1?$|^(11+?)\1+$/
        end
      end

      MQ.queue('prime checker').subscribe{ |info, num|
        log "prime checker #{Process.pid}", :prime?, num
        if Integer(num).prime?
          MQ.queue(info.reply_to).publish(num, :reply_to => Process.pid)
        end
      }
    }
  end

# controller

  EM.run{
    MQ.queue('prime collector').subscribe{ |info, prime|
      log 'prime collector', :received, prime, :from, info.reply_to
      (@primes ||= []) << Integer(prime)
      EM.stop_event_loop if prime == '499'
    }

    MAX.times do |i|
      EM.next_tick do
        MQ.queue('prime checker').publish((i+1).to_s, :reply_to => 'prime collector')
      end
    end
  }

Version data entries

26 entries across 26 versions & 8 rubygems

Version Path
brontes3d-amqp-0.6.7.1 research/primes-forked.rb
amqp-0.6.7 research/primes-forked.rb
amqp-0.6.6 research/primes-forked.rb
amqp-0.6.5 research/primes-forked.rb
amqp-0.6.4 research/primes-forked.rb
amqp-0.5.1 examples/primes-forked.rb