= PerfectQueue Highly available distributed queue. It provides exactly-once semantics unless backend database fails. Pushed tasks are surely retried by another worker node even if a worker fails. And it never delivers finished/canceled tasks. Backend database is pluggable. You can use any databases that supports CAS (compare-and-swap) operation. PerfectQueue supports RDBMS and Amazon SimpleDB for now. == Architecture PerfectQueue uses following database schema: ( id:string -- unique identifier of the task data:blob -- additional attributes of the task created_at:int -- unix time when the task is created (or null for canceled tasks) timeout:int ) 1. list: lists tasks whose timeout column is old enough. 2. lock: updates timeout column of the first task 3. run: executes a command * if the task takes long time, updates the timeout column. this is repeated until the task is finished * if the task takes more long time, kills the process 4. remove: if it succeeded, removes the row from the backend database 5. or leave: if it failed, leave the row and expect to be retried == Usage === Submitting a task *Using* *command* *line:* # RDBMS $ perfectqueue \ --database mysql://user:password@localhost/mydb \ --table perfectqueue \ --push unique-key-id='{"any":"data"}' # SimpleDB $ perfectqueue \ --simpledb your-simpledb-domain-name \ -k AWS_KEY_ID \ -s AWS_SECRET_KEY \ --push unique-key-id='{"any":"data"}' *Using* *PerfectQueue* *library:* require 'perfectqueue' # RDBMS require 'perfectqueue/backend/rdb' queue = PerfectQueue::Backend::RDBBackend.new( 'mysql://user:password@localhost/mydb', table='perfectqueue') # SimpleDB require 'perfectqueue/backend/simpledb' queue = PerfectQueue::Backend::SimpleDBBackend.new( 'AWS_KEY_ID', 'AWS_SECRET_KEY', 'your-simpledb-domain-name') queue.submit('unique-key-id', '{"any":"data"}') Alternatively, you can insert a row into the backend database directly. *RDBMS:* > CREATE TABLE IF NOT EXISTS perfectqueue ( id VARCHAR(256) NOT NULL, timeout INT NOT NULL, data BLOB NOT NULL, created_at INT, PRIMARY KEY (id) ); > SET @now = UNIX_TIMESTAMP(); > INSERT INTO perfectqueue (id, timeout, data, created_at) VALUES ('unique-task-id', @now, '{"any":"data"}', @now); *SimpleDB:* require 'aws' # gem install aws-sdk queue = AWS::SimpleDB.new domain = queue.domains['your-simpledb-domain-name'] now = "%08x" % Time.now.to_i domain.items['unique-task-id'].attributes.replace( 'timeout'=>now, 'data'=>'{"any":"data"}', 'created_at'=>now, :unless=>'timeout') === Canceling a queued task *Using* *command* *line:* $ perfectqueue ... --cancel unique-key-id *Using* *PerfectQueue* *library:* queue.cancel('unique-key-id') Alternatively, you can delete a row from the backend database directly. *RDBMS:* > DELETE FROM perfectqueue WHERE id='unique-key-id'; *SimpleDB:* domain.items['unique-task-id'].delete === Running worker node Use _perfectqueue_ command to execute a command. Usage: perfectqueue [options] [-- ] --push ID=DATA Push a task to the queue --list Show queued tasks --cancel ID Cancel a queued task --configure PATH.yaml Write configuration file --exec COMMAND Execute command --run SCRIPT.rb Run method named 'run' defined in the script -f, --file PATH.yaml Read configuration file -C, --run-class Class name for --run (default: ::Run) -t, --timeout SEC Time for another worker to take over a task when this worker goes down (default: 600) -b, --heartbeat-interval SEC Threshold time to extend the timeout (heartbeat interval) (default: timeout * 3/4) -x, --kill-timeout SEC Threshold time to kill a task process (default: timeout * 10) -X, --kill-interval SEC Threshold time to retry killing a task process (default: 60) -i, --poll-interval SEC Polling interval (default: 1) -r, --retry-wait SEC Time to retry a task when it is failed (default: same as timeout) -e, --expire SEC Threshold time to expire a task (default: 345600 (4days)) --database URI Use RDBMS for the backend database (e.g.: mysql://user:password@localhost/mydb) --table NAME backend: name of the table (default: perfectqueue) --simpledb DOMAIN Use Amazon SimpleDB for the backend database (e.g.: --simpledb mydomain -k KEY_ID -s SEC_KEY) -k, --key-id ID AWS Access Key ID -s, --secret-key KEY AWS Secret Access Key -w, --worker NUM Number of worker threads (default: 1) -d, --daemon PIDFILE Daemonize (default: foreground) -o, --log PATH log file path -v, --verbose verbose mode ==== --exec Execute a command when a task is received. The the data column is passed to the stdin and the id column passed to the last argument. The command have to exit with status code 0 when it succeeded. *Example:* #!/usr/bin/env ruby require 'json' js = JSON.load(STDIN.read) puts "received: id=#{ARGV.last} #{js.inspect}" #$ perfectqueue --database sqlite://test.db \ --exec ./this_file -- my cmd args When the kill timeout (-x, --kill-timeout) is elapsed, SIGTERM signal will be sent to the child process. The signal will be repeated every few seconds (-X, --kill-interval). ==== --run This is same as 'exec' except that it creates a instance of a class named 'Run' defined in the file. The class should has 'initialize(task)', 'run' and 'kill' methods. You can get data column and id column of the task from the argument of the initialize method. It is assumed it succeeded if the method doesn't raise any errors. *Example:* require 'json' class Run def initialize(task) @task = task end def run js = JSON.load(@task.data) puts "received: id=#{@task.id} #{js.inspect}" end def kill puts "kill!" end end #$ perfectqueue --database sqlite://test.db \ --run ./this_file.rb -- my cmd args When the kill timeout (-x, --klill-timeout) is elapsed, Run#kill method will be called (if it is defined). It will be repeated every few seconds (-X, --kill-retry). ==== --configure Write configuration file and exit. Written configuration file can be used with -f option: *Example:* ## create myqueue.yaml file $ perfectqueue --database mysql://root:my@localhost/mydb \ --run myrun.rb -- my cmd args \ --configure myqueue.yaml ## run perfectqueue using the configuration file $ perfectqueue -f myqueue.yaml ==== --list Show queued tasks. *Example:* $ perfectqueue --database sqlite://test.db --list id created_at timeout data task1 2011-08-23 23:07:45 +0900 2011-08-23 23:07:45 +0900 {"attr1":"val1","attr":"val2"} 1 entries.