Sha256: ef7ca547821b4d85c6c113dc2aee96c72955c92a14e50ca6011904595db50518
Contents?: true
Size: 1.27 KB
Versions: 5
Compression:
Stored size: 1.27 KB
Contents
# encoding: utf-8 $:.unshift File.dirname(__FILE__) + '/../lib' require 'mq' MAX = 5000 def EM.fork &blk raise if reactor_running? unless @forks at_exit { @forks.each { |pid| Process.kill('KILL', pid) } } end (@forks ||= []) << Kernel.fork do EM.run(&blk) end end def log(*args) p args end # AMQP::Channel.logging = true # worker workers = ARGV[0] ? (Integer(ARGV[0]) rescue 2) : 2 workers.times do EM.fork { log "prime checker", Process.pid, :started class Fixnum def prime? ('1' * self) !~ /^1?$|^(11+?)\1+$/ end end AMQP::Channel.queue('prime checker').subscribe { |info, num| log "prime checker #{Process.pid}", :prime?, num if Integer(num).prime? AMQP::Channel.queue(info.reply_to).publish(num, :reply_to => Process.pid) end } } end # controller EM.run { AMQP::Channel.queue('prime collector').subscribe { |info, prime| log 'prime collector', :received, prime, :from, info.reply_to (@primes ||= []) << Integer(prime) EM.stop_event_loop if prime == '499' } MAX.times do |i| EM.next_tick do AMQP::Channel.queue('prime checker').publish((i+1).to_s, :reply_to => 'prime collector') end end }
Version data entries
5 entries across 5 versions & 1 rubygems
Version | Path |
---|---|
amqp-0.7.5 | research/primes-forked.rb |
amqp-0.7.4 | research/primes-forked.rb |
amqp-0.7.3 | research/primes-forked.rb |
amqp-0.7.2 | research/primes-forked.rb |
amqp-0.7.1 | research/primes-forked.rb |