unless defined? $__rq_jobqueue__
  module RQ
#{{{
    LIBDIR = File::dirname(File::expand_path(__FILE__)) + File::SEPARATOR unless defined? LIBDIR

    require LIBDIR + 'util'
    require LIBDIR + 'logging'
    require LIBDIR + 'qdb'

    #
    # A JobQueue is a directory (@path) holding a database file named 'db'
    # which is accessed through a QDB object.  Jobs live in the 'jobs' table
    # and free-form key/value pairs live in the 'attributes' table.
    #
    # NOTE(review): FileUtils, Logger and the helpers getopt/klass/debug are
    # not defined in this file - presumably they come from util/logging.
    #
    class JobQueue
#{{{
      include Logging
      include Util

      class Error < StandardError; end

      MAX_JID = 2 ** 20

      class << self
#{{{
        #
        # Create a brand new queue at +path+ and return it.
        #
        # WARNING: anything already at +path+ is removed (rm_rf) first.
        # The freshly created QDB is stored under opts['qdb'] - i.e. the
        # caller's opts hash is modified - so that #initialize re-uses it.
        #
        def create path, opts = {}
#{{{
          FileUtils::rm_rf path
          FileUtils::mkdir_p path
          db = File::join path, 'db'
          opts['qdb'] = QDB.create db, opts
          new path, opts
#}}}
        end
#}}}
      end

      attr :path
      attr :opts
      attr :qdb
      alias :db :qdb

      #
      # Open the existing queue directory +path+.
      #
      # opts:
      #   'logger' - logger to use (default: Logger::new(STDERR))
      #   'qdb'    - an already opened QDB (default: a QDB on <path>/db)
      #
      # Raises RuntimeError if +path+ does not exist or is not a directory.
      #
      def initialize path, opts = {}
#{{{
        @path = path
        @opts = opts
        raise "q <#{ @path }> does not exist" unless test ?e, @path
        raise "q <#{ @path }> is not a directory" unless test ?d, @path
        @basename = File::basename(@path)
        @dirname = File::dirname(@path)
        @logger = getopt('logger', opts) || Logger::new(STDERR)
        @qdb = getopt('qdb', opts) || QDB::new(File::join(@path, 'db'), 'logger' => @logger)
#}}}
      end

      #
      # Insert one pending job per hash in +jobs+.  Each job must carry a
      # 'command'; 'priority' (default 0) and 'tag' are optional.  When a
      # block is given, every job whose submitted stamp equals this call's
      # timestamp is yielded after the transaction.  Returns self.
      #
      # Raises RuntimeError (before anything is executed) if a job has no
      # command.
      #
      def submit(*jobs)
#{{{
        now = Util::timestamp Time.now

        sql =
          jobs.map do |job|
            raise "no command for job <#{ job.inspect }>" unless job['command']
            tuple = QDB::tuple
            tuple['command'] = job['command']
            tuple['priority'] = job['priority'] || 0
            tuple['tag'] = job['tag']
            tuple['state'] = 'pending'
            tuple['submitted'] = now
            tuple['submitter'] = Util::hostname
            "insert into jobs values (#{ QDB::q(tuple).join ',' });\n"
          end.join

        tuples = nil
        transaction do
          # the empty block is kept from the original - presumably it stops
          # execute from collecting a result set; confirm against QDB
          execute(sql){}
          if block_given?
            tuples = execute "select * from jobs where submitted = '#{ now }'"
          end
        end
        tuples.each{|t| yield t} if tuples
        self
#}}}
      end

      #
      # Print (to stdout) every job matching +whats+.  Each selector is a
      # state name - any prefix starting with p/r/f/d, case insensitive - or
      # an all-digit job id.  No selectors, or 'all', means every state.
      # Returns self.
      #
      def list(*whats)
#{{{
        selectors = normalize_whats whats

        where_clauses =
          selectors.map do |what|
            column = (what =~ %r/^\s*\d+\s*$/o ? 'jid' : 'state')
            "#{ column }=#{ QDB::q what }\n"
          end
        where_clause = where_clauses.join(" or \n")

        sql = <<-SQL
          select * from jobs
          where #{ where_clause }
        SQL

        tuples = ro_transaction{ execute sql }
        dump_tuples tuples
        self
