#ifndef BZS_NETSVC_CLINET_TCPCLIENT_H #define BZS_NETSVC_CLINET_TCPCLIENT_H /* ================================================================= Copyright (C) 2012 2013 BizStation Corp All rights reserved. This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation; either version 2 of the License, or (at your option) any later version. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with this program; if not, write to the Free Software Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. ================================================================= */ #ifdef __BCPLUSPLUS__ #pragma warn -8012 #endif #include <boost/asio/buffer.hpp> #include <boost/asio/windows/stream_handle.hpp> #include <boost/asio/io_service.hpp> #include <boost/asio/windows/basic_handle.hpp> #include <boost/asio/ip/tcp.hpp> #include <boost/asio/read.hpp> #include <boost/asio/write.hpp> #include <boost/bind.hpp> #include <boost/asio/placeholders.hpp> #include <boost/system/system_error.hpp> #include <boost/thread/mutex.hpp> #include <stdio.h> #ifdef __BCPLUSPLUS__ #pragma warn +8012 #endif #include <vector> using namespace boost; using namespace boost::system; #ifdef _WIN32 #define USE_PIPE_CLIENT #endif #ifdef USE_PIPE_CLIENT using boost::asio::windows::stream_handle; typedef stream_handle platform_stream; typedef HANDLE platform_descriptor; #define PIPE_EOF_ERROR_CODE boost::system::windows_error::broken_pipe typedef DWORD thread_id; #define threadid GetCurrentThreadId #else // NOT USE_PIPE_CLIENT typedef pthread_t thread_id; #define threadid pthread_self #endif // NOT USE_PIPE_CLIENT #define READBUF_SIZE 66000 #define WRITEBUF_SIZE 66000 #define PORTNUMBUF_SIZE 10 #define CLIENT_ERROR_CANT_CREATEPIPE 3106 #define CLIENT_ERROR_SHAREMEM_DENIED 3104 namespace bzs { namespace netsvc { namespace client { typedef std::vector<boost::asio::const_buffer> buffers; class connection; class connections { std::vector<connection*> m_conns; boost::asio::io_service m_ios; mutex m_mutex; connection* getConnection(asio::ip::tcp::endpoint& ep); asio::ip::tcp::endpoint endpoint(const std::string& host, boost::system::error_code& ec); bool isUseNamedPipe(asio::ip::tcp::endpoint& ep); static bool m_usePipedLocal; std::string m_pipeName; #ifdef USE_PIPE_CLIENT connection* getConnectionPipe(); #endif public: connections(const char* pipeName); ~connections(); connection* connect(const std::string& host, bool newConnection = false); connection* getConnection(const std::string& host); bool disconnect(connection* c); static char port[PORTNUMBUF_SIZE]; int connectionCount(); }; /** The connection interface * */ class connection { public: virtual ~connection(){}; virtual void connect() = 0; virtual void addref() = 0; virtual void release() = 0; virtual int refCount() const = 0; virtual bool isConnected() const = 0; virtual const asio::ip::tcp::endpoint& endpoint() const = 0; virtual thread_id tid() const = 0; virtual char* sendBuffer(size_t size) = 0; virtual unsigned int sendBufferSize() = 0; virtual buffers* optionalBuffers() = 0; virtual char* asyncWriteRead(unsigned int writeSize) = 0; virtual unsigned int datalen() const = 0; // additinal info at segment read virtual unsigned short rows() const = 0; // additinal info at segment read virtual void setReadBufferSizeIf(size_t size) = 0; }; /** Implementation of Part of the connection interface */ class connectionBase : public connection { protected: friend class connections; asio::io_service m_ios; asio::ip::tcp::endpoint m_ep; int m_refCount; thread_id m_tid; std::vector<char> m_readbuf; std::vector<char> m_sendbuf; size_t m_readLen; bool m_connected; void addref() { ++m_refCount; } void release() { --m_refCount; } int refCount() const { return m_refCount; } bool isConnected() const { return m_connected; }; const asio::ip::tcp::endpoint& endpoint() const { return m_ep; } thread_id tid() const { return m_tid; }; public: connectionBase(asio::ip::tcp::endpoint& ep) : m_ep(ep), m_refCount(0), m_tid(threadid()), m_connected(false) { } char* sendBuffer(size_t size) { if (size > m_sendbuf.size()) m_sendbuf.resize(size); return &m_sendbuf[0]; }; unsigned int sendBufferSize() { return (unsigned int)m_sendbuf.size(); }; void setReadBufferSizeIf(size_t size) { if (m_readbuf.size() < size) m_readbuf.resize(size); } }; /** Implementation of the connection template */ template <class T> class connectionImple : public connectionBase { protected: unsigned int m_datalen; unsigned short m_rows; T m_socket; buffers m_optionalBuffes; unsigned int datalen() const { return m_datalen; } unsigned short rows() const { return m_rows; } // server send any segment of lower than 0xFFFF data by asyncWrite // last 4byte is 0xFFFFFFFF, that is specify end of data void segmentRead() { bool end = false; unsigned short n; while (!end) { boost::asio::read(m_socket, boost::asio::buffer(&n, 2), boost::asio::transfer_all()); if (m_readLen + n > m_readbuf.size()) m_readbuf.resize(m_readLen + n); m_readLen += boost::asio::read( m_socket, boost::asio::buffer(&m_readbuf[m_readLen], n), boost::asio::transfer_all()); end = (*((unsigned int*)(&m_readbuf[m_readLen - 4])) == 0xFFFFFFFF); } m_readLen -= 4; // additinal data length info boost::asio::read(m_socket, boost::asio::buffer(&m_datalen, 4), boost::asio::transfer_all()); boost::asio::read(m_socket, boost::asio::buffer(&m_rows, 2), boost::asio::transfer_all()); } void read() { if (!m_connected) throw system_error(asio::error::not_connected); boost::system::error_code e; m_readLen = 0; m_datalen = 0; m_rows = 0; unsigned int n; m_readLen += boost::asio::read(m_socket, boost::asio::buffer(&n, 4), boost::asio::transfer_all()); if (n == 0xFFFFFFFF) { segmentRead(); m_readLen += boost::asio::read(m_socket, boost::asio::buffer(&n, 4), boost::asio::transfer_all()); } if (n > m_readbuf.size()) m_readbuf.resize(n); if (m_readLen != n) m_readLen += boost::asio::read( m_socket, boost::asio::buffer(&m_readbuf[m_readLen], n - m_readLen), boost::asio::transfer_all()); } void write(unsigned int writeSize) { if (!m_connected) throw system_error(asio::error::not_connected); if (m_optionalBuffes.size()) { m_optionalBuffes.insert(m_optionalBuffes.begin(), asio::buffer(sendBuffer(0), writeSize)); boost::asio::write(m_socket, m_optionalBuffes, boost::asio::transfer_all()); m_optionalBuffes.clear(); } else boost::asio::write(m_socket, asio::buffer(sendBuffer(0), writeSize), boost::asio::transfer_all()); } public: connectionImple(asio::ip::tcp::endpoint& ep) : connectionBase(ep), m_datalen(0), m_socket(m_ios) { } ~connectionImple() { m_ios.stop(); try { m_socket.close(); } catch (...) { } } char* asyncWriteRead(unsigned int writeSize) { write(writeSize); read(); return &m_readbuf[0]; } buffers* optionalBuffers() { return &m_optionalBuffes; } }; #define TIMEOUT_SEC 3 /** Implementation of The TCP connection. */ class tcpConnection : public connectionImple<asio::ip::tcp::socket> { public: tcpConnection(asio::ip::tcp::endpoint& ep) : connectionImple<asio::ip::tcp::socket>(ep) { m_readbuf.resize(READBUF_SIZE); m_sendbuf.resize(WRITEBUF_SIZE); } void setupTimeouts() { #if defined _WIN32 int32_t timeout = TIMEOUT_SEC * 1000; setsockopt(m_socket.native(), SOL_SOCKET, SO_RCVTIMEO, (const char*)&timeout, sizeof(timeout)); setsockopt(m_socket.native(), SOL_SOCKET, SO_SNDTIMEO, (const char*)&timeout, sizeof(timeout)); #else struct timeval timeout; timeout.tv_usec = 0; timeout.tv_sec = TIMEOUT_SEC; setsockopt(m_socket.native(), SOL_SOCKET, SO_RCVTIMEO, &timeout, sizeof(timeout)); setsockopt(m_socket.native(), SOL_SOCKET, SO_SNDTIMEO, &timeout, sizeof(timeout)); #endif } void connect() { setupTimeouts(); m_socket.connect(m_ep); m_socket.set_option(boost::asio::ip::tcp::no_delay(true)); char tmp[20]; m_socket.read_some(boost::asio::buffer(tmp, 10)); m_connected = true; } }; class exception : public std::exception { int m_error; std::string m_msg; public: exception(int errorCode, const char* const& message) : std::exception(), m_error(errorCode), m_msg(message) { } ~exception() throw() {} const char* what() const throw() { return m_msg.c_str(); } int error() { return m_error; } }; #ifdef USE_PIPE_CLIENT /** Implementation of The Named pipe connection. */ class pipeConnection : public connectionImple<platform_stream> { const std::string& m_pipeName; char* m_readbuf_p; char* m_writebuf_p; unsigned int m_sendBufferSize; HANDLE m_recvEvent; HANDLE m_sendEvent; HANDLE m_mapFile; char* GetErrorMessage(DWORD ErrorCode) { LPVOID lpMsgBuf; FormatMessage( FORMAT_MESSAGE_ALLOCATE_BUFFER | FORMAT_MESSAGE_FROM_SYSTEM, NULL, ErrorCode, MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT), // Default language (LPTSTR)&lpMsgBuf, 0, NULL); return (char*)lpMsgBuf; } void throwException(const char* msg, int errorCode) { char buf[4096]; char user[128]; char* p = GetErrorMessage(GetLastError()); DWORD size = 128; GetUserName(user, &size); sprintf_s(buf, 4096, "User:%s %s %d %s", user, msg, GetLastError(), p); LocalFree(p); throw exception(errorCode, buf); } char* getUniqName(const char* name) { connectionBase::m_readbuf.resize(120); char* p = &connectionBase::m_readbuf[0]; DWORD processId = GetCurrentProcessId(); __int64 clientid = (__int64) this; sprintf_s(p, 120, "%s_%u_%Lu", name, processId, clientid); return p; } void createKernelObjects(unsigned int shareMemSize) { char tmp[50]; SYSTEM_INFO SystemInfo; GetSystemInfo(&SystemInfo); int size = shareMemSize / SystemInfo.dwAllocationGranularity + 1; m_sendBufferSize = size * SystemInfo.dwAllocationGranularity; sprintf_s(tmp, 50, "Global\\%s", m_pipeName.c_str()); m_mapFile = CreateFileMapping(INVALID_HANDLE_VALUE, NULL, PAGE_READWRITE, 0, m_sendBufferSize * 2, getUniqName(tmp)); if (m_mapFile == NULL) throwException("CreateFileMapping", CLIENT_ERROR_SHAREMEM_DENIED); m_writebuf_p = (char*)MapViewOfFile(m_mapFile, FILE_MAP_ALL_ACCESS, 0, 0, m_sendBufferSize); if (m_writebuf_p == NULL) throwException("MapViewOfFile R", CLIENT_ERROR_SHAREMEM_DENIED); m_readbuf_p = (char*)MapViewOfFile(m_mapFile, FILE_MAP_ALL_ACCESS, 0, m_sendBufferSize, m_sendBufferSize); if (m_readbuf_p == NULL) throwException("MapViewOfFile W", CLIENT_ERROR_SHAREMEM_DENIED); sprintf_s(tmp, 50, "Global\\%sToClnt", m_pipeName.c_str()); m_recvEvent = OpenEvent(EVENT_ALL_ACCESS, FALSE, getUniqName(tmp)); if (m_recvEvent == NULL) throwException("OpenEvent Client", CLIENT_ERROR_SHAREMEM_DENIED); sprintf_s(tmp, 50, "Global\\%sToSrv", m_pipeName.c_str()); m_sendEvent = OpenEvent(EVENT_ALL_ACCESS, FALSE, getUniqName(tmp)); if (m_sendEvent == NULL) throwException("OpenEvent Server", CLIENT_ERROR_SHAREMEM_DENIED); } public: pipeConnection(asio::ip::tcp::endpoint& ep, const std::string& pipeName) : connectionImple<platform_stream>(ep), m_pipeName(pipeName), m_mapFile(NULL), m_recvEvent(NULL), m_sendEvent(NULL), m_writebuf_p(NULL), m_readbuf_p(NULL), m_sendBufferSize(0) { } ~pipeConnection() { memset(m_writebuf_p, 0, sizeof(unsigned int)); SetEvent(m_sendEvent); WaitForSingleObject(m_recvEvent, INFINITE); if (m_recvEvent) CloseHandle(m_recvEvent); if (m_sendEvent) CloseHandle(m_sendEvent); if (m_writebuf_p) UnmapViewOfFile(m_writebuf_p); if (m_readbuf_p) UnmapViewOfFile(m_readbuf_p); if (m_mapFile) CloseHandle(m_mapFile); } void connect() { platform_descriptor fd; #ifdef WIN32 char pipeName[100]; sprintf_s(pipeName, 100, "\\\\.\\pipe\\%s", m_pipeName.c_str()); int i = 1000; while (--i) { fd = CreateFile(pipeName, GENERIC_READ | GENERIC_WRITE, 0, NULL, OPEN_EXISTING, FILE_FLAG_OVERLAPPED, NULL); if (fd != INVALID_HANDLE_VALUE) break; if (ERROR_PIPE_BUSY != GetLastError()) break; Sleep(1); } if (fd == INVALID_HANDLE_VALUE) throwException("CreateFile", CLIENT_ERROR_CANT_CREATEPIPE); #endif // NOT WIN32 m_socket.assign(fd); m_connected = true; // send processId; DWORD processId = GetCurrentProcessId(); int size = 16; connectionBase::m_readbuf.resize(size); char* p = &connectionBase::m_readbuf[0]; memcpy(p, &size, sizeof(int)); memcpy(p + 4, &processId, sizeof(DWORD)); __int64 clientid = (__int64) this; memcpy(p + 8, &clientid, sizeof(__int64)); boost::asio::write(m_socket, boost::asio::buffer(p, size)); boost::asio::read(m_socket, boost::asio::buffer(p, 7)); unsigned int* shareMemSize = (unsigned int*)(p + 3); createKernelObjects(*shareMemSize); } char* asyncWriteRead(unsigned int writeSize) { m_datalen = 0; m_rows = 0; SetEvent(m_sendEvent); WaitForSingleObject(m_recvEvent, INFINITE); return m_readbuf_p; } char* sendBuffer(size_t size) { return m_writebuf_p; } unsigned int sendBufferSize() { return m_sendBufferSize; } buffers* optionalBuffers() { return NULL; } // not support void setReadBufferSizeIf(size_t size) {} // not support }; #endif // NOT WIN32 } // namespace client } // namespace netsvc } // namespace bzs #endif // BZS_NETSVC_CLINET_TCPCLIENT_H