<?php

/**
 * Interface for Danga's Gearman job scheduling system
 *
 * PHP version 5.1.0+
 *
 * LICENSE: This source file is subject to the New BSD license that is 
 * available through the world-wide-web at the following URI:
 * http://www.opensource.org/licenses/bsd-license.php. If you did not receive  
 * a copy of the New BSD License and are unable to obtain it through the web, 
 * please send a note to license@php.net so we can mail you a copy immediately.
 *
 * @category  Net
 * @package   Net_Gearman
 * @author    Joe Stump <joe@joestump.net> 
 * @copyright 2007-2008 Digg.com, Inc.
 * @license   http://www.opensource.org/licenses/bsd-license.php New BSD License
 * @version   CVS: $Id$
 * @link      http://pear.php.net/package/Net_Gearman
 * @link      http://www.danga.com/gearman/
 */

require_once 'Net/Gearman/Connection.php';

/**
 * A client for managing Gearmand servers
 *
 * This class implements the administrative text protocol used by Gearman to do
 * a number of administrative tasks such as collecting stats on workers, the
 * queue, shutting down the server, version, etc.
 *
 * @category  Net
 * @package   Net_Gearman
 * @author    Joe Stump <joe@joestump.net> 
 * @copyright 2007-2008 Digg.com, Inc.
 * @license   http://www.opensource.org/licenses/bsd-license.php New BSD License
 * @link      http://www.danga.com/gearman/
 */
class Net_Gearman_Manager
{
    /**
     * Connection resource
     *
     * Stream returned by fsockopen() in the constructor; reset to null by
     * Net_Gearman_Manager::disconnect().
     *
     * @var resource $conn Connection to Gearman server
     * @see Net_Gearman_Manager::sendCommand()
     * @see Net_Gearman_Manager::recvCommand()
     */
    protected $conn = null;

    /**
     * The server is shutdown
     *
     * We obviously can't send more commands to a server after it's been shut
     * down. This is set to true in Net_Gearman_Manager::shutdown() and then
     * checked in Net_Gearman_Manager::sendCommand().
     *
     * @var boolean $shutdown 
     */ 
    protected $shutdown = false;

    /**
     * Constructor
     *
     * Opens a TCP connection to the server. The port is taken from whatever
     * follows the LAST colon in $server, so bracketed IPv6 literals such as
     * '[::1]:7003' work; when $server has no colon, port 7003 is used.
     *
     * @param string  $server  Host and port (e.g. 'localhost:7003')
     * @param integer $timeout Connection timeout
     *
     * @throws Net_Gearman_Exception When the connection cannot be opened
     * @see Net_Gearman_Manager::$conn
     */
    public function __construct($server, $timeout = 5)
    {
        $host = $server;
        $port = 7003;

        // Split on the last colon: a plain strpos() truthiness test misses a
        // colon at offset 0 and an unlimited explode() mangles IPv6 hosts.
        $colon = strrpos($server, ':');
        if ($colon !== false) {
            $host = substr($server, 0, $colon);
            $port = (int) substr($server, $colon + 1);
        }

        $errCode = 0;
        $errMsg  = '';
        $conn    = @fsockopen($host, $port, $errCode, $errMsg, $timeout);
        if (!is_resource($conn)) {
            $message = 'Could not connect to ' . $host . ':' . $port;
            if ($errMsg !== '') {
                // Warnings are suppressed above, so surface the reason here.
                $message .= ' (' . $errMsg . ')';
            }

            throw new Net_Gearman_Exception($message, (int) $errCode);
        }

        $this->conn = $conn;
    }

    /**
     * Get the version of Gearman running
     *
     * Sends 'version' and returns the single response line, trimmed.
     *
     * @return string
     * @throws Net_Gearman_Exception
     * @see Net_Gearman_Manager::sendCommand()
     * @see Net_Gearman_Manager::checkForError()
     */
    public function version()
    {
        $this->sendCommand('version');
        $res = fgets($this->conn, 4096);
        $this->checkForError($res);
        return trim((string) $res);
    }

