unless defined? $__rq_jobqueue__
  module RQ 
#--{{{
    LIBDIR = File::dirname(File::expand_path(__FILE__)) + File::SEPARATOR unless
      defined? LIBDIR

    require 'tempfile'

    require LIBDIR + 'util'
    require LIBDIR + 'logging'
    require LIBDIR + 'qdb'
    require LIBDIR + 'orderedhash'
    require LIBDIR + 'orderedautohash'

    #
    # the JobQueue class is responsible for high level access to the job queue
    #
    class  JobQueue
#--{{{
      include Logging
      include Util
      #
      # queue specific exception class.  NOTE(review): nothing visible in this
      # file raises it - the methods below raise plain RuntimeError or
      # ArgumentError instead
      #
      class Error < StandardError; end
    
      #
      # NOTE(review): presumably an upper bound for job ids (2 ** 20 ==
      # 1048576) - it is not referenced anywhere in this file, confirm usage
      #
      MAX_JID = 2 ** 20 
    
      class << self
#--{{{
        #
        # builds a brand new queue rooted at +path+ and returns it
        #
        # WARNING : anything already living at +path+ is removed first!
        #
        # the freshly created QDB (stored under <path>/db) is placed into
        # opts['qdb'] - so the caller's hash is modified - and handed on to
        # the new JobQueue, after which the bin, stdin, stdout, stderr and
        # data directories are created
        #
        def create(path, opts = {})
#--{{{
          FileUtils.rm_rf(path)
          FileUtils.mkdir_p(path)

          opts['qdb'] = QDB.create(File.join(path, 'db'), opts)
          queue = new(path, opts)

          [ queue.bin, queue.stdin, queue.stdout, queue.stderr, queue.data ].each do |dir|
            FileUtils.mkdir_p(dir)
          end

          queue
#--}}}
        end
#--}}}
      end

      #
      # readers for the values assembled in initialize - every directory is
      # simply <path>/<name>
      #
      attr :path    # queue directory exactly as given to initialize (never expanded)
      attr :bin     # <path>/bin
      attr :stdin   # <path>/stdin
      attr :stdout  # <path>/stdout
      attr :stderr  # <path>/stderr
      attr :data    # <path>/data
      attr :opts    # the option hash given to initialize
      attr :qdb     # the 'qdb' option when given, otherwise a QDB opened on <path>/db
      alias :db :qdb

      #
      # opens the existing queue directory +path+
      #
      # recognized opts (looked up via getopt) :
      #   'logger' - logger to use, a Logger on STDERR when absent
      #   'qdb'    - an already opened QDB, otherwise <path>/db is opened
      #
      # raises RuntimeError when +path+ is missing or is not a directory
      #
      def initialize(path, opts = {})
#--{{{
        @path = path # do NOT expand this or it'll be fubar from misc nfs mounts!!
        @opts = opts

        @bin, @stdin, @stdout, @stderr, @data =
          %w( bin stdin stdout stderr data ).map{|sub| File.join(@path, sub)}

        raise "q <#{ @path }> does not exist" unless File.exist?(@path)
        raise "q <#{ @path }> is not a directory" unless File.directory?(@path)

        @basename = File.basename(@path)
        @dirname = File.dirname(@path)

        @logger = getopt('logger', opts) || Logger.new(STDERR)
        @qdb = getopt('qdb', opts) || QDB.new(File.join(@path, 'db'), 'logger' => @logger)

        # neither kind of transaction is open on a fresh handle
        @in_transaction = false
        @in_ro_transaction = false
#--}}}
      end
      #
      # queue relative name of the stdin file belonging to job +jid+
      #
      def stdin4(jid)
#--{{{
        'stdin/' + jid.to_s
#--}}}
      end
      #
      # absolute (expanded) path of the stdin file belonging to job +jid+
      #
      def standard_in_4(jid)
#--{{{
        relative = stdin4(jid)
        File.expand_path(File.join(path, relative))
#--}}}
      end
      #
      # queue relative name of the stdout file belonging to job +jid+
      #
      def stdout4(jid)
#--{{{
        'stdout/' + jid.to_s
#--}}}
      end
      #
      # absolute (expanded) path of the stdout file belonging to job +jid+
      #
      def standard_out_4(jid)
#--{{{
        relative = stdout4(jid)
        File.expand_path(File.join(path, relative))
#--}}}
      end
      #
      # queue relative name of the stderr file belonging to job +jid+
      #
      def stderr4(jid)
#--{{{
        'stderr/' + jid.to_s
#--}}}
      end
      #
      # absolute (expanded) path of the stderr file belonging to job +jid+
      #
      def standard_err_4(jid)
