#ifndef BZS_NETSVC_CLINET_TCPCLIENT_H
#define BZS_NETSVC_CLINET_TCPCLIENT_H
/* =================================================================
 Copyright (C) 2012 2013 BizStation Corp All rights reserved.

 This program is free software; you can redistribute it and/or
 modify it under the terms of the GNU General Public License
 as published by the Free Software Foundation; either version 2
 of the License, or (at your option) any later version.

 This program is distributed in the hope that it will be useful,
 but WITHOUT ANY WARRANTY; without even the implied warranty of
 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 GNU General Public License for more details.

 You should have received a copy of the GNU General Public License
 along with this program; if not, write to the Free Software
 Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA
 02111-1307, USA.
 ================================================================= */

#include <bzs/netsvc/client/iconnection.h>
#include <boost/asio/write.hpp>
#include <boost/asio/read.hpp>
#include <boost/system/system_error.hpp>
#include <boost/thread/mutex.hpp>
#include <stdio.h>
#include <vector>
#ifdef LINUX
#include <pthread.h> 
#include <signal.h> 
#endif


using namespace boost;
using namespace boost::system;

#define READBUF_SIZE 66000
#define WRITEBUF_SIZE 66000

#define PORTNUMBUF_SIZE 10
#define CLIENT_ERROR_CANT_CREATEPIPE 3106
#define CLIENT_ERROR_SHAREMEM_DENIED 3104
#define CLIENT_ERROR_CONNECTION_FAILURE 3106
#define MAX_DATA_SIZE 10485760 // 10MB

namespace bzs
{
namespace netsvc
{
namespace client
{

typedef bool (*handshake)(connection* c, void* data);
class connections
{
    std::vector<connection*> m_conns;
    boost::asio::io_service m_ios;
    mutex m_mutex;
    std::string m_pipeName;
    static bool m_usePipedLocal;

    connection* getConnection(asio::ip::tcp::endpoint& ep);
    asio::ip::tcp::endpoint endpoint(const std::string& host,
                                     boost::system::error_code& ec);
    bool isUseNamedPipe(asio::ip::tcp::endpoint& ep);
#ifdef USE_PIPE_CLIENT
    connection* getConnectionPipe();
#endif
    inline connection* doConnect(connection* c);
    inline connection* createConnection(asio::ip::tcp::endpoint& ep, bool namedPipe);
    inline bool doHandShake(connection* c, handshake f, void* data);
public:
    connections(const char* pipeName);
    ~connections();
    connection* connect(const std::string& host, handshake f,
                        void* data, bool newConnection = false);
    connection* getConnection(const std::string& host);
    bool disconnect(connection* c);
    int connectionCount();
    static char port[PORTNUMBUF_SIZE];
    static short timeout;
};

/** Implementation of Part of the connection interface
 */
class connectionBase : public connection
{
protected:
    friend class connections;

    asio::io_service m_ios;
    asio::ip::tcp::endpoint m_ep;
    std::vector<char> m_readbuf;
    std::vector<char> m_sendbuf;
    idirectReadHandler* m_reader;
    size_t m_readLen;
    int m_refCount;
    int m_charsetServer;
    bool m_connected;
    bool m_isHandShakable;

    void addref() { ++m_refCount; }

    void release() { --m_refCount; }

    int refCount() const { return m_refCount; }

    bool isConnected() const { return m_connected; };

    const asio::ip::tcp::endpoint& endpoint() const { return m_ep; }

    int charsetServer() const { return m_charsetServer; };

    void setCharsetServer(int v) { m_charsetServer = v; }

public:
    connectionBase(asio::ip::tcp::endpoint& ep)
        : m_ep(ep), m_reader(NULL), m_refCount(0),
          m_charsetServer(-1), m_connected(false), m_isHandShakable(true)
    {
    }

    void setDirectReadHandler(idirectReadHandler* p){ m_reader = p; }

