bin/racoon-worker in racoon-0.5.0 vs bin/racoon-worker in racoon-0.6.0

- old
+ new

@@ -4,15 +4,19 @@ require 'getoptlong' require 'rubygems' require 'daemons' require 'racoon' require 'csv' +require 'yaml' +require 'zmqmachine' +require 'beanstalk-client' def usage puts "Usage: racoon-worker [switches]" puts " --beanstalk <127.0.0.1:11300> csv list of ip:port for beanstalk servers" puts " --tube <racoon> the beanstalk tube to use" + puts " --firehose <127.0.0.1:11555> the address of the firehose" puts " --pid </var/run/racoon-worker.pid> the path to store the pid" puts " --log </var/log/racoon-worker.log> the path to store the log" puts " --daemon to daemonize the server" puts " --help this message" end @@ -27,31 +31,35 @@ end opts = GetoptLong.new( ["--beanstalk", "-b", GetoptLong::REQUIRED_ARGUMENT], ["--tube", "-t", GetoptLong::REQUIRED_ARGUMENT], + ["--firehose", "-f", GetoptLong::REQUIRED_ARGUMENT], ["--pid", "-i", GetoptLong::REQUIRED_ARGUMENT], ["--log", "-l", GetoptLong::REQUIRED_ARGUMENT], ["--help", "-h", GetoptLong::NO_ARGUMENT], ["--daemon", "-d", GetoptLong::NO_ARGUMENT] ) -beanstalks = ["127.0.0.1:11300"] -tube = 'racoon' +@beanstalks = ["127.0.0.1:11300"] +firehose_address = "127.0.0.1:11555".split(":") +@tube = 'racoon' @pid_file = '/var/run/racoon-worker.pid' @log_file = '/var/log/racoon-worker.log' daemon = false opts.each do |opt, arg| case opt when '--help' usage exit 1 when '--beanstalk' - beanstalks = CSV.parse(arg)[0] + @beanstalks = CSV.parse(arg)[0] when '--tube' - tube = arg + @tube = arg + when '--firehose' + firehose_address = arg.split(":") when '--pid' @pid_file = arg when '--log' @log_file = arg when '--daemon' @@ -59,11 +67,54 @@ end end Racoon::Config.logger = Logger.new(@log_file) +def beanstalk + return @beanstalk if @beanstalk + @beanstalk = Beanstalk::Pool.new(@beanstalks) + %w{use watch}.each { |s| @beanstalk.send(s, @tube) } + @beanstalk.ignore('default') + @beanstalk +end + +# Expects json ala: +# json = { +# "project":{ +# "name":"foobar", +# "certificate":"...", +# "sandbox":false +# }, +# "bytes":"..." +# } +def process(job, worker) + packet = job.ybody + project = packet[:project] + + notification = Racoon::Notification.create_from_packet(packet) + + data = { :project => project, :bytes => notification.to_bytes } + worker.send_message(YAML::dump(data)) +end + if daemon daemonize else puts "Starting racoon worker." end -Racoon::Worker.new(beanstalks, tube).start! + +ZM::Reactor.new(:worker).run do |context| + worker = Racoon::Worker.new(context, ZM::Address.new(firehose_address[0], firehose_address[1], :tcp)) + context.push_socket(worker) + + context.periodical_timer(0.1) do + begin + if beanstalk.peek_ready + job = beanstalk.reserve(1) + process(job, worker) + job.delete + end + rescue Beanstalk::TimedOut + Racoon::Config.logger.info "[Beanstalk] Unable to secure job, timed out." + end + end +end.join