#!/usr/bin/env ruby

# racoon-worker: polls a beanstalk tube for jobs, turns each job into a
# Racoon::Notification, and hands the resulting bytes to a Racoon::Worker
# connected to the firehose address.

$LOAD_PATH.unshift(File.join(File.dirname(__FILE__), '..', 'lib'))

require 'getoptlong'
require 'rubygems'
require 'daemons'
require 'racoon'
require 'csv'
require 'yaml'
require 'logger' # Logger is used below; do not rely on another library loading it.
require 'zmqmachine'
require 'beanstalk-client'

# Prints the command-line help text to stdout.
def usage
  puts "Usage: racoon-worker [switches]"
  puts " --beanstalk <127.0.0.1:11300> csv list of ip:port for beanstalk servers"
  puts " --tube the beanstalk tube to use"
  puts " --firehose <127.0.0.1:11555> the address of the firehose"
  puts " --pid the path to store the pid"
  puts " --log the path to store the log"
  puts " --daemon to daemonize the server"
  puts " --help this message"
end

# Daemonizes the process (passing @log_file and the app name to
# Daemonize.daemonize) and then records the current pid in @pid_file with
# mode 0644.
def daemonize
  Daemonize.daemonize(@log_file, 'racoon-worker')

  # File.open rather than Kernel#open: the path comes from the command line,
  # and Kernel#open treats a leading "|" as a command to run.
  File.open(@pid_file, 'w') { |f| f.write(Process.pid) }
  File.chmod(0644, @pid_file)
end

opts = GetoptLong.new(
  ["--beanstalk", "-b", GetoptLong::REQUIRED_ARGUMENT],
  ["--tube", "-t", GetoptLong::REQUIRED_ARGUMENT],
  ["--firehose", "-f", GetoptLong::REQUIRED_ARGUMENT],
  ["--pid", "-i", GetoptLong::REQUIRED_ARGUMENT],
  ["--log", "-l", GetoptLong::REQUIRED_ARGUMENT],
  ["--help", "-h", GetoptLong::NO_ARGUMENT],
  ["--daemon", "-d", GetoptLong::NO_ARGUMENT]
)

# Defaults, overridden by the switches parsed below.
@beanstalks = ["127.0.0.1:11300"]
firehose_address = "127.0.0.1:11555".split(":") # [host, port]
@tube = 'racoon'
@pid_file = '/var/run/racoon-worker.pid'
@log_file = '/var/log/racoon-worker.log'
daemon = false

opts.each do |opt, arg|
  case opt
  when '--help'
    usage
    exit 1
  when '--beanstalk'
    # Only the first CSV row is used as the server list.
    @beanstalks = CSV.parse(arg).first
  when '--tube'
    @tube = arg
  when '--firehose'
    firehose_address = arg.split(":")
  when '--pid'
    @pid_file = arg
  when '--log'
    @log_file = arg
  when '--daemon'
    daemon = true
  end
end

Racoon::Config.logger = Logger.new(@log_file)

# Returns the memoized Beanstalk::Pool for @beanstalks, set up to use and
# watch @tube and to ignore the 'default' tube.
#
# The pool is cached only after it has been fully configured, so an error
# during setup does not leave a half-configured pool behind for later calls.
def beanstalk
  return @beanstalk if @beanstalk

  pool = Beanstalk::Pool.new(@beanstalks)
  pool.use(@tube)
  pool.watch(@tube)
  pool.ignore('default')
  @beanstalk = pool
end

# Converts one beanstalk job into a notification and sends it to the worker
# as a YAML document containing :project and :bytes.
#
# Expects a job body shaped like:
# {
#   "project":{
#     "name":"foobar",
#     "certificate":"...",
#     "sandbox":false
#   },
#   "bytes":"..."
# }
#
# NOTE(review): the body is decoded with job.ybody and read with the symbol
# key :project, so the producer presumably enqueues YAML with symbol keys
# rather than plain JSON -- confirm against the producer.
#
# job    - a reserved beanstalk job (must respond to #ybody).
# worker - the object the serialized payload is sent through (#send_message).
def process(job, worker)
  packet = job.ybody
  project = packet[:project]
  notification = Racoon::Notification.create_from_packet(packet)

  payload = { :project => project, :bytes => notification.to_bytes }
  worker.send_message(YAML.dump(payload))
end

if daemon
  daemonize
else
  puts "Starting racoon worker."
end

ZM::Reactor.new(:worker).run do |context|
  firehose = ZM::Address.new(firehose_address[0], firehose_address[1], :tcp)
  worker = Racoon::Worker.new(context, firehose)
  context.push_socket(worker)

  # Poll beanstalk on a periodical timer; a job is deleted only after it has
  # been processed without raising.
  context.periodical_timer(0.1) do
    begin
      if beanstalk.peek_ready
        job = beanstalk.reserve(1)
        process(job, worker)
        job.delete
      end
    rescue Beanstalk::TimedOut
      Racoon::Config.logger.info "[Beanstalk] Unable to secure job, timed out."
    end
  end
end.join