    char* sendBuffer(size_t size)
    {
        if (size > m_sendbuf.size())
            m_sendbuf.resize(size);
        return &m_sendbuf[0];
    }

    unsigned int sendBufferSize() { return (unsigned int)m_sendbuf.size(); };

    void setReadBufferSizeIf(size_t size)
    {
        if (m_readbuf.size() < size)
            m_readbuf.resize(size);
    }

    bool isHandShakable() const {return m_isHandShakable;};
};

/** Implementation of the connection template
 */
template <class T> class connectionImple : public connectionBase
{
#ifdef LINUX
    sigset_t m_signmask, m_sigomask;
#endif
protected:
    //unsigned int m_datalen;
    //unsigned short m_rows;
    T m_socket;
    buffers m_optionalBuffes;

    //unsigned int datalen() const { return m_datalen; }

    //unsigned short rows() const { return m_rows; }

    // server send any segment of lower than 0xFFFF data by asyncWrite
    // last 4byte is 0xFFFFFFFF, that is specify end of data
    /*void segmentRead()
    {
        bool end = false;
        unsigned short n;
        while (!end)
        {
            boost::asio::read(m_socket, boost::asio::buffer(&n, 2),
                              boost::asio::transfer_all());

            if (m_readLen + n > m_readbuf.size())
                m_readbuf.resize(m_readLen + n);

            m_readLen += boost::asio::read(
                m_socket, boost::asio::buffer(&m_readbuf[m_readLen], n),
                boost::asio::transfer_all());

            end = (*((unsigned int*)(&m_readbuf[m_readLen - 4])) == 0xFFFFFFFF);
        }
        m_readLen -= 4;
        // additinal data length info
        boost::asio::read(m_socket, boost::asio::buffer(&m_datalen, 4),
                          boost::asio::transfer_all());
        boost::asio::read(m_socket, boost::asio::buffer(&m_rows, 2),
                          boost::asio::transfer_all());
    }*/
    bool queryFunction(unsigned int v)
    {
        if (v == CONNECTION_FUNCTION_DIRECT_READ)
            return true;
        return false;
    }

    unsigned int directRead(void* buf, unsigned int size)
    {
        return (unsigned int)boost::asio::read(m_socket, 
                                                boost::asio::buffer(buf, size),
                                                boost::asio::transfer_all());
    }
    
    void* directReadRemain(unsigned int size)
    {
        if (size > m_readbuf.size())
            m_readbuf.resize(size);
        
        m_readLen += boost::asio::read(
            m_socket,
            boost::asio::buffer(&m_readbuf[0], size),
            boost::asio::transfer_all());
        return &m_readbuf[0];
    }

    template <typename CompletionCondition, typename MutableBufferSequence>
    size_t doRead(const MutableBufferSequence& buf, CompletionCondition cnd)
    {
        boost::system::error_code e;
#ifdef LINUX
        pthread_sigmask(SIG_SETMASK, &m_signmask , &m_sigomask); 
#endif
        size_t n = boost::asio::read(m_socket, buf, cnd, e);
#ifdef LINUX
        pthread_sigmask(SIG_SETMASK, &m_sigomask, NULL); 
#endif
        if (e) throw e;
        return n;    
    }

    char* read()
    {
        if (!m_connected)
            throw system_error(asio::error::not_connected);
        m_readLen = 0;
        //m_datalen = 0;
        //m_rows = 0;
        unsigned int n;

        //m_readLen += boost::asio::read(m_socket, boost::asio::buffer(&n, 4),
        //                               boost::asio::transfer_all());
        /*if (n == 0xFFFFFFFF)
        {
            segmentRead();
            m_readLen += boost::asio::read(m_socket, boost::asio::buffer(&n, 4),
                                           boost::asio::transfer_all());
        }*/
        if (m_reader)
        {
            m_readLen = doRead(boost::asio::buffer(&n, 4), boost::asio::transfer_all());
            
            m_readLen += m_reader->onRead(n - 4, this);
        }else
        {
            m_readLen = doRead(boost::asio::buffer(&m_readbuf[0],m_readbuf.size()),
                    boost::asio::transfer_at_least(4));
            n = *((unsigned int*)(&m_readbuf[0]));
        }
        if ((n > m_readLen) && (n < MAX_DATA_SIZE))
        {
            if (n > m_readbuf.size())
                m_readbuf.resize(n);
            m_readLen += doRead(boost::asio::buffer(&m_readbuf[m_readLen], n - m_readLen),
                    boost::asio::transfer_all());
        }
        return &m_readbuf[0];
    }

    void write(unsigned int writeSize)
    {
        if (!m_connected)
            throw system_error(asio::error::not_connected);
        if (m_optionalBuffes.size())
        {
            m_optionalBuffes.insert(m_optionalBuffes.begin(),
                                    asio::buffer(sendBuffer(0), writeSize));
            boost::asio::write(m_socket, m_optionalBuffes,
                               boost::asio::transfer_all());
            m_optionalBuffes.clear();
        }
        else
            boost::asio::write(m_socket, asio::buffer(sendBuffer(0), writeSize),
                               boost::asio::transfer_all());
    }

public:
    connectionImple(asio::ip::tcp::endpoint& ep)
        : connectionBase(ep)/*, m_datalen(0)*/, m_socket(m_ios)
    {
#ifdef LINUX
        sigfillset(&m_signmask);
#endif
    }

    ~connectionImple()
    {
        try
        {
            m_ios.stop();
            m_socket.close();
        }
        catch (...)
        {
        }
    }

    char* asyncWriteRead(unsigned int writeSize)
    {
        write(writeSize);
        return read();
    }

    buffers* optionalBuffers() { return &m_optionalBuffes; }
};


/** Implementation of The TCP connection.
 */
class tcpConnection : public connectionImple<asio::ip::tcp::socket>
{

public:
    tcpConnection(asio::ip::tcp::endpoint& ep)
        : connectionImple<asio::ip::tcp::socket>(ep)
    {
        m_readbuf.resize(READBUF_SIZE);
        m_sendbuf.resize(WRITEBUF_SIZE);
    }

