#ifndef BZS_NETSVC_CLINET_TCPCLIENT_H #define BZS_NETSVC_CLINET_TCPCLIENT_H /*================================================================= Copyright (C) 2012 2013 BizStation Corp All rights reserved. This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation; either version 2 of the License, or (at your option) any later version. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with this program; if not, write to the Free Software Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. =================================================================*/ #ifdef __BCPLUSPLUS__ #pragma warn -8012 #endif #include #include #include #include #include #include #include #include #include #include #include #include #ifdef __BCPLUSPLUS__ #pragma warn +8012 #endif #include using namespace boost; using namespace boost::system; #ifdef WIN32 #define USE_PIPE_CLIENT #endif #ifdef USE_PIPE_CLIENT using boost::asio::windows::stream_handle; typedef stream_handle platform_stream; typedef HANDLE platform_descriptor; #define PIPE_EOF_ERROR_CODE boost::system::windows_error::broken_pipe typedef DWORD thread_id; #define threadid GetCurrentThreadId #else typedef pthread_t thread_id; #define threadid pthread_self #endif #define READBUF_SIZE 66000 #define WRITEBUF_SIZE 66000 #define PORTNUMBUF_SIZE 10 #define CLIENT_ERROR_CANT_CREATEPIPE 3106 #define CLIENT_ERROR_SHAREMEM_DENIED 3104 namespace bzs { namespace netsvc { namespace client { typedef std::vector buffers; class connection; class connections { std::vector m_conns; boost::asio::io_service m_ios; mutex m_mutex; connection* getConnection(asio::ip::tcp::endpoint& ep); asio::ip::tcp::endpoint endpoint(const std::string& host); bool isUseNamedPipe(asio::ip::tcp::endpoint& ep); static bool m_usePipedLocal; std::string m_pipeName; #ifdef USE_PIPE_CLIENT connection* getConnectionPipe(); #endif public: connections(const char* pipeName); ~connections(); connection* connect(const std::string& host, bool newConnection= false); connection* getConnection(const std::string& host); bool disconnect(connection* c); static char port[PORTNUMBUF_SIZE]; int connectionCount(); }; /** The connection interface * */ class connection { public: virtual ~connection(){}; virtual void connect() = 0; virtual void addref() = 0; virtual void release() = 0; virtual int refCount() const = 0; virtual bool isConnected()const = 0; virtual const asio::ip::tcp::endpoint& endpoint() const = 0; virtual thread_id tid()const = 0; virtual char* sendBuffer()=0; virtual unsigned int sendBufferSize()=0; virtual buffers* optionalBuffers()=0; virtual char* asyncWriteRead(unsigned int writeSize)=0; }; /** Implementation of Part of the connection interface */ class connectionBase : public connection { protected: friend class connections; asio::io_service m_ios; asio::ip::tcp::endpoint m_ep; int m_refCount; thread_id m_tid; std::vector m_readbuf; std::vector m_sendbuf; size_t m_readLen; bool m_connected; void addref(){++m_refCount;} void release() {--m_refCount;} int refCount()const{return m_refCount;} bool isConnected()const{return m_connected;}; const asio::ip::tcp::endpoint& endpoint()const{return m_ep;} thread_id tid()const{return m_tid;}; public: connectionBase(asio::ip::tcp::endpoint& ep) :m_ep(ep),m_refCount(0),m_tid(threadid()),m_connected(false) { } char* sendBuffer(){return &m_sendbuf[0];}; unsigned int sendBufferSize(){return (unsigned int)m_sendbuf.size();}; }; /** Implementation of the connection template */ template class connectionImple : public connectionBase { protected: T m_socket; buffers m_optionalBuffes; void handle_read(const boost::system::error_code& e , std::size_t bytes_transferred) { if (!e) { unsigned int* n=NULL; if (bytes_transferred==0) return ; m_readLen += bytes_transferred; n = (unsigned int*)(&m_readbuf[0]); if (*n == m_readLen) return; else if (*n > m_readbuf.size()) m_readbuf.resize(*n); m_socket.async_read_some(asio::buffer(&m_readbuf[m_readLen] ,m_readbuf.size()-m_readLen) ,boost::bind(&connectionImple::handle_read, this ,boost::asio::placeholders::error ,boost::asio::placeholders::bytes_transferred)); } } void handle_write(const boost::system::error_code& e) { if (!e) { m_optionalBuffes.clear(); m_readLen = 0; m_socket.async_read_some(asio::buffer(&m_readbuf[0], m_readbuf.size()), boost::bind(&connectionImple::handle_read, this, boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred)); } } void read() { if (!m_connected) throw system_error(asio::error::not_connected); boost::system::error_code e; m_readLen = 0; m_readLen += boost::asio::read(m_socket, boost::asio::buffer(&m_readbuf[m_readLen], m_readbuf.size()-m_readLen) ,boost::asio::transfer_at_least(4)); unsigned int* n=NULL; n = (unsigned int*)(&m_readbuf[0]); if (*n > m_readbuf.size()) m_readbuf.resize(*n); if (m_readLen != *n) m_readLen += boost::asio::read(m_socket, boost::asio::buffer(&m_readbuf[m_readLen], *n-m_readLen) , boost::asio::transfer_all()); } void write(unsigned int writeSize) { if (!m_connected) throw system_error(asio::error::not_connected); m_optionalBuffes.insert(m_optionalBuffes.begin(), asio::buffer(sendBuffer(), writeSize)); boost::asio::write(m_socket, m_optionalBuffes, boost::asio::transfer_all()); m_optionalBuffes.clear(); } public: connectionImple(asio::ip::tcp::endpoint& ep) :connectionBase(ep),m_socket(m_ios) { } ~connectionImple() { m_ios.stop(); try { m_socket.close(); } catch(...){} } char* asyncWriteRead(unsigned int writeSize) { write(writeSize); read(); return &m_readbuf[0]; } buffers* optionalBuffers(){return &m_optionalBuffes;}; }; #define TIMEOUT_SEC 3 /** Implementation of The TCP connection. */ class tcpConnection : public connectionImple { public: tcpConnection(asio::ip::tcp::endpoint& ep) :connectionImple(ep) { m_readbuf.resize(READBUF_SIZE); m_sendbuf.resize(WRITEBUF_SIZE); } void setupTimeouts() { #if defined _WIN32 int32_t timeout = TIMEOUT_SEC*1000; setsockopt(m_socket.native(), SOL_SOCKET, SO_RCVTIMEO, (const char*)&timeout, sizeof(timeout)); setsockopt(m_socket.native(), SOL_SOCKET, SO_SNDTIMEO, (const char*)&timeout, sizeof(timeout)); #else struct timeval timeout; timeout.tv_usec = 0; timeout.tv_sec = TIMEOUT_SEC; setsockopt(m_socket.native(), SOL_SOCKET, SO_RCVTIMEO, &timeout, sizeof(timeout)); setsockopt(m_socket.native(), SOL_SOCKET, SO_SNDTIMEO, &timeout, sizeof(timeout)); #endif } void connect() { setupTimeouts(); m_socket.connect(m_ep); m_socket.set_option(boost::asio::ip::tcp::no_delay(true)); char tmp[20]; m_socket.read_some(boost::asio::buffer(tmp, 10)); m_connected = true; } }; class exception: public std::exception { int m_error; std::string m_msg; public: exception(int errorCode, const char* const &message) :std::exception(),m_error(errorCode),m_msg(message){} ~exception() throw(){}; const char *what() const throw(){return m_msg.c_str();}; int error(){return m_error;}; }; #ifdef USE_PIPE_CLIENT /** Implementation of The Named pipe connection. */ class pipeConnection : public connectionImple { const std::string& m_pipeName; char* m_readbuf_p; char* m_writebuf_p; unsigned int m_sendBufferSize; HANDLE m_recvEvent; HANDLE m_sendEvent; HANDLE m_mapFile; char* GetErrorMessage( DWORD ErrorCode) { LPVOID lpMsgBuf; FormatMessage(FORMAT_MESSAGE_ALLOCATE_BUFFER | FORMAT_MESSAGE_FROM_SYSTEM, NULL, ErrorCode, MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT), // Default language (LPTSTR) &lpMsgBuf, 0, NULL); return (char*)lpMsgBuf; } void throwException(const char* msg, int errorCode) { char buf[4096]; char user[128]; char* p = GetErrorMessage(GetLastError()); DWORD size = 128; GetUserName(user, &size); sprintf_s(buf, 4096, "User:%s %s %d %s",user, msg, GetLastError(), p); LocalFree(p); throw exception(errorCode, buf); } char* getUniqName(const char* name) { connectionBase::m_readbuf.resize(120); char* p = &connectionBase::m_readbuf[0]; DWORD threadId = GetCurrentThreadId(); __int64 clientid = (__int64)this; sprintf_s(p, 120, "%s_%u_%Lu", name, threadId, clientid); return p; } void createKernelObjects(unsigned int shareMemSize) { char tmp[50]; SYSTEM_INFO SystemInfo; GetSystemInfo(&SystemInfo); int size = shareMemSize/SystemInfo.dwAllocationGranularity + 1; m_sendBufferSize = size*SystemInfo.dwAllocationGranularity; sprintf_s(tmp, 50, "Global\\%s", m_pipeName.c_str()); m_mapFile = CreateFileMapping(INVALID_HANDLE_VALUE ,NULL, PAGE_READWRITE, 0,m_sendBufferSize*2, getUniqName(tmp)); if (m_mapFile == NULL) throwException("CreateFileMapping", CLIENT_ERROR_SHAREMEM_DENIED); m_writebuf_p = (char*)MapViewOfFile(m_mapFile, FILE_MAP_ALL_ACCESS , 0, 0, m_sendBufferSize); if (m_writebuf_p == NULL) throwException("MapViewOfFile R", CLIENT_ERROR_SHAREMEM_DENIED); m_readbuf_p = (char*)MapViewOfFile(m_mapFile, FILE_MAP_ALL_ACCESS , 0, m_sendBufferSize, m_sendBufferSize); if (m_readbuf_p == NULL) throwException("MapViewOfFile W", CLIENT_ERROR_SHAREMEM_DENIED); sprintf_s(tmp, 50, "Global\\%sToClnt", m_pipeName.c_str()); m_recvEvent = OpenEvent(EVENT_ALL_ACCESS, FALSE, getUniqName(tmp)); if (m_recvEvent==NULL) throwException("OpenEvent Client", CLIENT_ERROR_SHAREMEM_DENIED); sprintf_s(tmp, 50, "Global\\%sToSrv", m_pipeName.c_str()); m_sendEvent = OpenEvent(EVENT_ALL_ACCESS, FALSE, getUniqName(tmp)); if (m_sendEvent==NULL) throwException("OpenEvent Server", CLIENT_ERROR_SHAREMEM_DENIED); } public: pipeConnection(asio::ip::tcp::endpoint& ep,const std::string& pipeName) :connectionImple(ep) ,m_pipeName(pipeName) ,m_mapFile(NULL) ,m_recvEvent(NULL) ,m_sendEvent(NULL) ,m_writebuf_p(NULL) ,m_readbuf_p(NULL) ,m_sendBufferSize(0) { } ~pipeConnection() { memset(m_writebuf_p, 0, sizeof(unsigned int)); SetEvent(m_sendEvent); WaitForSingleObject(m_recvEvent, INFINITE); if (m_recvEvent) CloseHandle(m_recvEvent); if (m_sendEvent) CloseHandle(m_sendEvent); if (m_writebuf_p) UnmapViewOfFile(m_writebuf_p); if (m_readbuf_p) UnmapViewOfFile(m_readbuf_p); if (m_mapFile) CloseHandle(m_mapFile); } void connect() { platform_descriptor fd; #ifdef WIN32 char pipeName[100]; sprintf_s(pipeName, 100, "\\\\.\\pipe\\%s", m_pipeName.c_str()); int i=1000; while (--i) { fd = CreateFile(pipeName, GENERIC_READ | GENERIC_WRITE , 0, NULL , OPEN_EXISTING, FILE_FLAG_OVERLAPPED, NULL); if (fd != INVALID_HANDLE_VALUE) break; if (ERROR_PIPE_BUSY != GetLastError()) break; Sleep(1); } if (fd == INVALID_HANDLE_VALUE) throwException("CreateFile", CLIENT_ERROR_CANT_CREATEPIPE); #endif m_socket.assign(fd); m_connected = true; //send thredid; DWORD threadId = GetCurrentThreadId(); int size = 16; connectionBase::m_readbuf.resize(size); char* p = &connectionBase::m_readbuf[0]; memcpy(p, &size, sizeof(int)); memcpy(p+4, &threadId, sizeof(DWORD)); __int64 clientid = (__int64)this; memcpy(p+8, &clientid, sizeof(__int64)); boost::asio::write(m_socket, boost::asio::buffer(p, size)); boost::asio::read(m_socket, boost::asio::buffer(p, 7)); unsigned int* shareMemSize = (unsigned int*)(p+3); createKernelObjects(*shareMemSize); } char* asyncWriteRead(unsigned int writeSize) { SetEvent(m_sendEvent); WaitForSingleObject(m_recvEvent, INFINITE); return m_readbuf_p; } char* sendBuffer(){return m_writebuf_p;} unsigned int sendBufferSize(){return m_sendBufferSize;}; buffers* optionalBuffers(){return NULL;}; //not support }; #endif }//namespace client }//namespace netsvc }//namespace bzs #endif//BZS_NETSVC_CLINET_TCPCLIENT_H