Sha256: f69bfb4468bc8da892e75fe69495c8e053499cfc821ebcea867d0d15187380d5

Contents?: true

Size: 971 Bytes

Versions: 1

Compression:

Stored size: 971 Bytes

Contents

# encoding: utf-8

$:.unshift File.dirname(__FILE__) + '/../lib'
require 'mq'

MAX = 500

def log(*args)
  p args
end

# MQ.logging = true

EM.run {

  # worker

  log "prime checker", Process.pid, :started

  class Fixnum
    def prime?
      ('1' * self) !~ /^1?$|^(11+?)\1+$/
    end
  end

  MQ.queue('prime checker').subscribe { |info, num|
    EM.defer(proc {

      log "prime checker #{Process.pid}-#{Thread.current.object_id}", :prime?, num
      if Integer(num).prime?
        MQ.queue(info.reply_to).publish(num, :reply_to => "#{Process.pid}-#{Thread.current.object_id}")
        EM.stop_event_loop if num == '499'
      end

    })
  }

  # controller

  MQ.queue('prime collector').subscribe { |info, prime|
    log 'prime collector', :received, prime, :from, info.reply_to
    (@primes ||= []) << Integer(prime)
  }

  MAX.times do |i|
    EM.next_tick do
      MQ.queue('prime checker').publish((i+1).to_s, :reply_to => 'prime collector')
    end
  end

}

Version data entries

1 entries across 1 versions & 1 rubygems

Version Path
amqp-0.7.0 research/primes-threaded.rb