    void setupTimeouts()
    {
#if defined _WIN32
        int32_t timeout = connections::timeout;
        setsockopt(m_socket.native(), SOL_SOCKET, SO_RCVTIMEO,
                   (const char*)&timeout, sizeof(timeout));
        setsockopt(m_socket.native(), SOL_SOCKET, SO_SNDTIMEO,
                   (const char*)&timeout, sizeof(timeout));
#else
        struct timeval timeout;
        timeout.tv_usec = 0;
        timeout.tv_sec = connections::timeout;
        setsockopt(m_socket.native(), SOL_SOCKET, SO_RCVTIMEO, &timeout,
                   sizeof(timeout));
        setsockopt(m_socket.native(), SOL_SOCKET, SO_SNDTIMEO, &timeout,
                   sizeof(timeout));
#endif
    }

    void connect()
    {
        setupTimeouts();
        m_socket.connect(m_ep);
        m_socket.set_option(boost::asio::ip::tcp::no_delay(true));
        m_connected = true;
    }
};

class exception : public std::exception
{
    int m_error;

    std::string m_msg;

public:
    exception(int errorCode, const char* const& message)
        : std::exception(), m_error(errorCode), m_msg(message)
    {
    }

    ~exception() throw() {}

    const char* what() const throw() { return m_msg.c_str(); }

    int error() { return m_error; }
};

#ifdef USE_PIPE_CLIENT

/** Implementation of The Named pipe connection.
 */
class pipeConnection : public connectionImple<platform_stream>
{
    const std::string& m_pipeName;

    char* m_readbuf_p;
    char* m_writebuf_p;
    unsigned int m_sendBufferSize;

