_mbiterable = false; $this->_throwErrors = true; $this->_serializer = new TextCommandSerializer(); }

    /**
     * Serializes a command and sends it over the connection.
     *
     * The command is turned into its wire representation by the configured
     * serializer and the resulting bytes are written to the connection.
     *
     * @param IConnectionComposable $connection Connection to write to.
     * @param ICommand              $command    Command to serialize and send.
     */
    public function write(IConnectionComposable $connection, ICommand $command)
    {
        $connection->writeBytes($this->_serializer->serialize($command));
    }

    /**
     * Reads one reply from the connection and decodes it.
     *
     * The first byte of the line read from the connection selects the reply
     * type; the remainder of the line is the payload:
     *
     *  - '+': returns true for 'OK', a ResponseQueued instance for 'QUEUED',
     *         otherwise the payload itself.
     *  - '$': the payload is a byte count; -1 yields null, otherwise that
     *         many bytes (plus two trailing bytes that are discarded) are
     *         read from the connection and returned.
     *  - '*': the payload is an item count; -1 yields null. When the
     *         'iterable_multibulk' option is enabled a MultiBulkResponseSimple
     *         is returned, otherwise each item is read recursively into an
     *         array.
     *  - ':': returns the payload cast to an integer.
     *  - '-': throws a ServerException when the 'throw_errors' option is
     *         enabled, otherwise returns a ResponseError.
     *
     * Any other prefix (including an empty line) is reported through
     * Helpers::onCommunicationException() as a ProtocolException.
     *
     * @param IConnectionComposable $connection Connection to read from.
     *
     * @return mixed Decoded reply, see the list above.
     *
     * @throws ServerException When the server replies with an error and
     *                         'throw_errors' is enabled.
     */
    public function read(IConnectionComposable $connection)
    {
        $chunk = $connection->readLine();

        // Guard the offset access: an empty line has no first byte, and
        // reading $chunk[0] directly would raise an "uninitialized string
        // offset" notice/warning before reaching the unknown-prefix branch.
        $prefix = isset($chunk[0]) ? $chunk[0] : '';
        $payload = substr($chunk, 1);

        switch ($prefix) {
            case '+':
                // inline
                switch ($payload) {
                    case 'OK':
                        return true;

                    case 'QUEUED':
                        return new ResponseQueued();

                    default:
                        return $payload;
                }

            case '$':
                // bulk
                $size = (int) $payload;
                if ($size === -1) {
                    return null;
                }

                // Two bytes beyond the announced size are consumed and then
                // stripped (presumably the CRLF terminator -- confirm against
                // the connection implementation).
                return substr($connection->readBytes($size + 2), 0, -2);

            case '*':
                // multi bulk
                $count = (int) $payload;
                if ($count === -1) {
                    return null;
                }

                if ($this->_mbiterable) {
                    return new MultiBulkResponseSimple($connection, $count);
                }

                $multibulk = array();
                for ($i = 0; $i < $count; $i++) {
                    $multibulk[$i] = $this->read($connection);
                }

                return $multibulk;

            case ':':
                // integer
                return (int) $payload;

            case '-':
                // error
                if ($this->_throwErrors) {
                    throw new ServerException($payload);
                }

                return new ResponseError($payload);

            default:
                // NOTE(review): presumably the helper throws; if it returns,
                // this method falls through and yields null.
                Helpers::onCommunicationException(new ProtocolException(
                    $connection, "Unknown prefix: '$prefix'"
                ));
        }
    }

    /**
     * Sets a protocol option.
     *
     * Recognized options are 'iterable_multibulk' and 'throw_errors'; both
     * values are cast to boolean. Any other option name is silently ignored.
     *
     * @param string $option Name of the option.
     * @param mixed  $value  Value of the option, cast to boolean.
     */
    public function setOption($option, $value)
    {
        switch ($option) {
            case 'iterable_multibulk':
                $this->_mbiterable = (bool) $value;
                break;

            case 'throw_errors':
                $this->_throwErrors = (bool) $value;
                break;
        }
    }
}