// Copyright (C) 2016 Davis E. King (davis@dlib.net) // License: Boost Software License See LICENSE.txt for the full license. #ifndef DLIB_SUBPROCeSS_STREAM_H_ #define DLIB_SUBPROCeSS_STREAM_H_ #include <utility> #include <unistd.h> #include <iostream> #include <memory> #include <dlib/matrix.h> #include <sys/types.h> #include <sys/socket.h> namespace dlib { // -------------------------------------------------------------------------------------- // Call dlib's serialize and deserialize by default. The point of this version of // serialize is to do something fast that normally we wouldn't do, like directly copy // memory. This is safe since this is an interprocess communication happening the same // machine. template <typename T> void interprocess_serialize ( const T& item, std::ostream& out) { serialize(item, out); } template <typename T> void interprocess_deserialize (T& item, std::istream& in) { deserialize(item, in); } // But have overloads for direct memory copies for some types since this is faster than // their default serialization. template <typename T, long NR, long NC, typename MM, typename L> void interprocess_serialize(const dlib::matrix<T,NR,NC,MM,L>& item, std::ostream& out) { dlib::serialize(item.nr(), out); dlib::serialize(item.nc(), out); if (item.size() != 0) out.write((const char*)&item(0,0), sizeof(T)*item.size()); if (!out) throw dlib::serialization_error("Error writing matrix to interprocess iostream."); } template <typename T, long NR, long NC, typename MM, typename L> void interprocess_deserialize(dlib::matrix<T,NR,NC,MM,L>& item, std::istream& in) { long nr, nc; dlib::deserialize(nr, in); dlib::deserialize(nc, in); item.set_size(nr,nc); if (item.size() != 0) in.read((char*)&item(0,0), sizeof(T)*item.size()); if (!in) throw dlib::serialization_error("Error reading matrix from interprocess iostream."); } // ---------------------------------------------------------------------------------------- namespace impl{ std::iostream& get_data_iostream(); } inline void send_to_parent_process() {impl::get_data_iostream().flush();} template <typename U, typename ...T> void send_to_parent_process(U&& arg1, T&& ...args) /*! ensures - sends all the arguments to send_to_parent_process() to the parent process by serializing them with interprocess_serialize(). !*/ { interprocess_serialize(arg1, impl::get_data_iostream()); send_to_parent_process(std::forward<T>(args)...); if (!impl::get_data_iostream()) throw dlib::error("Error sending object to parent process."); } inline void receive_from_parent_process() {} template <typename U, typename ...T> void receive_from_parent_process(U&& arg1, T&& ...args) /*! ensures - receives all the arguments to receive_from_parent_process() from the parent process by deserializing them from interprocess_serialize(). !*/ { interprocess_deserialize(arg1, impl::get_data_iostream()); receive_from_parent_process(std::forward<T>(args)...); if (!impl::get_data_iostream()) throw dlib::error("Error receiving object from parent process."); } // ---------------------------------------------------------------------------------------- class filestreambuf; class subprocess_stream : noncopyable { /*! WHAT THIS OBJECT REPRESENTS This is a tool for spawning a subprocess and communicating with it. Here is an example: subprocess_stream s("/usr/bin/some_program"); s.send(obj1, obj2, obj3); s.receive(obj4, obj5); s.wait(); // wait for sub process to terminate Then in the sub process you would have: receive_from_parent_process(obj1, obj2, obj3); // do stuff cout << "echo this text to parent cout" << endl; send_to_parent_process(obj4, obj5); Additionally, if the sub process writes to its standard out then that will be echoed to std::cout in the parent process. Writing to std::cerr or returning a non-zero value from main will also be noted by the parent process and an appropriate exception will be thrown. !*/ public: explicit subprocess_stream( const char* program_name ); /*! ensures - spawns a sub process by executing the file with the given program_name. !*/ ~subprocess_stream( ); /*! ensures - calls wait(). Note that the destructor never throws even though wait() can. If an exception is thrown by wait() it is just logged to std::cerr. !*/ void wait( ); /*! ensures - closes the input stream to the child process and then waits for the child to terminate. - If the child returns an error (by returning != 0 from its main) or outputs to its standard error then wait() throws a dlib::error() with the standard error output in it. !*/ int get_child_pid() const { return child_pid; } /*! ensures - returns the PID of the child process !*/ template <typename U, typename ...T> void send(U&& arg1, T&& ...args) /*! ensures - sends all the arguments to send() to the subprocess by serializing them with interprocess_serialize(). !*/ { interprocess_serialize(arg1, iosub); send(std::forward<T>(args)...); if (!iosub) { std::ostringstream sout; sout << stderr.rdbuf(); throw dlib::error("Error sending object to child process.\n" + sout.str()); } } void send() {iosub.flush();} template <typename U, typename ...T> void receive(U&& arg1, T&& ...args) /*! ensures - receives all the arguments to receive() to the subprocess by deserializing them with interprocess_deserialize(). !*/ { interprocess_deserialize(arg1, iosub); receive(std::forward<T>(args)...); if (!iosub) { std::ostringstream sout; sout << stderr.rdbuf(); throw dlib::error("Error receiving object from child process.\n" + sout.str() ); } } void receive() {} private: void send_eof(); class cpipe : noncopyable { private: int fd[2]; public: cpipe() { if (socketpair(AF_LOCAL, SOCK_STREAM, 0, fd)) throw dlib::error("Failed to create pipe"); } ~cpipe() { close(); } int parent_fd() const { return fd[0]; } int child_fd() const { return fd[1]; } void close() { ::close(fd[0]); ::close(fd[1]); } }; cpipe data_pipe; cpipe stdout_pipe; cpipe stderr_pipe; bool wait_called = false; std::unique_ptr<filestreambuf> inout_buf; std::unique_ptr<filestreambuf> err_buf; int child_pid = -1; std::istream stderr; std::iostream iosub; }; } // ---------------------------------------------------------------------------------------- #endif // DLIB_SUBPROCeSS_STREAM_H_