#--{{{
        relative = stderr4(jid)
        File.expand_path(File.join(path, relative))
#--}}}
      end
      #
      # queue relative name of the data directory belonging to job +jid+
      #
      def data4(jid)
#--{{{
        'data/' + jid.to_s
#--}}}
      end
      #
      # absolute (expanded) path of the data directory belonging to job +jid+
      #
      def data_4(jid)
#--{{{
        relative = data4(jid)
        File.expand_path(File.join(path, relative))
#--}}}
      end
      #
      # adds jobs to the queue - all of them inside one transaction
      #
      # each job is a hash which must carry a 'command' and may carry
      # 'priority' (default 0), 'tag', 'runner', 'restartable', 'stdin' (see
      # tmp_stdin for accepted values) and 'data' (a path copied recursively
      # into the job's data directory).  a single String argument is shorthand
      # for a job having only that command.
      #
      # jids are handed out starting at max(jid) + 1.  when a block is given
      # it is passed to execute together with a select of each inserted job.
      #
      # raises RuntimeError for a job without a command.  returns self.
      #
      def submit(*jobs, &block)
#--{{{
        #
        # take the string itself : Array#to_s yields the inspected array
        # ('["ls"]') on ruby >= 1.9 which would store a mangled command
        #
        if jobs.size == 1 and jobs.first.is_a?(String)
          jobs = [ { "command" => jobs.first } ]
        end

        now = Util::timestamp Time::now
    
        transaction do
          sql = "select max(jid) from jobs"
          tuple = execute(sql).first
          jid = tuple.first || 0 # empty table yields a null max
          jid = Integer(jid) + 1

          jobs.each do |job|
            command = job['command']
            stdin = job['stdin']
            data = job['data']

            raise "no command for job <#{ job.inspect }>" unless command 

            tmp_stdin(stdin) do |ts|
              tuple = QDB::tuple

              tuple['command']     = command 
              tuple['priority']    = job['priority'] || 0
              tuple['tag']         = job['tag']
              tuple['runner']      = job['runner']
              tuple['restartable'] = job['restartable']
              tuple['state']       = 'pending'
              tuple['submitted']   = now
              tuple['submitter']   = Util::hostname
              tuple['stdin']       = stdin4 jid
              tuple['stdout']      = nil 
              tuple['stderr']      = nil 
              tuple['data']        = data4 jid

              values = QDB::q tuple

              sql = "insert into jobs values (#{ values.join ',' });\n"
              execute(sql){}

              #
              # clear out anything left behind under this jid before laying
              # down the new stdin file and data directory
              #
              FileUtils::rm_rf standard_in_4(jid)
              FileUtils::rm_rf standard_out_4(jid)
              FileUtils::rm_rf standard_err_4(jid)
              FileUtils::rm_rf data_4(jid)
              FileUtils::cp ts.path, standard_in_4(jid) if ts
              if data
                FileUtils::cp_r data, data_4(jid)
              else
                FileUtils::mkdir_p data_4(jid)
              end

              if block
                sql = "select * from jobs where jid = '#{ jid }'"
                execute(sql, &block)
              end
            end

            jid += 1
          end
        end
    
        self
#--}}}
      end
      #
      # puts existing jobs back into the 'pending' state - all inside one
      # transaction
      #
      # each job is a hash which must carry 'jid' and 'command' and may carry
      # the same optional keys as for submit.  the row is rewritten in place
      # (every field after the first in the tuple) and the old stdin, stdout
      # and stderr files are removed.  the data directory is NOT removed :
      # when 'data' is given it is moved (not copied) to the job's data path.
      #
      # when a block is given it is passed to execute together with a select
      # of each resubmitted job.
      #
      # raises RuntimeError when jid or command is missing and whatever
      # Integer() raises for a malformed jid.  returns self.
      #
      def resubmit(*jobs, &block)