    /**
     * Shut down Gearman
     *
     * Sends 'shutdown' (or 'shutdown graceful') and records whether the
     * server answered 'OK'. Once that happened, further commands are refused
     * by Net_Gearman_Manager::sendCommand().
     *
     * @param boolean $graceful Whether it should be a graceful shutdown
     *
     * @return boolean True when the server acknowledged with 'OK'
     * @throws Net_Gearman_Exception
     * @see Net_Gearman_Manager::sendCommand()
     * @see Net_Gearman_Manager::checkForError()
     * @see Net_Gearman_Manager::$shutdown
     */
    public function shutdown($graceful = false)
    {
        $cmd = ($graceful) ? 'shutdown graceful' : 'shutdown';
        $this->sendCommand($cmd);
        $res = fgets($this->conn, 4096);
        $this->checkForError($res);

        $this->shutdown = (trim((string) $res) === 'OK');
        return $this->shutdown;
    }

    /**
     * Get worker status and info
     *
     * Returns the file descriptor, IP address, client ID and the abilities
     * that the worker has announced. Each response line is expected to look
     * like 'fd ip id : ability1 ability2'; a worker that announced nothing
     * (line ending in ' :') yields an empty abilities array.
     *
     * @return array A list of workers connected to the server, each an array
     *               with the keys 'fd', 'ip', 'id' and 'abilities'
     * @throws Net_Gearman_Exception
     */
    public function workers()
    {
        $this->sendCommand('workers');
        $res     = $this->recvCommand();
        $workers = array();

        foreach (explode("\n", $res) as $line) {
            $line = rtrim($line);
            if (!Net_Gearman_Connection::stringLength($line)) {
                continue;
            }

            // The trailing space lets a bare "fd ip id :" line still match
            // the " : " separator, while "::" inside an IPv6 address never
            // does (it has no space after the colon).
            $parts     = explode(' : ', $line . ' ', 2);
            $abilities = isset($parts[1]) ? trim($parts[1]) : '';

            // Pad so that a short line yields nulls instead of notices.
            $fields = array_pad(explode(' ', trim($parts[0])), 3, null);
            list($fd, $ip, $id) = $fields;

            $workers[] = array(
                'fd' => $fd,
                'ip' => $ip,
                'id' => $id,
                // Strict comparison: empty() would drop an ability named "0".
                'abilities' => ($abilities === '')
                    ? array()
                    : explode(' ', $abilities)
            );
        }

        return $workers;
    }

    /**
     * Set maximum queue size for a function
     *
     * For a given function of job, the maximum queue size is adjusted to be 
     * max_queue_size jobs long. A negative value indicates unlimited queue 
     * size.
     * 
     * If the max_queue_size value is not supplied (null) then it is unset 
     * (and the default maximum queue size will apply to this function).
     *
     * @param string  $function Name of function to set queue size for
     * @param integer $size     New size of queue, or null to unset it
     *
     * @return boolean True when the server acknowledged with 'OK'
     * @throws Net_Gearman_Exception
     */
    public function setMaxQueueSize($function, $size = null)
    {
        if ($size !== null && !is_numeric($size)) {
            throw new Net_Gearman_Exception('Queue size must be numeric');
        }

        // An empty name would produce a malformed "maxqueue  N" command.
        if ($function === null || $function === ''
            || preg_match('/[^a-z0-9_]/i', $function)
        ) {
            throw new Net_Gearman_Exception('Invalid function name');
        }

        $cmd = 'maxqueue ' . $function;
        if ($size !== null) {
            $cmd .= ' ' . $size;
        }

        $this->sendCommand($cmd);
        $res = fgets($this->conn, 4096);
        $this->checkForError($res);
        return (trim((string) $res) === 'OK');
    }

    /**
     * Get queue/worker status by function
     *
     * This function queries for queue status. The array returned is keyed by
     * the function (job) name and has how many jobs are in the queue, how 
     * many jobs are running and how many workers are capable of performing 
     * that job. Values are returned as the strings sent by the server; a
     * field missing from a response line is reported as null.
     *
     * @return array An array keyed by function name, each entry holding
     *               'in_queue', 'jobs_running' and 'capable_workers'
     * @throws Net_Gearman_Exception
     */
    public function status()
    {
        $this->sendCommand('status');
        $res = $this->recvCommand();

        $status = array();
        foreach (explode("\n", $res) as $line) {
            $line = rtrim($line, "\r");
            if (!Net_Gearman_Connection::stringLength($line)) {
                continue;
            }

            // Tab separated: function, queued, running, capable workers.
            $fields = array_pad(explode("\t", $line), 4, null);
            list($func, $inQueue, $jobsRunning, $capable) = $fields;

            $status[$func] = array(
                'in_queue' => $inQueue,
                'jobs_running' => $jobsRunning,
                'capable_workers' => $capable
            );
        }

        return $status;
    }

