unless defined? $__rq_jobqueue__
  module RQ 
#{{{
    LIBDIR = File::dirname(File::expand_path(__FILE__)) + File::SEPARATOR unless
      defined? LIBDIR

    require LIBDIR + 'util'
    require LIBDIR + 'logging'
    require LIBDIR + 'qdb'

    class  JobQueue
#{{{
      include Logging
      include Util
      class Error < StandardError; end
    
      MAX_JID = 2 ** 20 
    
      class << self
    #{{{
        def create path, opts = {}
    #{{{
          FileUtils::rm_rf path
          FileUtils::mkdir_p path
          db = File::join path, 'db'
          qdb = QDB.create db, opts
          opts['qdb'] = qdb
          q = new path, opts
          q
    #}}}
        end
    #}}}
      end
      attr :path
      attr :opts
      attr :qdb
      alias :db :qdb
      def initialize path, opts = {}
    #{{{
        @path = path
        @opts = opts
        raise "q <#{ @path }> does not exist" unless test ?e, @path
        raise "q <#{ @path }> is not a directory" unless test ?d, @path
        @basename = File::basename(@path)
        @dirname = File::dirname(@path)
        @logger = getopt('logger', opts) || Logger::new(STDERR)
        @qdb = getopt('qdb', opts) || QDB::new(File::join(@path, 'db'), 'logger' => @logger)
    #}}}
      end
      def submit(*jobs)
    #{{{
        now = Util::timestamp Time.now
        tuples = nil
        sql = ''
    
        jobs.each do |job|
          raise "no command for job <#{ job.inspect }>" unless job['command']

          tuple = QDB::tuple

          tuple['command'] = job['command']
          tuple['priority'] = job['priority'] || 0 
          tuple['tag'] = job['tag']
          tuple['state'] = 'pending'
          tuple['submitted'] = now
          tuple['submitter'] = Util::hostname 

          values = QDB::q tuple

          sql << "insert into jobs values (#{ values.join ',' });\n"
        end
    
        tuples = nil
        transaction do
          execute(sql){}
          if block_given?
            sql = "select * from jobs where submitted = '#{ now }'"
            tuples = execute sql
          end
        end
        tuples.each{|t| yield t} if tuples
        tuples = nil
    
        self
    #}}}
      end
      def list(*whats)
    #{{{
        whats.replace(['pending', 'running', 'finished', 'dead']) if 
          whats.empty? or whats.include?('all')
    
        whats.map! do |what|
          case what
            when %r/^\s*p/io
              'pending'
            when %r/^\s*r/io
              'running'
            when %r/^\s*f/io
              'finished'
            when %r/^\s*d/io
              'dead'
            else
              what
          end
        end

        where_clauses = [] 

        whats.each do |what|
          if what =~ %r/^\s*\d+\s*$/o
            where_clauses << "jid=#{ QDB::q what }\n"
          else
            where_clauses << "state=#{ QDB::q what }\n"
          end
        end

        where_clause = where_clauses.join(" or \n")

        sql = <<-sql
          select * from jobs
          where #{ where_clause } 
        sql
    
        tuples = ro_transaction{ execute sql }
        puts '---'
        fields = @qdb.fields
        tuples.each do |tuple|
          puts '-'
          fields.each{|f| puts " #{ f }: #{ tuple[f] }" }
        end
        tuples = nil
    
        self
    #}}}
      end
      def status(*whats)
    #{{{
        whats.replace(['pending', 'running', 'finished', 'dead']) if 
          whats.empty? or whats.include?('all')
    
        whats.map! do |what|
          case what
            when %r/^\s*p/io
              'pending'
            when %r/^\s*r/io
              'running'
            when %r/^\s*f/io
              'finished'
            when %r/^\s*d/io
              'dead'
            else
              what
          end
        end
    
        puts '---'
        ro_transaction do
          whats.each do |what|
            sql = <<-sql
              select count(*) from jobs where state = '#{ what }';
            sql
            tuples = execute sql 
            tuple = tuples.first
            count = (tuple ? tuple.first : 0)
            puts "#{ what } : #{ count }"
          end
        end
    
        self
    #}}}
      end
      def query where_clause = nil
    #{{{
        sql = 
          if where_clause

          #
          # turn =~ into like clauses 
          #
            #where_clause.gsub!(/(=~\s*([^\s')(=]+))/om){q = $2.gsub(%r/'+|\s+/o,''); "like '%#{ q }%'"}
          #
          # quote everything on the rhs of an '=' sign - helps with shell problems...
          #
            #where_clause.gsub!(/(==?\s*([^\s')(=]+))/om){q = $2.gsub(%r/'+|\s+/o,''); "='#{ q }'"}

            "select * from jobs where #{ where_clause };"
          else
            "select * from jobs;"
          end

        tuples = ro_transaction{ execute sql }

        puts '---'
        fields = @qdb.fields
        tuples.each do |tuple|
          puts '-'
          fields.each{|f| puts " #{ f }: #{ tuple[f] }"}
        end
        tuples = nil
    
        self
    #}}}
      end
      def delete(*jids)
    #{{{
        what = jids.first || 'all'
    
        case what
          when String
            sql =
              case what
                when %r/^\s*p/io
                  "delete from jobs where state = 'pending';"
                when %r/^\s*r/io
                  "delete from jobs where state = 'running';"
                when %r/^\s*f/io
                  "delete from jobs where state = 'finished';"
                when %r/^\s*d/io
                  "delete from jobs where state = 'dead';"
                when %r/^\s*a/io
                  "delete from jobs;"
                else
                  raise ArgumentError, "cannot delete <#{ what }>"
              end
          else
            sql = ''
            jids.each do |jid|
              jid = Integer jid
              sql << "delete from jobs where jid = #{ jid };"
            end
        end
        transaction{ execute sql }
        sql = nil
        @qdb.vacuum
        self
    #}}}
      end
      def update(kvs,*jids)
    #{{{
        allowed = %w(priority command tag)
        kvs.keys.each do |key|
          raise "update of <#{ key }> not allowed" unless
            allowed.include? key
        end

        raise "no jids" if jids.empty?
        jids.sort!
        jids.uniq!

        pending = jids.delete 'pending'
    
        update_clause = kvs.map{|k,v| "#{ k }='#{ v }'"}.join(",\n")

        if pending
          update_sql = 
            "update jobs\n" <<
            "set\n#{ update_clause }\n" <<
            "where\nstate='pending'"
          select_sql = "select * from jobs where state='pending'"
        else
          where_clause = jids.map{|jid| "jid=#{ jid }"}.join(" or\n")
          update_sql = 
            "update jobs\n" <<
            "set\n#{ update_clause }\n" <<
            "where\nstate='pending' and\n#{ where_clause }"
          select_sql = "select * from jobs where #{ where_clause }"
        end

        tuples = nil 
        transaction do 
          execute update_sql
          tuples = execute select_sql 
        end
        tuples or []
    #}}}
      end
      def transaction(*args, &block)
    #{{{
        @qdb.transaction(*args, &block)
    #}}}
      end
      def ro_transaction &block
    #{{{
        @qdb.ro_transaction &block
    #}}}
      end
      def execute(*args, &block)
    #{{{
        @qdb.execute(*args, &block)
    #}}}
      end
      def integrity_check(*args, &block)
    #{{{
        @qdb.integrity_check(*args, &block)
    #}}}
      end
      def snapshot qtmp = "#{ @basename }.snapshot", retries = nil 
    #{{{
        qtmp ||= "#{ @basename }.snapshot"
        debug{ "snapshot <#{ @path }> -> <#{ qtmp }>" }
        retries = Integer(retries || 16)
        debug{ "retries <#{ retries }>" }

        qss = nil
        loopno = 0

        take_snapshot = lambda do
          FileUtils::rm_rf qtmp
          FileUtils::mkdir_p qtmp
          %w(db db.schema lock).each do |base|
            src, dest = File::join(@path, base), File::join(qtmp, base)
            debug{ "cp <#{ src }> -> <#{ dest }>" }
            FileUtils::cp(src, dest)
          end
          ss = klass::new qtmp, @opts
          if ss.integrity_check
            ss
          else
            nil
          end
        end

        loop do
          break if loopno >= retries
          if((ss = take_snapshot.call))
            debug{ "snapshot <#{ qtmp }> created" }
            qss = ss
            break
          else
            debug{ "failure <#{ loopno + 1}> of <#{ retries }> attempts to create snapshot <#{ qtmp }> - retrying..." }
          end
          loopno += 1
        end

        unless qss
          debug{ "locking <#{ @path }> as last resort" }
          @qdb.write_lock do
            if((ss = take_snapshot.call))
              debug{ "snapshot <#{ qtmp }> created" }
              qss = ss
            else
              raise "failed <#{ loopno }> times to create snapshot <#{ qtmp }>"
            end
          end
        end

        qss
    #}}}
      end
      def lock(*args, &block)
    #{{{
        @qdb.lock(*args, &block)
    #}}}
      end
      def getjob
    #{{{
        sql = <<-sql
          select * from jobs 
            where state = 'pending' 
            order by priority desc, submitted asc, jid asc
            limit 1;
        sql
        tuples = execute sql
        job = tuples.first
        job
    #}}}
      end
      def jobisrunning job 
    #{{{
        sql = <<-sql
          update jobs 
            set
              pid = '#{ job['pid'] }',
              state = '#{ job['state'] }',
              started = '#{ job['started'] }',
              runner = '#{ job['runner'] }'
            where jid = #{ job['jid'] };
        sql
        execute sql
    #}}}
      end
      def jobisdone job
    #{{{
        sql = <<-sql
          update jobs 
            set
              state = '#{ job['state'] }',
              exit_status = '#{ job['exit_status'] }',
              finished = '#{ job['finished'] }',
              elapsed = '#{ job['elapsed'] }'
            where jid = #{ job['jid'] };
        sql
        execute sql
    #}}}
      end
      def mtime
    #{{{
        File::stat(@path).mtime
    #}}}
      end
      def []= key, value
    #{{{
        sql = "select count(*) from attributes where key='#{ key }';"
        tuples = @qdb.execute sql
        tuple = tuples.first
        count = Integer tuple['count(*)']
        case count
          when 0
            sql = "insert into attributes values('#{ key }','#{ value }');"
            @qdb.execute sql
          when 1
            sql = "update attributes set key='#{ key }', value='#{ value }' where key='#{ key }';"
            @qdb.execute sql
          else
            raise "key <#{ key }> has become corrupt!"
        end
    #}}}
      end
      def attributes 
    #{{{
        h = {}
        tuples = @qdb.execute "select * from attributes;"
        tuples.map!{|t| h[t['key']] = t['value']}
        h
    #}}}
      end
#}}}
    end # class JobQueue
#}}}
  end # module RQ
$__rq_jobqueue__ = __FILE__ 
end