#}}}
      end

      #
      # Print (to stdout) the number of jobs in each state named by +whats+
      # (same state abbreviations as #list).  Returns self.
      #
      def status(*whats)
#{{{
        states = normalize_whats whats

        # statements are built before entering the transaction block
        queries =
          states.map do |state|
            [state, "select count(*) from jobs where state = #{ sql_quote state };"]
          end

        puts '---'
        ro_transaction do
          queries.each do |state, sql|
            tuple = execute(sql).first
            count = (tuple ? tuple.first : 0)
            puts "#{ state } : #{ count }"
          end
        end
        self
#}}}
      end

      #
      # Print (to stdout) every job matching the raw sql +where_clause+, or
      # every job when it is nil.  The clause is passed to the database
      # verbatim - never feed it untrusted input.  Returns self.
      #
      def query where_clause = nil
#{{{
        sql =
          if where_clause
            #
            # turn =~ into like clauses
            #
            #where_clause.gsub!(/(=~\s*([^\s')(=]+))/om){q = $2.gsub(%r/'+|\s+/o,''); "like '%#{ q }%'"}
            #
            # quote everything on the rhs of an '=' sign - helps with shell problems...
            #
            #where_clause.gsub!(/(==?\s*([^\s')(=]+))/om){q = $2.gsub(%r/'+|\s+/o,''); "='#{ q }'"}
            "select * from jobs where #{ where_clause };"
          else
            "select * from jobs;"
          end

        tuples = ro_transaction{ execute sql }
        dump_tuples tuples
        self
#}}}
      end

      #
      # Delete jobs, then vacuum the database.  Returns self.
      #
      # If the first argument is a String it selects a whole state (prefix
      # p/r/f/d) or everything (prefix a) and any further arguments are
      # ignored.  Otherwise every argument must be convertible by Integer()
      # and is deleted by jid.  With no arguments ALL jobs are deleted.
      #
      # Raises ArgumentError for an unrecognized String or a bad jid.
      #
      def delete(*jids)
#{{{
        what = jids.first || 'all'

        sql =
          if String === what
            case what
              when %r/^\s*p/io
                "delete from jobs where state = 'pending';"
              when %r/^\s*r/io
                "delete from jobs where state = 'running';"
              when %r/^\s*f/io
                "delete from jobs where state = 'finished';"
              when %r/^\s*d/io
                "delete from jobs where state = 'dead';"
              when %r/^\s*a/io
                "delete from jobs;"
              else
                raise ArgumentError, "cannot delete <#{ what }>"
            end
          else
            jids.map{|jid| "delete from jobs where jid = #{ Integer jid };"}.join
          end

        transaction{ execute sql }
        @qdb.vacuum
        self
#}}}
      end

      #
      # Update fields of pending jobs and return the selected tuples.
      #
      # kvs  - hash of column => new value; only 'priority', 'command' and
      #        'tag' may be updated
      # jids - job ids (Integers or all-digit Strings), or the literal
      #        'pending' to address every pending job
      #
      # Only jobs still in state 'pending' are modified.  The return value
      # is the result of selecting the addressed jobs afterwards (for
      # explicit jids that includes jobs which were not pending and hence
      # not modified), or [] if that select returned nil/false.
      #
      # Raises RuntimeError for a disallowed key or an empty jid list and
      # ArgumentError for a jid that is not a decimal integer.
      #
      def update(kvs,*jids)