#--{{{
        now = Util::timestamp Time::now

        transaction do
          jobs.each do |job|
            #
            # check for presence BEFORE coercing - Integer(nil) raises a
            # TypeError which used to hide the friendlier message below
            #
            raw_jid = job['jid']
            raise "no jid for job <#{ job.inspect }>" unless raw_jid 
            jid = Integer raw_jid
            command = job['command']
            stdin = job['stdin']
            data = job['data']

            raise "no command for job <#{ job.inspect }>" unless command 

            tmp_stdin(stdin) do |ts|
              tuple = QDB::tuple

              tuple['jid']         = jid
              tuple['command']     = command
              tuple['priority']    = job['priority'] || 0
              tuple['tag']         = job['tag']
              tuple['runner']      = job['runner']
              tuple['restartable'] = job['restartable']
              tuple['state']       = 'pending'
              tuple['submitted']   = now
              tuple['submitter']   = Util::hostname
              tuple['stdin']       = stdin4 jid
              tuple['stdout']      = nil
              tuple['stderr']      = nil
              tuple['data']        = data4 jid

              # skip the first field so the jid itself is never rewritten
              kvs = tuple.fields[1..-1].map{|f| "#{ f }=#{ QDB::q(tuple[ f ]) }"}
              sql = "update jobs set #{ kvs.join ',' } where jid=#{ jid };\n"

              execute(sql){}

              FileUtils::rm_rf standard_in_4(jid)
              FileUtils::rm_rf standard_out_4(jid)
              FileUtils::rm_rf standard_err_4(jid)
              #FileUtils::rm_rf data_4(jid)
              FileUtils::cp ts.path, standard_in_4(jid) if ts
              if data
                FileUtils::mv data, data_4(jid)
              else
                FileUtils::mkdir_p data_4(jid)
              end

              if block
                sql = "select * from jobs where jid = '#{ jid }'"
                execute(sql, &block)
              end
            end # tmp_stdin
          end # jobs.each
        end # transaction
    
        self
#--}}}
      end
      #
      # spools +stdin+ into a closed Tempfile
      #
      # +stdin+ may be
      #   nil or ''         - an empty tempfile results
      #   '-'               - STDIN is read
      #   anything with a read method - it is read, and left open
      #   'stdin/N' or '@stdin/N' - a file relative to the queue directory
      #   any other value   - taken (via to_s) as a path, which is expanded
      #
      # with a block the tempfile is yielded, removed afterwards and the
      # block's value is returned.  without a block the (closed) tempfile is
      # returned and the caller owns its cleanup.
      #
      # the caller's string is never modified.
      #
      def tmp_stdin stdin = nil
#--{{{
        stdin = nil if stdin.to_s.empty?
        stdin = STDIN if stdin == '-'

        was_opened = false

        begin
          unless stdin.respond_to?('read') or stdin.nil?
            source = stdin.to_s
            # relative to queue
            if source =~ %r|^@?stdin/\d+$|
              # non destructive gsub : source may be the caller's own string
              source = File::join(path, source.gsub(%r|^@|, ''))
            end
            # File::open, unlike Kernel#open, can never be talked into a pipe
            stdin = File::open(File::expand_path(source))
            was_opened = true
          end

          tmp = Tempfile::new "#{ Process::pid }_#{ rand }"
          while((buf = stdin.read(8192))); tmp.write buf; end if stdin
          tmp.close

          if block_given?
            begin
              yield tmp
            ensure
              tmp.close!
            end
          else
            return tmp
          end
        ensure
          # only close what was opened here - never the caller's io
          stdin.close if was_opened rescue nil
        end
#--}}}
      end
      #
      # selects jobs by state and/or jid inside a read-only transaction
      #
      # each of +whats+ is either a jid (a Numeric or an all-digit string) or
      # a state, which may be abbreviated by its first letter (p, h, r, f, d).
      # no arguments, or an 'all' among them, selects pending, running,
      # finished and dead jobs - note that holding is not part of that set.
      #
      # with a block every tuple is handed to it (via execute) and nil is
      # returned, otherwise the result of execute is returned
      #
      def list(*whats, &block)
#--{{{
        if whats.empty? or whats.include?('all')
          whats = %w( pending running finished dead )
        end

        # expand abbreviated state names, anything else passes through as is
        expanded = whats.map do |what|
          case what
            when %r/^\s*p/io then 'pending'
            when %r/^\s*h/io then 'holding'
            when %r/^\s*r/io then 'running'
            when %r/^\s*f/io then 'finished'
            when %r/^\s*d/io then 'dead'
            else what
          end
        end

        conditions = expanded.map do |what|
          if Numeric === what
            "jid=#{ what }\n"
          else
            text = "#{ what }"
            column = (text =~ %r/^\s*\d+\s*$/o ? 'jid' : 'state')
            "#{ column }=#{ QDB::q text }\n"
          end
        end

        sql = <<-sql
          select * from jobs
          where #{ conditions.join(" or \n") } 
        sql

        return ro_transaction{ execute(sql) } unless block

        ro_transaction{ execute(sql, &block) }
        nil
