/*================================================================= Copyright (C) 2012 2013 BizStation Corp All rights reserved. This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation; either version 2 of the License, or (at your option) any later version. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with this program; if not, write to the Free Software Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. =================================================================*/ #include "serverPipe.h" #include #include #include #include #include #include #include #include #include #include "IAppModule.h" #include using namespace boost; using namespace boost::asio; using namespace boost::asio::ip; char* getWindowsErrMsg(DWORD ErrorCode) { LPVOID lpMsgBuf; FormatMessage(FORMAT_MESSAGE_ALLOCATE_BUFFER | FORMAT_MESSAGE_FROM_SYSTEM, NULL, ErrorCode, MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT), // Default language (LPSTR)&lpMsgBuf, 0, NULL); return (char*)lpMsgBuf; } #define PIPE_EOF_ERROR_CODE boost::system::windows_error::broken_pipe #define BUFSIZE 0 namespace bzs { namespace netsvc { namespace server { namespace pipe { void getSecurityAttribute(SECURITY_ATTRIBUTES& sa, SECURITY_DESCRIPTOR& sd) { InitializeSecurityDescriptor(&sd, SECURITY_DESCRIPTOR_REVISION); SetSecurityDescriptorDacl(&sd, TRUE, NULL, FALSE); sa.nLength = sizeof(SECURITY_ATTRIBUTES); sa.bInheritHandle = TRUE; sa.lpSecurityDescriptor = &sd; } acceptor::acceptor() : m_fd(0), m_cancel(false) { } void acceptor::open(const std::string& pipeName) { m_pipeName = pipeName; } void acceptor::cancel() { m_cancel = true; } void acceptor::accept(platform_stream& pipe) { m_fd = NULL; SECURITY_DESCRIPTOR sd; SECURITY_ATTRIBUTES sa; getSecurityAttribute(sa, sd); char pipeName[100]; sprintf_s(pipeName, 100, "\\\\.\\pipe\\%s", m_pipeName.c_str()); m_fd = CreateNamedPipe(pipeName, // pipe name PIPE_ACCESS_DUPLEX | FILE_FLAG_OVERLAPPED, PIPE_TYPE_BYTE | PIPE_READMODE_BYTE | PIPE_WAIT, PIPE_UNLIMITED_INSTANCES, // max. instances BUFSIZE, // output buffer size BUFSIZE, // input buffer size 0, // client time-out &sa); // default security attribute if (m_fd == INVALID_HANDLE_VALUE) THROW_BZS_ERROR_WITH_MSG(getWindowsErrMsg(GetLastError())); OVERLAPPED overlapped = { 0 }; overlapped.hEvent = CreateEvent(0, TRUE, FALSE, 0); BOOL ret = ConnectNamedPipe(m_fd, &overlapped); // Connection may be completed by timing at this time. if ((ret == FALSE) && (GetLastError() == ERROR_PIPE_CONNECTED)) { CloseHandle(overlapped.hEvent); pipe.assign(m_fd); return; } if ((ret != FALSE) || (GetLastError() != ERROR_IO_PENDING)) { CloseHandle(overlapped.hEvent); CloseHandle(m_fd); THROW_BZS_ERROR_WITH_MSG("ConnectNamedPipe error"); return; } // A shutdown is supervised periodically. DWORD waitRes; while (1) { if (m_cancel) { CloseHandle(overlapped.hEvent); CloseHandle(m_fd); break; } waitRes = WaitForSingleObject(overlapped.hEvent, 5); if (waitRes == WAIT_OBJECT_0) { CloseHandle(overlapped.hEvent); pipe.assign(m_fd); break; } else if (waitRes == WAIT_TIMEOUT) ; else { CloseHandle(overlapped.hEvent); CloseHandle(m_fd); THROW_BZS_ERROR_WITH_MSG("WaitForSingleObject error"); } } } unsigned int g_connections = 0; unsigned int g_waitThread = 0; // --------------------------------------------------------------------------- // connection // --------------------------------------------------------------------------- class IExitCheckHandler { public: virtual ~IExitCheckHandler(){}; virtual bool isExit() = 0; }; class exitCheckHnadler : public IExitCheckHandler { HANDLE m_procHandle; bool m_cancel; IAppModule* m_module; public: exitCheckHnadler(DWORD procId) : m_cancel(false), m_module(NULL) { m_procHandle = OpenProcess(PROCESS_QUERY_INFORMATION, FALSE, procId); } ~exitCheckHnadler() { if (m_procHandle) CloseHandle(m_procHandle); } void setModule(IAppModule* p) { m_module = p; } bool isExit() { if (m_cancel) return true; DWORD ExitCode; if (m_procHandle && GetExitCodeProcess(m_procHandle, &ExitCode)) { if (STILL_ACTIVE != ExitCode) return true; } if (m_module && m_module->isShutDown()) return true; return false; } void setCancel(bool value) { m_cancel = value; }; bool cancel() { return m_cancel; } }; class winEventComm { HANDLE m_recv; HANDLE m_send; bool m_sent; bool m_cancel; public: winEventComm(const char* rcvName, const char* sndName) : m_cancel(false) { SECURITY_DESCRIPTOR sd; SECURITY_ATTRIBUTES sa; getSecurityAttribute(sa, sd); m_recv = CreateEvent(&sa, FALSE, FALSE, rcvName); if (!m_recv) THROW_BZS_ERROR_WITH_MSG(getWindowsErrMsg(GetLastError())); m_send = CreateEvent(&sa, FALSE, FALSE, sndName); if (!m_send) THROW_BZS_ERROR_WITH_MSG(getWindowsErrMsg(GetLastError())); } ~winEventComm() { if (!m_sent) SetEvent(m_send); if (m_recv) CloseHandle(m_recv); if (m_send) CloseHandle(m_send); } bool recv(int checkTimeSpan, IExitCheckHandler* handler) { DWORD wait = WAIT_TIMEOUT; do { if (wait == WAIT_ABANDONED) return false; if (m_cancel || (handler && (handler->isExit()))) return false; } while (WAIT_OBJECT_0 != (wait = WaitForSingleObject(m_recv, checkTimeSpan))); m_sent = false; return true; } bool send() { if (SetEvent(m_send) == 0) return false; m_sent = true; return true; } void chancel() { m_cancel = true; } }; class sharedMem { HANDLE m_mapFile; char* m_readPtr; char* m_writePtr; DWORD m_size; public: sharedMem(const char* name, unsigned int memsize) { SECURITY_DESCRIPTOR sd; SECURITY_ATTRIBUTES sa; getSecurityAttribute(sa, sd); SYSTEM_INFO SystemInfo; GetSystemInfo(&SystemInfo); m_size = memsize / SystemInfo.dwAllocationGranularity + 1; m_size = m_size * SystemInfo.dwAllocationGranularity; m_mapFile = CreateFileMapping(INVALID_HANDLE_VALUE, &sa, PAGE_READWRITE, 0, m_size * 2, name); if (m_mapFile == NULL) THROW_BZS_ERROR_WITH_MSG("CreateFileMapping error"); m_readPtr = (char*)MapViewOfFile(m_mapFile, FILE_MAP_ALL_ACCESS, 0, 0, m_size); if (m_readPtr == NULL) THROW_BZS_ERROR_WITH_MSG("MapViewOfFile R error"); m_writePtr = (char*)MapViewOfFile(m_mapFile, FILE_MAP_ALL_ACCESS, 0, m_size, m_size); if (m_writePtr == NULL) THROW_BZS_ERROR_WITH_MSG("MapViewOfFile W error"); } ~sharedMem() { if (m_mapFile) { UnmapViewOfFile(m_readPtr); UnmapViewOfFile(m_writePtr); CloseHandle(m_mapFile); } } size_t size() const { return (size_t)m_size; } char* readBuffer() { return m_readPtr; } char* writeBuffer() { return m_writePtr; } }; class sharedMemBuffer : public IResultBuffer { sharedMem& m_buf; public: sharedMemBuffer(sharedMem& v) : m_buf(v) {} void resize(size_t v) {} size_t size() const { return m_buf.size(); } char* ptr() { return m_buf.writeBuffer(); } }; class connection : public iconnection, private boost::noncopyable { mutable io_service m_ios; static mutex m_mutex; platform_stream m_socket; shared_ptr m_comm; shared_ptr m_exitHandler; shared_ptr m_sharedMem; shared_ptr m_module; size_t m_readLen; void run() { bool sentResult = true; while (sentResult) { if (m_comm->recv(3000, m_exitHandler.get()) == false) return; bool complete = false; m_readLen = *((unsigned int*)m_sharedMem->readBuffer()); //When readLen = 0 , close connection if (m_readLen == 0) return; m_module->onRead(m_sharedMem->readBuffer(), m_readLen, complete); if (complete) { size_t size = 0; int ret = m_module->execute(sharedMemBuffer(*m_sharedMem), size, NULL); if (ret == EXECUTE_RESULT_QUIT) return; else m_readLen = 0; sentResult = m_comm->send(); //When named pipe, dissconnect from local client. //if (ret == EXECUTE_RESULT_ACCESS_DNIED) // return; m_module->cleanup(); } } } char* getUniqName(DWORD id, __int64 processid, const char* name, char* buf, int size) { sprintf_s(buf, size, "%s_%lu_%Lu", name, id, processid); return buf; } public: static std::string m_pipeName; static unsigned int m_shareMemSize; connection() : m_socket(m_ios), m_readLen(0) { mutex::scoped_lock lck(m_mutex); connections.push_back(this); ++g_connections; } ~connection() { if (connections.size()) { mutex::scoped_lock lck(m_mutex); std::vector::iterator it = find(connections.begin(), connections.end(), this); if (it != connections.end()) { connections.erase(it); --g_connections; } } m_exitHandler.reset(); } void close() { m_comm->chancel(); } void asyncWrite(const char* p, unsigned int size) {} io_service& ios() const { return m_ios; }; platform_stream& socket() { return m_socket; } void setModule(shared_ptr p) { m_module = p; if (m_exitHandler) m_exitHandler->setModule(p.get()); } void start() { m_ios.reset(); char buf[128]; char buf2[50]; char tmp[50]; char tmp2[50]; system::error_code e; std::size_t len = asio::read(m_socket, buffer(buf, 16), e); if (len != 16) THROW_BZS_ERROR_WITH_MSG("readThredID error"); DWORD clinetProcessID = *((DWORD*)(buf + 4)); __int64 clientid = *((__int64*)(buf + 8)); sprintf_s(tmp, 50, "Global\\%s", m_pipeName.c_str()); m_sharedMem.reset( new sharedMem(getUniqName(clinetProcessID, clientid, tmp, buf, 128), m_shareMemSize)); sprintf_s(tmp, 50, "Global\\%sToSrv", m_pipeName.c_str()); sprintf_s(tmp2, 50, "Global\\%sToClnt", m_pipeName.c_str()); m_comm.reset(new winEventComm( getUniqName(clinetProcessID, clientid, tmp, buf, 128), getUniqName(clinetProcessID, clientid, tmp2, buf2, 50))); m_exitHandler.reset(new exitCheckHnadler(clinetProcessID)); if (m_module) m_exitHandler->setModule(m_module.get()); tmp[0] = 0x00; // signe of handshakable memcpy(tmp + 3, &m_shareMemSize, sizeof(unsigned int));// sharemem size //asio::write(m_socket, buffer(tmp, 7), e); //len = asio::read(m_socket, buffer(buf, 9), e); //if (len != 9) // THROW_BZS_ERROR_WITH_MSG("handshake error"); //send handshake packet m_module->onAccept(m_sharedMem->writeBuffer(), m_sharedMem->size()); m_comm->send(); asio::write(m_socket, buffer(tmp, 7), e); run(); DisconnectNamedPipe(m_socket.native()); } void cancel() { if (m_exitHandler) m_exitHandler->setCancel(true); ios().stop(); socket().cancel(); } static std::vector connections; static void stop() { mutex::scoped_lock lck(m_mutex); try { for (size_t i = 0; i < connections.size(); i++) connections[i]->cancel(); } catch (system::system_error&) { }; } }; std::vector connection::connections; mutex connection::m_mutex; std::string connection::m_pipeName; unsigned int connection::m_shareMemSize; // --------------------------------------------------------------------------- // worker // --------------------------------------------------------------------------- class worker : private boost::noncopyable { static mutex m_mutex; static std::vector > m_threads; static std::vector m_workers; // used by Muliti thread. static worker* worker::findWaitThread() { mutex::scoped_lock lck(m_mutex); for (size_t i = 0; i < m_workers.size(); i++) if (m_workers[i]->m_connection == NULL) return m_workers[i]; return NULL; } shared_ptr m_connection; bool resume() { return shutdown || m_connection; } ~worker() { mutex::scoped_lock lck(m_mutex); m_workers.erase(find(m_workers.begin(), m_workers.end(), this)); } public: static bool shutdown; static const char* hostCheckName; static boost::condition_variable condition; static void worker::registThread(shared_ptr t) { mutex::scoped_lock lck(m_mutex); m_threads.push_back(t); } static void worker::join() { for (size_t i = 0; i < m_threads.size(); i++) m_threads[i]->join(); m_threads.erase(m_threads.begin(), m_threads.end()); } /** In search of an empty worker thread, * if there is nothing, it will create, perform and return. */ static worker* worker::get(const IAppModuleBuilder* app) { worker* p = findWaitThread(); if (p == NULL) { thread::attributes attr; attr.set_stack_size(125 * 1024); p = new worker(); shared_ptr t(new thread(attr, bind(&worker::run, p, app))); registThread(t); } return p; } /** * Call from accepter thread. */ worker() { mutex::scoped_lock lck(m_mutex); m_workers.push_back(this); } void worker::setConnection(shared_ptr conn) { m_connection = conn; } void run(const IAppModuleBuilder* app) { try { while (!shutdown) { if (m_connection) { system::error_code ec; tcp::endpoint endpoint; endpoint.address( address(address_v4::from_string("127.0.0.1"))); shared_ptr mod( ((IAppModuleBuilder*)app)->createSessionModule( endpoint, m_connection.get(), SERVER_TYPE_CPT)); m_connection->setModule(mod); if (mod->checkHost(hostCheckName, NULL, 0)) m_connection->start(); // It does not return, unless a // connection is close. m_connection.reset(); } // TODO A used thread -- it is all held. // The number of maintenance is decided and it is made not to // hold any more. mutex::scoped_lock lck(m_mutex); ++g_waitThread; condition.wait(lck, bind(&worker::resume, this)); --g_waitThread; } } catch (bzs::rtl::exception& e) { if (server::erh) { if (const std::string* msg = getMsg(e)) { std::string s = "Pipe server " + *msg; server::erh->printError(s.c_str()); } else server::erh->printError( boost::diagnostic_information(e).c_str()); } } catch (...) { if (server::erh) server::erh->printError("pipe server Unknown"); } // An end of this thread will delete self. delete this; } }; bool worker::shutdown = false; const char* worker::hostCheckName; condition_variable worker::condition; mutex worker::m_mutex; std::vector > worker::m_threads; std::vector worker::m_workers; // --------------------------------------------------------------------------- // server // --------------------------------------------------------------------------- inotifyHandler* server::erh = NULL; /** server * If it starts, a server will create the exclusive thread for accpter * and will go into an infinite loop. */ server::server(shared_ptr app, const std::string& name, const char* port, std::size_t max_connections, unsigned int shareMemSize, const char* hostCheckName) : m_app(app), m_maxConnections(max_connections), m_stopped(false) { worker::hostCheckName = hostCheckName; m_newConnection.reset(new connection()); connection::m_pipeName = name; connection::m_shareMemSize = shareMemSize; std::string tmp = name; tmp += port; m_acceptor.open(tmp); } /** Start the server */ void server::start() { shared_ptr t(new thread(bind(&server::run, this))); worker::registThread(t); } void server::run() { try { if (erh) erh->printInfo("Start Pipe server."); while (1) { if (m_stopped) return; while (connection::connections.size() > m_maxConnections) { Sleep(100); if (m_stopped) return; } // Time to await slight time and no pipe be exists. // A client is trying connection several times. m_acceptor.accept(m_newConnection->socket()); if (m_stopped) return; system::error_code e; onAccept(e); } } catch (bzs::rtl::exception& e) { if (erh) { if (const std::string* msg = getMsg(e)) { std::string s = "Pipe server accept " + *msg; erh->printError(s.c_str()); } else erh->printError(boost::diagnostic_information(e).c_str()); } stop(); } } void server::onAccept(const system::error_code& e) { // connection is passed to a thread and it resumes. if (!e) { worker* w = worker::get(m_app.get()); w->setConnection(m_newConnection); worker::condition.notify_all(); m_newConnection.reset(new connection()); } } void server::doStop() { if (!m_stopped) { m_stopped = true; if (erh) erh->printInfo("Stopping Pipe server ..."); m_acceptor.cancel(); worker::shutdown = true; connection::stop(); worker::condition.notify_all(); } } void server::stop() { doStop(); worker::join(); } } // namespace pipe } // namespace server } // namespace netsvc } // namespace bzs