module PerfectQueue class RDBBackend < Backend def initialize(uri, table) require 'sequel' @uri = uri @table = table @db = Sequel.connect(@uri) init_db(@uri.split('//',2)[0]) end private def init_db(type) sql = '' case type when /mysql/i sql << "CREATE TABLE IF NOT EXISTS `#{@table}` (" sql << " id VARCHAR(256) NOT NULL," sql << " timeout INT NOT NULL," sql << " data BLOB NOT NULL," sql << " created_at INT," sql << " PRIMARY KEY (id)" sql << ") ENGINE=INNODB;" else sql << "CREATE TABLE IF NOT EXISTS `#{@table}` (" sql << " id VARCHAR(256) NOT NULL," sql << " timeout INT NOT NULL," sql << " data BLOB NOT NULL," sql << " created_at INT," sql << " PRIMARY KEY (id)" sql << ");" end # TODO index connect { @db.run sql } end def connect(&block) begin block.call ensure @db.disconnect end end public def list(&block) @db.fetch("SELECT id, timeout, data, created_at FROM `#{@table}` WHERE created_at IS NOT NULL ORDER BY timeout ASC;") {|row| yield row[:id], row[:created_at], row[:data], row[:timeout] } end MAX_SELECT_ROW = 4 def acquire(timeout, now=Time.now.to_i) connect { while true rows = 0 @db.fetch("SELECT id, timeout, data, created_at FROM `#{@table}` WHERE timeout <= ? ORDER BY timeout ASC LIMIT #{MAX_SELECT_ROW};", now) {|row| unless row[:created_at] # finished/canceled task @db["DELETE FROM `#{@table}` WHERE id=?;", row[:id]].delete else n = @db["UPDATE `#{@table}` SET timeout=? WHERE id=? AND timeout=?;", timeout, row[:id], row[:timeout]].update if n > 0 return row[:id], Task.new(row[:id], row[:created_at], row[:data]) end end rows += 1 } if rows < MAX_SELECT_ROW return nil end end } end def finish(id, delete_timeout=3600, now=Time.now.to_i) connect { n = @db["UPDATE `#{@table}` SET timeout=?, created_at=NULL WHERE id=? AND created_at IS NOT NULL;", now+delete_timeout, id].update return n > 0 } end def update(id, timeout) connect { n = @db["UPDATE `#{@table}` SET timeout=? WHERE id=? AND created_at IS NOT NULL;", timeout, id].update if n <= 0 raise CanceledError, "Task id=#{id} is canceled." end return nil } end def cancel(id, delete_timeout=3600, now=Time.now.to_i) finish(id, delete_timeout, now) end def submit(id, data, time=Time.now.to_i) connect { begin n = @db["INSERT INTO `#{@table}` (id, timeout, data, created_at) VALUES (?, ?, ?, ?);", id, time, data, time].insert return true rescue Sequel::DatabaseError return nil end } end end end