#--}}}
      end
      #
      # gathers queue statistics inside one read-only transaction and returns
      # them as a nested OrderedAutoHash with the sections
      #
      #   'jobs'        - number of jobs per state plus 'total'
      #   'temporal'    - per state the jid and time of the min/max job :
      #                   age since submission (pending, holding), age since
      #                   start (running) or elapsed time (finished, dead)
      #   'performance' - 'avg_time_per_job' of finished jobs and the number
      #                   of jobs finished in the last 1, 12 and 24 hours
      #   'exit_status' - 'successes', 'failures' and one count for every
      #                   entry of the exit code map
      #
      # options[:exit_code_map] (or 'exit_code_map') maps a label onto a list
      # of exit codes.  NOTE(review): the codes are spliced into the sql as
      # given, they are expected to be integers.
      #
      def status options = {}
#--{{{
        stats = OrderedAutoHash::new

        now = Time::now
        #
        # formats t as 'HHhMMmSS.SSs' - t is either a number of seconds or,
        # failing that, a timestamp whose distance from now is used
        #
        hms = lambda do |t|
          elapsed =
            begin
              Float t
            rescue
              now - Util::stamptime(t, 'local' => true)
            end
          sh, sm, ss = Util::hms elapsed.to_f
          "#{ '%2.2d' % sh }h#{ '%2.2d' % sm }m#{ '%05.2f' % ss }s" 
        end
        #
        # runs a 'select count(*)' style statement, no row counts as zero
        #
        tally = lambda do |sql|
          tuple = execute(sql).first
          tuple ? Integer(tuple.first || 0) : 0
        end

        exit_code_map = 
          options[:exit_code_map] || options['exit_code_map'] || {}

        ro_transaction do
        #
        # jobs stats
        #
          total = 0
          %w( pending holding running finished dead ).each do |state|
            count = tally[ <<-sql ]
              select count(*) from jobs 
                where 
                  state='#{ state }'
            sql
            stats['jobs'][state] = count
            total += count
          end
          stats['jobs']['total'] = total
        #
        # temporal stats 
        #
          metrics = OrderedAutoHash::new
          metrics['pending']  = 'submitted'
          metrics['holding']  = 'submitted'
          metrics['running']  = 'started'
          metrics['finished'] = 'elapsed'
          metrics['dead']     = 'elapsed'

          metrics.each do |state, metric|
            #
            # timestamps are reported as ages, so the EARLIEST timestamp is
            # the maximum and the latest the minimum - hence the swapped
            # aliases for everything but elapsed
            #
            sql = 
              unless metric == 'elapsed'
                <<-sql
                  select min(#{ metric }) as max, max(#{ metric }) as min 
                    from jobs where state='#{ state }'
                sql
              else
                <<-sql
                  select min(#{ metric }) as min, max(#{ metric }) as max 
                    from jobs where state='#{ state }'
                sql
              end
            tuple = execute(sql).first
            next unless tuple

            %w( min max ).each do |time|
              t = tuple[time]
              next unless t

              sql = <<-sql
                  select jid from jobs where #{ metric }='#{ t }' and state='#{ state }'
              sql
              which = execute(sql).first
              #
              # only report a job that was really found - nil.to_i is 0,
              # which would otherwise show up as a bogus job 0
              #
              oh = nil
              if which and which['jid']
                oh = OrderedAutoHash::new
                oh[which['jid'].to_i] = hms[t]
                oh.yaml_inline = true
              end
              stats['temporal'][state][time] = oh 
            end
            #stats['temporal'][state] ||= nil
          end
          stats['temporal'] ||= nil
        #
        # generate performance stats
        #
          sql = <<-sql
            select avg(elapsed) from jobs 
              where 
                state='finished'
          sql
          tuple = execute(sql).first
          avg = (tuple ? Float(tuple.first || 0) : 0)
          stats['performance']['avg_time_per_job'] = hms[avg] 

          [ 1, 12, 24 ].each do |n|
            ago = Util::timestamp(now - (n * 3600))
            count = tally[ <<-sql ]
              select count(*) from jobs 
                where 
                  state = 'finished' and 
                  finished  > '#{ ago }'
            sql
            #stats['performance']["n_jobs_in_last_#{ n }_hrs"] = count
            stats['performance']["n_jobs_in_last_hrs"][n] = count
          end

        #
        # generate exit_status stats
        #
          #stats['exit_status'] = {}
          stats['exit_status']['successes'] = tally[ <<-sql ]
            select count(*) from jobs 
              where 
                state='finished' and 
                exit_status=0 
          sql

          # a dead job counts as a failure whatever its exit status
          stats['exit_status']['failures'] = tally[ <<-sql ]
            select count(*) from jobs 
              where 
                (state='finished' and 
                exit_status!=0) or
                state='dead'
          sql

          exit_code_map.each do |which, codes|
            exit_status_clause = codes.map{|code| "exit_status=#{ code }"}.join(' or ')
            stats['exit_status'][which] = tally[ <<-sql ]
              select count(*) from jobs 
                where 
                  (state='finished' and (#{ exit_status_clause }))
            sql
          end
        end

        stats
#--}}}
      end
      #
      # selects jobs using a caller supplied sql where clause, or every job
      # when none is given, inside a read-only transaction
      #
      # NOTE : +where_clause+ is spliced into the statement verbatim - the
      # automatic rewriting of '=~' into like clauses and the quoting of right
      # hand sides that once lived here are both disabled
      #
      # with a block every tuple is handed to it (via execute) and nil is
      # returned, otherwise the result of execute is returned
      #
      def query(where_clause = nil, &block)
#--{{{
        sql = "select * from jobs;"
        sql = "select * from jobs where #{ where_clause };" if where_clause

        return ro_transaction{ execute(sql) } unless block

        ro_transaction{ execute(sql, &block) }
        nil
#--}}}
      end
      #
      # deletes jobs, and their stdin/stdout/stderr/data files, by jid or by
      # state
      #
      # non-hash arguments say what to delete : a jid or a state which may be
      # abbreviated by its first letter (p, h, r, f, d) or 'all' (the default
      # when nothing is given).  hash arguments are merged into options, of
      # which 'force' is the only one used.
      #
      # running jobs are protected : deleting by jid or 'all' skips them and
      # 'running' itself is silently ignored unless 'force' is set.
      #
      # every tuple about to be deleted is yielded to the block when one is
      # given (nil is then returned), otherwise the tuples are returned in an
      # array.
      #
      # raises ArgumentError for anything that is neither a jid nor a state
      #
      def delete(*args, &block)
#--{{{
        whats, optargs = args.partition{|arg| not Hash === arg}

        opts = {}
        optargs.each{|oa| opts.update oa}

        force = Util::getopt 'force', opts

        delete_sql, select_sql = '', ''

        whats << 'all' if whats.empty?
        #
        # the patterns are anchored to the whole string (\A, \z) and the jid
        # is interpolated as a parsed integer - with line anchors an argument
        # like "1\nor 1=1" used to be spliced into the delete statement
        #
        whats.each do |what|
          case "#{ what }"
            when %r/\A\s*\d+\s*\z/o # number
              jid = "#{ what }".to_i
              delete_sql << "delete from jobs where jid=#{ jid } and state!='running';\n"
              select_sql << "select * from jobs where jid=#{ jid } and state!='running';\n"
            when %r/\A\s*p/io # pending
              delete_sql << "delete from jobs where state='pending';\n"
              select_sql << "select * from jobs where state='pending';\n"
            when %r/\A\s*h/io # holding
              delete_sql << "delete from jobs where state='holding';\n"
              select_sql << "select * from jobs where state='holding';\n"
            when %r/\A\s*r/io # running
              delete_sql << "delete from jobs where state='running';\n" if force
              select_sql << "select * from jobs where state='running';\n" if force
            when %r/\A\s*f/io # finished
              delete_sql << "delete from jobs where state='finished';\n"
              select_sql << "select * from jobs where state='finished';\n"
            when %r/\A\s*d/io # dead
              delete_sql << "delete from jobs where state='dead';\n"
              select_sql << "select * from jobs where state='dead';\n"
            when %r/\A\s*a/io # all
              delete_sql << "delete from jobs where state!='running';\n"
              select_sql << "select * from jobs where state!='running';\n"
            else
              raise ArgumentError, "cannot delete <#{ what.inspect }>"
          end
        end

        # removes every file/directory belonging to a job
        scrub = lambda do |jid|
          [standard_in_4(jid), standard_out_4(jid), standard_err_4(jid), data_4(jid)].each do |path| 
            FileUtils::rm_rf path
          end
        end

        tuples = []

        # applied to every doomed tuple : report it, then scrub its files
        metablock = 
          if block
            lambda do |tuple|
              jid = tuple['jid']
              block[tuple]
              scrub[jid]
            end
          else
            lambda do |tuple|
              jid = tuple['jid']
              scrub[jid]
              tuples << tuple
            end
          end

# TODO - make file deletion transactional too

        # select first so the doomed tuples are seen before the rows vanish
        transaction do
          execute(select_sql, &metablock)
          execute(delete_sql){}
        end

        delete_sql = nil
        select_sql = nil

        block ? nil : tuples
#--}}}
      end
      #
      # asks the underlying qdb to vacuum itself and returns its result
      #
      def vacuum()
#--{{{
        @qdb.vacuum()
#--}}}
      end
      #
      # updates fields, stdin and/or data of jobs which are still pending or
      # holding
      #
      # +kvs+ maps field names onto new values.  allowed are priority,
      # command, tag, runner, restartable and state (pending or holding only,
      # matched on the first letter) plus the special keys 'stdin' (see
      # tmp_stdin) and 'data' (a path copied over the job's data directory).
      # NOTE : 'stdin' and 'data' are deleted from the caller's hash.
      #
      # +jids+ lists the jobs to touch.  the literal entries 'pending' and
      # 'holding' are replaced by the jids of all jobs in that state.
      #
      # every affected tuple is yielded to the block when one is given (nil is
      # then returned), otherwise the tuples are returned in an array.
      #
      # raises RuntimeError for a disallowed key or state and when no jids are
      # given.
      #
      # NOTE(review): values and jids are interpolated into the sql without
      # any escaping - a value containing a single quote will presumably break
      # the statement.  confirm whether callers sanitize.
      #
      def update(kvs, *jids, &block)
#--{{{
        ret = nil # NOTE(review): never used below
      #
      # yank out stdin - which we allow as a key
      #
        stdin = kvs.delete 'stdin'
        data = kvs.delete 'data'
      #
      # validate/munge state value iff present
      #
        if((state = kvs['state']))
          case state
            when %r/^p/io
              kvs['state'] = 'pending'
            when %r/^h/io
              kvs['state'] = 'holding'
            else
              raise "update of <state> = <#{ state }> not allowed (try pending or holding)"
          end
        end
      #
      # validate kvs pairs
      #
        allowed = %w( priority command tag runner restartable )
        kvs.each do |key, val|
          raise "update of <#{ key }> = <#{ val }> not allowed" unless
            (allowed.include?(key)) or (key == 'state' and %w( pending holding ).include?(val))
        end
      #
      # ensure there are actually some jobs to update
      #
        raise "no jobs to update" if jids.empty?
      #
      # generates sql to update jids with kvs and sql to show updated tuples.
      # it issues queries itself, so it must be called inside the transaction
      #
        build_sql = 
          lambda do |kvs, jids|
            # expand the pseudo jid 'pending' into the real pending jids
            if(jids.delete('pending'))
              execute("select jid from jobs where state='pending'") do |tuple| 
                jids << tuple['jid']
              end
            end

            # likewise for 'holding'
            if(jids.delete('holding'))
              execute("select jid from jobs where state='holding'") do |tuple| 
                jids << tuple['jid']
              end
            end

            # the expansion may have come up empty.  NOTE(review): presumably
            # rollback_transaction does not return - confirm against QDB
            rollback_transaction "no jobs to update" if jids.empty?

            # a nil (or false) value becomes an sql NULL
            update_clause = kvs.map{|k,v| v ? "#{ k }='#{ v }'" : "#{ k }=NULL" }.join(",\n")
            where_clause = jids.map{|jid| "jid=#{ jid }"}.join(" or\n")
            # both statements only ever touch pending or holding jobs
            update_sql = 
              "update jobs\n" <<
              "set\n#{ update_clause }\n" <<
              "where\n(state='pending' or state='holding') and\n(#{ where_clause })"
            select_sql = "select * from jobs where (state='pending' or state='holding') and\n(#{ where_clause })"

            # nothing to set (stdin/data only update) - skip the update
            if kvs.empty?
              [ nil, select_sql ]
            else
              [ update_sql, select_sql ]
            end
          end
        #
        # setup stdin
        #
        tmp_stdin(stdin) do |ts|
          # both clobber lambdas return true so they can be chained with 'and'
          clobber_stdin = lambda do |job|
            FileUtils::cp ts.path, standard_in_4(job['jid']) if ts
            true
          end

          clobber_data = lambda do |job|
            if data
              FileUtils::rm_rf data_4(job['jid'])
              FileUtils::cp_r data, data_4(job['jid'])
            end
            true
          end

          tuples = []

          # run for every selected job : replace its files, then report it
          metablock = 
            if block
              lambda{|job| clobber_stdin[job] and clobber_data[job] and block[job]}
            else
              lambda{|job| clobber_stdin[job] and clobber_data[job] and tuples << job}
            end

          transaction do 
            update_sql, select_sql = build_sql[kvs, jids]
            # build_sql always returns a select statement, so this is a no-op guard
            break unless select_sql
            execute(update_sql){} if update_sql
            execute(select_sql, &metablock)
          end

          # value of the tmp_stdin block and thereby of this method
          block ? nil : tuples
        end
#--}}}
      end

      #
      # returns the next job this host may run, or whatever first gives for an
      # empty result
      #
      # candidates are pending jobs and dead jobs with a non-null restartable
      # field, whose runner is null or contains Util::host.  highest priority
      # wins, then earliest submission, then lowest jid.
      #
      def getjob
#--{{{
        candidates = execute(<<-sql)
          select * from jobs 
            where 
              (state='pending' or (state='dead' and (not restartable isnull))) and 
              (runner like '%#{ Util::host }%' or runner isnull)
            order by priority desc, submitted asc, jid asc
            limit 1;
        sql
        candidates.first
#--}}}
      end
      #
      # records that +job+ has been started : its pid, state, started, runner,
      # stdout and stderr fields are written to the row with the job's jid.
      # returns the result of execute.
      #
      # the values are taken from +job+ and spliced into the sql as they are
      #
      def jobisrunning(job)
#--{{{
        execute(<<-sql)
          update jobs 
            set
              pid='#{ job['pid'] }',
              state='#{ job['state'] }',
              started='#{ job['started'] }',
              runner='#{ job['runner'] }',
              stdout='#{ job['stdout'] }',
              stderr='#{ job['stderr'] }'
            where jid=#{ job['jid'] };
        sql
#--}}}
      end
      #
      # records the outcome of +job+ : its state, exit_status, finished and
      # elapsed fields are written to the row with the job's jid.  returns the
      # result of execute.
      #
      # the values are taken from +job+ and spliced into the sql as they are
      #
      def jobisdone(job)
#--{{{
        execute(<<-sql)
          update jobs 
            set
              state = '#{ job['state'] }',
              exit_status = '#{ job['exit_status'] }',
              finished = '#{ job['finished'] }',
              elapsed = '#{ job['elapsed'] }'
            where jid = #{ job['jid'] };
        sql
#--}}}
      end
      #
      # selects the jobs still marked as running on this host (Util::hostname)
      # which were started at or before +started+
      #
      # with a block every tuple is handed to it (via execute) and nil is
      # returned, otherwise the result of execute is returned
      #
      def getdeadjobs(started, &block)
#--{{{
        sql = <<-sql
          select * from jobs 
            where 
              state = 'running' and 
              runner='#{ Util::hostname }' and 
              started<='#{ started }'
        sql

        return execute(sql) unless block

        execute(sql, &block)
        nil
#--}}}
      end
      #
      # marks +job+ as dead in the database - a job without a jid is left
      # alone.  always returns +job+.
      #
      def jobisdead(job)
#--{{{
        jid = job['jid']
        execute("update jobs set state='dead' where jid='#{ jid }'"){} if jid
        job
#--}}}
      end

      #
      # runs the block inside a read-write transaction of the qdb and returns
      # the block's value
      #
      # transactions nest : when one is already open the block simply runs
      # within it.  a read-only transaction can not be upgraded - trying to
      # raises RuntimeError.
      #
      def transaction(*args)
#--{{{
        raise "cannot upgrade ro_transaction" if @in_ro_transaction
        return yield if @in_transaction

        result = nil
        @in_transaction = true
        begin
          @qdb.transaction(*args){ result = yield }
        ensure
          # always drop the flag, even when the block blew up
          @in_transaction = false 
        end
        result
#--}}}
      end
      #
      # runs the block inside a read-only transaction of the qdb and returns
      # the block's value
      #
      # when any transaction - read-only or read-write - is already open the
      # block simply runs within it
      #
      def ro_transaction(*args)
#--{{{
        return yield if @in_ro_transaction || @in_transaction

        result = nil
        @in_ro_transaction = true
        begin
          @qdb.ro_transaction(*args){ result = yield }
        ensure
          # always drop the flag, even when the block blew up
          @in_ro_transaction = false 
        end
        result
#--}}}
      end
      #
      # hands its arguments and block straight to the qdb's execute and
      # returns the result
      #
      def execute(*argv, &blk)
#--{{{
        @qdb.execute(*argv, &blk)
#--}}}
      end
      #
      # hands its arguments and block straight to the qdb's integrity_check
      # and returns the result
      #
      def integrity_check(*argv, &blk)
#--{{{
        @qdb.integrity_check(*argv, &blk)
#--}}}
      end
      #
      # hands its arguments and block straight to the qdb's recover! and
      # returns the result
      #
      def recover!(*argv, &blk)
#--{{{
        @qdb.recover!(*argv, &blk)
#--}}}
      end
      #
      # hands its arguments and block straight to the qdb's lock and returns
      # the result
      #
      def lock(*argv, &blk)
#--{{{
        @qdb.lock(*argv, &blk)
#--}}}
      end
      #
      # hands its arguments and block straight to the qdb's abort_transaction
      # and returns the result
      #
      def abort_transaction(*argv, &blk)
#--{{{
        @qdb.abort_transaction(*argv, &blk)
#--}}}
      end
      #
      # hands its arguments and block straight to the qdb's
      # rollback_transaction and returns the result
      #
      def rollback_transaction(*argv, &blk)
#--{{{
        @qdb.rollback_transaction(*argv, &blk)
#--}}}
      end

      #
      # copies the database files of this queue into the directory +qtmp+ and
      # returns a queue object opened on that copy
      #
      # +qtmp+    - target directory, '<basename>.snapshot' when nil.  it is
      #             REMOVED and re-created on every attempt!
      # +retries+ - number of attempts made without locking, 16 when nil
      #
      # if none of the unlocked attempts yields a usable snapshot one final
      # attempt is made while holding the qdb's write lock.  RuntimeError is
      # raised when that fails too.
      #
      # NOTE(review): @opts is handed to the snapshot queue as is.  initialize
      # prefers opts['qdb'] over opening <path>/db and create stores the live
      # QDB in exactly that key - so for a queue built via create the snapshot
      # object presumably talks to the ORIGINAL database.  confirm.
      #
      def snapshot qtmp = "#{ @basename }.snapshot", retries = nil 
#--{{{
        # the default only covers an omitted argument, this covers an explicit nil
        qtmp ||= "#{ @basename }.snapshot"
        debug{ "snapshot <#{ @path }> -> <#{ qtmp }>" }
        retries = Integer(retries || 16)
        debug{ "retries <#{ retries }>" }

        qss = nil   # the successful snapshot, if any
        loopno = 0  # number of failed unlocked attempts so far

        #
        # one attempt : wipe qtmp, copy db, db.schema and lock into it and
        # open the copy.  the value is the snapshot queue when its integrity
        # check passes, otherwise whatever recover! on the snapshot returns
        #
        take_snapshot = lambda do
          FileUtils::rm_rf qtmp
          FileUtils::mkdir_p qtmp
          %w(db db.schema lock).each do |base|
            src, dest = File::join(@path, base), File::join(qtmp, base)
            debug{ "cp <#{ src }> -> <#{ dest }>" }
            FileUtils::cp(src, dest)
          end
          # NOTE(review): klass is not defined in this file - presumably a
          # Util helper returning the receiver's class, confirm
          ss = klass::new qtmp, @opts
          if ss.integrity_check
            ss
          else
            # best effort repair of the source queue too - errors are ignored
            begin; recover! unless integrity_check; rescue; nil; end
            ss.recover!
          end
        end

        # unlocked attempts, at most +retries+ of them
        loop do
          break if loopno >= retries
          if((ss = take_snapshot.call))
            debug{ "snapshot <#{ qtmp }> created" }
            qss = ss
            break
          else
            debug{ "failure <#{ loopno + 1}> of <#{ retries }> attempts to create snapshot <#{ qtmp }> - retrying..." }
          end
          loopno += 1
        end

        # still nothing : try once more while holding the write lock
        unless qss
          debug{ "locking <#{ @path }> as last resort" }
          @qdb.write_lock do
            if((ss = take_snapshot.call))
              debug{ "snapshot <#{ qtmp }> created" }
              qss = ss
            else
              raise "failed <#{ loopno }> times to create snapshot <#{ qtmp }>"
            end
          end
        end

        qss
#--}}}
      end

# TODO - use mtime to optimize checks by feeder??
      #
      # modification time of the queue directory itself
      #
      def mtime
#--{{{
        File.mtime(@path)
#--}}}
      end
      #
      # stores +value+ under +key+ in the attributes table : a new row is
      # inserted when the key is unknown, the single existing row is updated
      # otherwise
      #
      # raises RuntimeError when more than one row exists for +key+
      #
      # key and value are spliced into the sql as they are
      #
      def []=(key, value)
#--{{{
        tuple = @qdb.execute("select count(*) from attributes where key='#{ key }';").first
        count = Integer(tuple['count(*)'])

        raise "key <#{ key }> has become corrupt!" unless count == 0 or count == 1

        sql =
          if count == 0
            "insert into attributes values('#{ key }','#{ value }');"
          else
            "update attributes set key='#{ key }', value='#{ value }' where key='#{ key }';"
          end

        @qdb.execute sql
#--}}}
      end
      #
      # returns every row of the attributes table as a plain key => value
      # hash
      #
      # the tuples handed back by the qdb are only read : each, not map!, is
      # used so the result array is neither overwritten nor required to be
      # mutable
      #
      def attributes 
#--{{{
        h = {}
        tuples = @qdb.execute "select * from attributes;"
        tuples.each{|t| h[t['key']] = t['value']}
        h
#--}}}
      end
#--}}}
    end # class JobQueue
#--}}}
  end # module RQ
$__rq_jobqueue__ = __FILE__ 
end