#{{{
        allowed = %w(priority command tag)
        kvs.keys.each do |key|
          raise "update of <#{ key }> not allowed" unless allowed.include? key
        end
        raise "no jids" if jids.empty?

        # pull the marker out before sorting - Integers and Strings do not
        # compare with each other
        pending = jids.delete 'pending'

        # keys were checked against the whitelist above; values are quoted
        update_clause = kvs.map{|k,v| "#{ k }=#{ sql_quote v }"}.join(",\n")

        if pending
          update_sql = "update jobs\nset\n#{ update_clause }\nwhere\nstate='pending'"
          select_sql = "select * from jobs where state='pending'"
        else
          ids =
            jids.map do |jid|
              # Integer() is avoided on purpose: it would read '042' as octal
              unless jid.to_s =~ %r/\A\s*\d+\s*\z/
                raise ArgumentError, "bad jid <#{ jid.inspect }>"
              end
              jid.to_s.to_i
            end
          # 'in (...)' keeps the jid test a single term: the former
          # "state='pending' and jid=a or jid=b" bound as
          # "(state='pending' and jid=a) or jid=b" and so also updated
          # non-pending jobs
          jid_clause = "jid in (#{ ids.sort.uniq.join ',' })"
          update_sql = "update jobs\nset\n#{ update_clause }\nwhere\nstate='pending' and\n#{ jid_clause }"
          select_sql = "select * from jobs where #{ jid_clause }"
        end

        tuples = nil
        transaction do
          execute update_sql
          tuples = execute select_sql
        end
        tuples or []
#}}}
      end

      # Delegates to @qdb.transaction.
      def transaction(*args, &block)
#{{{
        @qdb.transaction(*args, &block)
#}}}
      end

      # Delegates to @qdb.ro_transaction.
      def ro_transaction &block
#{{{
        @qdb.ro_transaction(&block)
#}}}
      end

      # Delegates to @qdb.execute.
      def execute(*args, &block)
#{{{
        @qdb.execute(*args, &block)
#}}}
      end

      # Delegates to @qdb.integrity_check.
      def integrity_check(*args, &block)
#{{{
        @qdb.integrity_check(*args, &block)
#}}}
      end

      #
      # Copy this queue into the directory +qtmp+ (default
      # "<basename>.snapshot", relative to the cwd) and return a queue
      # object opened on the copy.
      #
      # The files db, db.schema and lock are copied without locking and the
      # copy is accepted only if its integrity_check is true.  This is
      # attempted up to +retries+ times (default 16); after that one final
      # attempt is made inside @qdb.write_lock.
      #
      # Raises RuntimeError if even the locked attempt fails.
      #
      def snapshot qtmp = "#{ @basename }.snapshot", retries = nil
#{{{
        qtmp ||= "#{ @basename }.snapshot"
        debug{ "snapshot <#{ @path }> -> <#{ qtmp }>" }
        retries = Integer(retries || 16)
        debug{ "retries <#{ retries }>" }

        # the snapshot must open its OWN database: drop any 'qdb' handle
        # (stored in the opts by JobQueue.create) so that #initialize builds
        # a QDB on <qtmp>/db instead of re-using the live one
        ss_opts = @opts.reject{|k, _| k.to_s == 'qdb'}

        take_snapshot = lambda do
          FileUtils::rm_rf qtmp
          FileUtils::mkdir_p qtmp
          %w(db db.schema lock).each do |base|
            src, dest = File::join(@path, base), File::join(qtmp, base)
            debug{ "cp <#{ src }> -> <#{ dest }>" }
            FileUtils::cp(src, dest)
          end
          ss = klass::new qtmp, ss_opts
          ss.integrity_check ? ss : nil
        end

        qss = nil
        loopno = 0

        while loopno < retries
          if((ss = take_snapshot.call))
            debug{ "snapshot <#{ qtmp }> created" }
            qss = ss
            break
          end
          debug{ "failure <#{ loopno + 1}> of <#{ retries }> attempts to create snapshot <#{ qtmp }> - retrying..." }
          loopno += 1
        end

        unless qss
          debug{ "locking <#{ @path }> as last resort" }
          @qdb.write_lock do
            if((ss = take_snapshot.call))
              debug{ "snapshot <#{ qtmp }> created" }
              qss = ss
            else
              raise "failed <#{ loopno }> times to create snapshot <#{ qtmp }>"
            end
          end
        end

        qss
#}}}
      end

      # Delegates to @qdb.lock.
      def lock(*args, &block)
#{{{
        @qdb.lock(*args, &block)