    HANDLE m_recvEvent;
    HANDLE m_sendEvent;
    HANDLE m_mapFile;

    bool queryFunction(unsigned int v)
    {
        if (v == CONNECTION_FUNCTION_DIRECT_READ)
            return false;
        return false;
    }

    char* GetErrorMessage(DWORD ErrorCode)
    {
        LPVOID lpMsgBuf;
        FormatMessage(
            FORMAT_MESSAGE_ALLOCATE_BUFFER | FORMAT_MESSAGE_FROM_SYSTEM, NULL,
            ErrorCode,
            MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT), // Default language
            (LPTSTR)&lpMsgBuf, 0, NULL);
        return (char*)lpMsgBuf;
    }

    void throwException(const char* msg, int errorCode)
    {
        char buf[4096];
        char user[128];
        char* p = GetErrorMessage(GetLastError());
        DWORD size = 128;
        GetUserName(user, &size);
        sprintf_s(buf, 4096, "User:%s %s %d %s", user, msg, GetLastError(), p);
        LocalFree(p);
        throw exception(errorCode, buf);
    }

    char* getUniqName(const char* name)
    {
        connectionBase::m_readbuf.resize(120);
        char* p = &connectionBase::m_readbuf[0];
        DWORD processId = GetCurrentProcessId();
        __int64 clientid = (__int64) this;
        sprintf_s(p, 120, "%s_%u_%Lu", name, processId, clientid);
        return p;
    }

    void createKernelObjects(unsigned int shareMemSize)
    {
        char tmp[50];
        SYSTEM_INFO SystemInfo;
        GetSystemInfo(&SystemInfo);
        int size = shareMemSize / SystemInfo.dwAllocationGranularity + 1;
        m_sendBufferSize = size * SystemInfo.dwAllocationGranularity;

        sprintf_s(tmp, 50, "Global\\%s", m_pipeName.c_str());
        m_mapFile =
            CreateFileMapping(INVALID_HANDLE_VALUE, NULL, PAGE_READWRITE, 0,
                              m_sendBufferSize * 2, getUniqName(tmp));
        if (m_mapFile == NULL)
            throwException("CreateFileMapping", CLIENT_ERROR_SHAREMEM_DENIED);

        m_writebuf_p = (char*)MapViewOfFile(m_mapFile, FILE_MAP_ALL_ACCESS, 0,
                                            0, m_sendBufferSize);
        if (m_writebuf_p == NULL)
            throwException("MapViewOfFile R", CLIENT_ERROR_SHAREMEM_DENIED);

        m_readbuf_p = (char*)MapViewOfFile(m_mapFile, FILE_MAP_ALL_ACCESS, 0,
                                           m_sendBufferSize, m_sendBufferSize);
        if (m_readbuf_p == NULL)
            throwException("MapViewOfFile W", CLIENT_ERROR_SHAREMEM_DENIED);

        sprintf_s(tmp, 50, "Global\\%sToClnt", m_pipeName.c_str());
        m_recvEvent = OpenEvent(EVENT_ALL_ACCESS, FALSE, getUniqName(tmp));
        if (m_recvEvent == NULL)
            throwException("OpenEvent Client", CLIENT_ERROR_SHAREMEM_DENIED);

        sprintf_s(tmp, 50, "Global\\%sToSrv", m_pipeName.c_str());
        m_sendEvent = OpenEvent(EVENT_ALL_ACCESS, FALSE, getUniqName(tmp));
        if (m_sendEvent == NULL)
            throwException("OpenEvent Server", CLIENT_ERROR_SHAREMEM_DENIED);
    }

    void write(unsigned int writeSize)
    {
        //m_datalen = 0;
        //m_rows = 0;
        BOOL ret = SetEvent(m_sendEvent);
        if (ret == FALSE)
            throwException("SetEvent", CLIENT_ERROR_CONNECTION_FAILURE);        
    }

