bin/racoon-worker in racoon-0.5.0 vs bin/racoon-worker in racoon-0.6.0
- old
+ new
@@ -4,15 +4,19 @@
require 'getoptlong'
require 'rubygems'
require 'daemons'
require 'racoon'
require 'csv'
+require 'yaml'
+require 'zmqmachine'
+require 'beanstalk-client'
def usage
puts "Usage: racoon-worker [switches]"
puts " --beanstalk <127.0.0.1:11300> csv list of ip:port for beanstalk servers"
puts " --tube <racoon> the beanstalk tube to use"
+ puts " --firehose <127.0.0.1:11555> the address of the firehose"
puts " --pid </var/run/racoon-worker.pid> the path to store the pid"
puts " --log </var/log/racoon-worker.log> the path to store the log"
puts " --daemon to daemonize the server"
puts " --help this message"
end
@@ -27,31 +31,35 @@
end
opts = GetoptLong.new(
["--beanstalk", "-b", GetoptLong::REQUIRED_ARGUMENT],
["--tube", "-t", GetoptLong::REQUIRED_ARGUMENT],
+ ["--firehose", "-f", GetoptLong::REQUIRED_ARGUMENT],
["--pid", "-i", GetoptLong::REQUIRED_ARGUMENT],
["--log", "-l", GetoptLong::REQUIRED_ARGUMENT],
["--help", "-h", GetoptLong::NO_ARGUMENT],
["--daemon", "-d", GetoptLong::NO_ARGUMENT]
)
-beanstalks = ["127.0.0.1:11300"]
-tube = 'racoon'
+@beanstalks = ["127.0.0.1:11300"]
+firehose_address = "127.0.0.1:11555".split(":")
+@tube = 'racoon'
@pid_file = '/var/run/racoon-worker.pid'
@log_file = '/var/log/racoon-worker.log'
daemon = false
opts.each do |opt, arg|
case opt
when '--help'
usage
exit 1
when '--beanstalk'
- beanstalks = CSV.parse(arg)[0]
+ @beanstalks = CSV.parse(arg)[0]
when '--tube'
- tube = arg
+ @tube = arg
+ when '--firehose'
+ firehose_address = arg.split(":")
when '--pid'
@pid_file = arg
when '--log'
@log_file = arg
when '--daemon'
@@ -59,11 +67,54 @@
end
end
Racoon::Config.logger = Logger.new(@log_file)
+def beanstalk
+ return @beanstalk if @beanstalk
+ @beanstalk = Beanstalk::Pool.new(@beanstalks)
+ %w{use watch}.each { |s| @beanstalk.send(s, @tube) }
+ @beanstalk.ignore('default')
+ @beanstalk
+end
+
+# Expects json ala:
+# json = {
+# "project":{
+# "name":"foobar",
+# "certificate":"...",
+# "sandbox":false
+# },
+# "bytes":"..."
+# }
+def process(job, worker)
+ packet = job.ybody
+ project = packet[:project]
+
+ notification = Racoon::Notification.create_from_packet(packet)
+
+ data = { :project => project, :bytes => notification.to_bytes }
+ worker.send_message(YAML::dump(data))
+end
+
if daemon
daemonize
else
puts "Starting racoon worker."
end
-Racoon::Worker.new(beanstalks, tube).start!
+
+ZM::Reactor.new(:worker).run do |context|
+ worker = Racoon::Worker.new(context, ZM::Address.new(firehose_address[0], firehose_address[1], :tcp))
+ context.push_socket(worker)
+
+ context.periodical_timer(0.1) do
+ begin
+ if beanstalk.peek_ready
+ job = beanstalk.reserve(1)
+ process(job, worker)
+ job.delete
+ end
+ rescue Beanstalk::TimedOut
+ Racoon::Config.logger.info "[Beanstalk] Unable to secure job, timed out."
+ end
+ end
+end.join