    /**
     * Send a command
     *
     * Writes the command followed by CRLF to the connection.
     *
     * @param string $cmd The command to send
     *
     * @return void
     * @throws Net_Gearman_Exception When the server has been shut down, the
     *                               connection is closed or the write fails
     */
    protected function sendCommand($cmd)
    {
        if ($this->shutdown) {
            throw new Net_Gearman_Exception('This server has been shut down');
        }

        if (!is_resource($this->conn)) {
            throw new Net_Gearman_Exception('Not connected to a server');
        }

        $payload = $cmd . "\r\n";
        $written = fwrite($this->conn, 
                          $payload, 
                          Net_Gearman_Connection::stringLength($payload));

        // fwrite() reports failure as false (0 on older PHP versions).
        if ($written === false || $written === 0) {
            throw new Net_Gearman_Exception(
                'Could not send command to server'
            );
        }
    }

    /**
     * Receive a response
     *
     * For most commands Gearman returns a bunch of lines and ends the 
     * transmission of data with a single line of ".\n". This command reads
     * in everything until that terminator line. If the command being sent is
     * NOT ended with ".\n" DO NOT use this command.
     *
     * @throws Net_Gearman_Exception When the server reports an error or the
     *                               connection ends before the terminator
     * @return string The response lines, without the terminator line
     */
    protected function recvCommand()
    {
        $ret = '';
        while (true) {
            $data = fgets($this->conn, 4096);
            if ($data === false) {
                // EOF or read error: the terminator will never arrive, so
                // bail out instead of spinning forever.
                throw new Net_Gearman_Exception(
                    'Connection closed while reading response'
                );
            }

            $this->checkForError($data);

            // Accept the terminator with either LF or CRLF line endings.
            if (rtrim($data, "\r\n") === '.') {
                break;
            }

            $ret .= $data;
        }

        return $ret;
    }

    /**
     * Check for an error
     *
     * Gearman returns errors in the format of 'ERR code_here Message+here'. 
     * This method checks returned values from the server for this error format
     * and will throw the appropriate exception. The message part is URL
     * decoded. A numeric code is used as the exception code; a textual code
     * is prefixed to the exception message instead, because exception codes
     * must be integers.
     *
     * @param string $data The returned data to check for an error
     *
     * @return void
     * @throws Net_Gearman_Exception
     */
    protected function checkForError($data)
    {
        $data = trim((string) $data);

        // Require "ERR" as a whole word so that ordinary payload lines (e.g.
        // a status line for a function called "ERRAND") are not mistaken for
        // errors.
        if (!preg_match('/^ERR(?: |$)/', $data)) {
            return;
        }

        // Limit to 3 parts so the message stays intact; pad short lines.
        $parts = array_pad(explode(' ', $data, 3), 3, '');
        $code  = urldecode($parts[1]);
        $msg   = urldecode($parts[2]);

        if (is_numeric($code)) {
            throw new Net_Gearman_Exception($msg, (int) $code);
        }

        if ($msg === '') {
            $msg = $code;
        } elseif ($code !== '') {
            $msg = $code . ': ' . $msg;
        }

        throw new Net_Gearman_Exception($msg, 0);
    }

    /**
     * Disconnect from server
     *
     * Closes the connection if it is open and clears the handle. Safe to
     * call more than once.
     *
     * @return void
     * @see Net_Gearman_Manager::$conn
     */
    public function disconnect()
    {
        if (is_resource($this->conn)) {
            fclose($this->conn);
        }

        $this->conn = null;
    }

    /**
     * Destructor
     *
     * Makes sure the connection is closed when the object goes away.
     *
     * @return void
     */
    public function __destruct()
    {
        $this->disconnect();
    }
}

?>