    char* read()
    {
        while (WAIT_TIMEOUT == WaitForSingleObject(m_recvEvent, connections::timeout))
        {
            DWORD n = 0;
            BOOL ret = GetNamedPipeHandleState(m_socket.native(), NULL, &n,
                                           NULL, NULL, NULL, 0);
            if(ret == FALSE || n == 0)
                throwException("PipeConnection", CLIENT_ERROR_CONNECTION_FAILURE);
        }
        return m_readbuf_p;
    }

public:
    pipeConnection(asio::ip::tcp::endpoint& ep, const std::string& pipeName)
        : connectionImple<platform_stream>(ep), m_pipeName(pipeName),
          m_mapFile(NULL), m_recvEvent(NULL), m_sendEvent(NULL),
          m_writebuf_p(NULL), m_readbuf_p(NULL), m_sendBufferSize(0)

    {
    }

    ~pipeConnection()
    {
        if (m_connected)
		{
			if (m_writebuf_p)
				memset(m_writebuf_p, 0, sizeof(unsigned int));
			DWORD n = 0;
			BOOL ret = GetNamedPipeHandleState(m_socket.native(), NULL, &n,
                                            NULL, NULL, NULL, 0);
            if(m_sendEvent && ret && n)
            {
				SetEvent(m_sendEvent);
                //Wait for server side close connection
                while (WAIT_TIMEOUT == 
                    WaitForSingleObject(m_recvEvent, connections::timeout))
                    ;
            }
		}
        if (m_recvEvent)
            CloseHandle(m_recvEvent);
        if (m_sendEvent)
            CloseHandle(m_sendEvent);
        if (m_writebuf_p)
            UnmapViewOfFile(m_writebuf_p);
        if (m_readbuf_p)
            UnmapViewOfFile(m_readbuf_p);
        if (m_mapFile)
            CloseHandle(m_mapFile);
    }

    void connect()
    {
        platform_descriptor fd;
#ifdef WIN32
        char pipeName[100];
        sprintf_s(pipeName, 100, "\\\\.\\pipe\\%s", m_pipeName.c_str());
        int i = 1000;
        while (--i)
        {
            fd = CreateFile(pipeName, GENERIC_READ | GENERIC_WRITE, 0, NULL,
                            OPEN_EXISTING, FILE_FLAG_OVERLAPPED, NULL);
            if (fd != INVALID_HANDLE_VALUE)
                break;
            if (ERROR_PIPE_BUSY != GetLastError())
                break;
            Sleep(1);
        }
        if (fd == INVALID_HANDLE_VALUE)
            throwException("CreateFile", CLIENT_ERROR_CANT_CREATEPIPE);
#endif // NOT WIN32
        m_socket.assign(fd);
        m_connected = true;

        // send processId and clientid;
        DWORD processId = GetCurrentProcessId();
        int size = 16;
        connectionBase::m_readbuf.resize(256);
        char* p = &connectionBase::m_readbuf[0];
        memcpy(p, &size, sizeof(int));
        memcpy(p + 4, &processId, sizeof(DWORD));
        __int64 clientid = (__int64) this;
        memcpy(p + 8, &clientid, sizeof(__int64));
        boost::asio::write(m_socket, boost::asio::buffer(p, size));

        boost::asio::read(m_socket, boost::asio::buffer(p, 7));
        unsigned int* shareMemSize = (unsigned int*)(p+3);
        m_isHandShakable = (p[0] == 0x00);
        createKernelObjects(*shareMemSize);
    }

    char* sendBuffer(size_t size) { return m_writebuf_p; }

    unsigned int sendBufferSize() { return m_sendBufferSize; }

    buffers* optionalBuffers() { return NULL; } // not support

    void setReadBufferSizeIf(size_t size) {} // not support
};
#endif // NOT WIN32

} // namespace client
} // namespace netsvc
} // namespace bzs

#endif // BZS_NETSVC_CLINET_TCPCLIENT_H