#}}}
      end

      #
      # Return the next job to run - the pending job with the highest
      # priority, then the oldest submitted stamp, then the lowest jid - or
      # nil when the result set is empty.  No transaction is opened here.
      #
      def getjob
#{{{
        sql = <<-SQL
          select * from jobs
          where state = 'pending'
          order by priority desc, submitted asc, jid asc
          limit 1;
        SQL
        execute(sql).first
#}}}
      end

      #
      # Store the pid, state, started and runner values of +job+ in the row
      # whose jid is job['jid'].
      #
      def jobisrunning job
#{{{
        sql = <<-SQL
          update jobs
          set
            pid = #{ sql_quote job['pid'] },
            state = #{ sql_quote job['state'] },
            started = #{ sql_quote job['started'] },
            runner = #{ sql_quote job['runner'] }
          where jid = #{ job['jid'] };
        SQL
        execute sql
#}}}
      end

      #
      # Store the state, exit_status, finished and elapsed values of +job+
      # in the row whose jid is job['jid'].
      #
      def jobisdone job
#{{{
        sql = <<-SQL
          update jobs
          set
            state = #{ sql_quote job['state'] },
            exit_status = #{ sql_quote job['exit_status'] },
            finished = #{ sql_quote job['finished'] },
            elapsed = #{ sql_quote job['elapsed'] }
          where jid = #{ job['jid'] };
        SQL
        execute sql
#}}}
      end

      # Modification time of the queue directory.
      def mtime
#{{{
        File::stat(@path).mtime
#}}}
      end

      #
      # Set attribute +key+ to +value+ in the attributes table: insert when
      # the key is absent, update when it is present exactly once.
      #
      # Raises RuntimeError if the key is present more than once.
      #
      # NOTE(review): the count and the write are separate statements with
      # no transaction opened here - confirm whether callers need one.
      #
      def []= key, value
#{{{
        k, v = sql_quote(key), sql_quote(value)

        tuple = @qdb.execute("select count(*) from attributes where key=#{ k };").first
        count = Integer tuple['count(*)']

        case count
          when 0
            @qdb.execute "insert into attributes values(#{ k },#{ v });"
          when 1
            @qdb.execute "update attributes set key=#{ k }, value=#{ v } where key=#{ k };"
          else
            raise "key <#{ key }> has become corrupt!"
        end
#}}}
      end

      # Return every row of the attributes table as a { key => value } hash.
      def attributes
#{{{
        h = {}
        tuples = @qdb.execute "select * from attributes;"
        tuples.each{|t| h[t['key']] = t['value']}
        h
#}}}
      end

      private

      #
      # Render +value+ as a single quoted sql string literal, doubling any
      # embedded single quote.  nil becomes ''.
      #
      def sql_quote value
#{{{
        "'" + value.to_s.gsub("'", "''") + "'"
#}}}
      end

      #
      # Expand the selectors handed to #list / #status: an empty list or one
      # containing 'all' becomes the four states; otherwise each selector is
      # stringified and any value starting (after optional whitespace, case
      # insensitive) with p/r/f/d is replaced by the full state name.
      # Everything else (e.g. a jid) passes through as a String.
      #
      def normalize_whats whats
#{{{
        if whats.empty? or whats.include?('all')
          return %w(pending running finished dead)
        end

        whats.map do |what|
          what = what.to_s
          case what
            when %r/^\s*p/io
              'pending'
            when %r/^\s*r/io
              'running'
            when %r/^\s*f/io
              'finished'
            when %r/^\s*d/io
              'dead'
            else
              what
          end
        end
#}}}
      end

      #
      # Print +tuples+ to stdout: a '---' header, then for every tuple a '-'
      # line followed by one "field: value" line per @qdb.fields entry.
      #
      def dump_tuples tuples
#{{{
        puts '---'
        fields = @qdb.fields
        tuples.each do |tuple|
          puts '-'
          fields.each{|f| puts " #{ f }: #{ tuple[f] }"}
        end
#}}}
      end
#}}}
    end # class JobQueue
#}}}
  end # module RQ
  $__rq_jobqueue__ = __FILE__
end