/*=================================================================
   Copyright (C) 2012 2013 BizStation Corp All rights reserved.

   This program is free software; you can redistribute it and/or
   modify it under the terms of the GNU General Public License
   as published by the Free Software Foundation; either version 2
   of the License, or (at your option) any later version.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; if not, write to the Free Software 
   Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA  
   02111-1307, USA.
=================================================================*/
#include "serverCpt.h"
#include "IAppModule.h"
#include <boost/bind.hpp>
#include <boost/array.hpp>
#include <boost/shared_ptr.hpp>
#include <boost/scoped_array.hpp>
#include <boost/thread.hpp>
#include <boost/asio/write.hpp> 
#include <boost/thread/xtime.hpp>
#include <algorithm>
#include <boost/enable_shared_from_this.hpp>
#include <bzs/rtl/debuglog.h>
#include <bzs/env/crosscompile.h>
#include <bzs/rtl/exception.h>

using namespace boost;
using namespace boost::asio;
using namespace boost::asio::ip;



namespace bzs
{
namespace netsvc
{
namespace server
{
namespace cpt //connection per thread
{

unsigned int g_connections = 0;
unsigned int g_waitThread = 0;
// ---------------------------------------------------------------------------
//		connection
// ---------------------------------------------------------------------------
#define READBUF_SIZE 66000
#define WRITEBUF_SIZE 66000

class connection  : public iconnection, private boost::noncopyable 			
{
	mutable io_service m_ios;

	static mutex m_mutex;
	tcp::socket m_socket;
	std::vector<char> m_buffer;
	std::vector<char> m_result;
	buffers m_optionalBuffes;
	shared_ptr<IAppModule> m_module;
	
	size_t m_readLen;
	void handle_read(const boost::system::error_code& e, std::size_t bytes_transferred)
	{
		DEBUG_PROFILE_START(m_readLen==0)
		if (!e)
		{
			bool complete = false;
			
			if (bytes_transferred==0)
				return ;
			m_readLen += bytes_transferred;
			size_t n = m_module->onRead(&m_buffer[0], m_readLen, complete);
			if (complete)
			{
				DEBUG_PROFILE_END(1, "handle_read")
				size_t size=0;
				if (m_optionalBuffes.size())
					m_optionalBuffes.clear();
				if (m_module->execute(&m_result[0], size, &m_optionalBuffes) == EXECUTE_RESULT_QUIT)
					return ;
				else
				{
					m_readLen = 0;
					DEBUG_PROFILE_START(1)
					{
					if (m_optionalBuffes.size())
					{
						m_optionalBuffes.insert(m_optionalBuffes.begin(), buffer(&m_result[0], size));
						async_write(m_socket, m_optionalBuffes, boost::bind(&connection::handle_write, this,
						 boost::asio::placeholders::error));
					}else
					{
						async_write(m_socket, buffer(&m_result[0], size), boost::bind(&connection::handle_write, this,
						 boost::asio::placeholders::error));
						/*boost::asio::write(m_socket, buffer(&m_result[0], size), boost::asio::transfer_all());
						m_socket.async_read_some(buffer(&m_buffer[0], m_buffer.size()),
							boost::bind(&connection::handle_read, this,
							boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred));*/
					}
					}
					m_module->cleanup();
					return;
				}
			}
			
			if (n > m_buffer.size())
				m_buffer.resize(n);
			m_socket.async_read_some(buffer(&m_buffer[m_readLen], m_buffer.size()-m_readLen),
					boost::bind(&connection::handle_read, this,
					boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred));
		}
	}
	
	void handle_write(const boost::system::error_code& e)
	{
		if (!e)
		{
			
			DEBUG_PROFILE_END(1, "write")
			m_socket.async_read_some(buffer(&m_buffer[0], m_buffer.size()),
			boost::bind(&connection::handle_read, this,
				boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred));
		}
				
	}
public:
	
	connection(): m_socket(m_ios)
		,m_readLen(0)
	{
		m_buffer.resize(READBUF_SIZE);
		m_result.resize(WRITEBUF_SIZE);
		mutex::scoped_lock lck(m_mutex);
		connections.push_back(this);
		++g_connections;
		
	}

	~connection()
	{
		
		if (connections.size())
		{
			mutex::scoped_lock lck(m_mutex);
			std::vector<connection*>::iterator it = find(connections.begin(), connections.end(), this);
			if (it != connections.end())
			{
				connections.erase( it);
				--g_connections;
			}
		}
	}
	
	void close()
	{
		m_ios.post(boost::bind(&connection::doClose, this));
	}
	
	void doClose()
	{
		boost::system::error_code ec;
		m_socket.close(ec);
	}
	
	io_service& ios()const{return m_ios;};
	
	tcp::socket& socket(){return m_socket;}
	
	void setModule(boost::shared_ptr<IAppModule> p)
	{
		m_module = p;
	}
	
	void sendConnectAccept()
	{
		m_ios.reset();
		const boost::asio::ip::tcp::no_delay nodelay(true);
		m_socket.set_option(nodelay);

		size_t n = m_module->onAccept(&m_result[0], WRITEBUF_SIZE);
		if (n)
			boost::asio::write(m_socket, buffer(&m_result[0], n), boost::asio::transfer_all());
	}
	
	void start()
	{
		m_socket.async_read_some(buffer(&m_buffer[0], m_buffer.size()),
					boost::bind(&connection::handle_read, this,
					boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred));
		m_ios.run();
	}
	
	static std::vector<connection* > connections;
	
	static void stop()
	{
		mutex::scoped_lock lck(m_mutex);
		try
		{
			for (size_t i=0;i<connections.size();i++)
				connections[i]->ios().stop();
		}
		catch(boost::system::system_error &){};
	}
	
};
std::vector<connection* > connection::connections;
mutex connection::m_mutex;

// ---------------------------------------------------------------------------
//		worker
// ---------------------------------------------------------------------------

class worker : private boost::noncopyable 
{

	static boost::mutex m_mutex;	
	static std::vector< shared_ptr<boost::thread> > m_threads;
	static std::vector<worker*> m_workers; 
	static worker* findWaitThread()
	{
		mutex::scoped_lock lck(m_mutex);
		for (size_t i=0;i<m_workers.size();i++)
			if (m_workers[i]->m_connection==NULL)
				return m_workers[i];
		return NULL;
	}
	
	boost::shared_ptr<connection>  m_connection;
	
	bool resume(){return shutdown || m_connection;}
	
	~worker()
	{
		mutex::scoped_lock lck(m_mutex);
		m_workers.erase( find(m_workers.begin(), m_workers.end(), this));
	}
	
public:
	static bool shutdown;
	static const char* hostCheckName;
	static boost::condition_variable condition;
	
	static void registThread(shared_ptr<boost::thread> t)
	{
		mutex::scoped_lock lck(m_mutex);
		m_threads.push_back(t);
	}

	static void join()
	{
		for (size_t i=0;i<m_threads.size();i++)
			m_threads[i]->join();
		//delete thread
		m_threads.erase(m_threads.begin(), m_threads.end());
	}

	static worker* get(const IAppModuleBuilder* app)
	{
		worker* p = findWaitThread();
		if (p==NULL)
		{
			boost::thread::attributes attr;
			attr.set_stack_size( 125 * 1024); 
			p = new worker();
			shared_ptr<boost::thread> t(new boost::thread(attr, bind(&worker::run, p, app)));
			registThread(t);
		}
		return p;
	}

	worker()
	{
		mutex::scoped_lock lck(m_mutex);
		m_workers.push_back(this);
	}

	void setConnection(boost::shared_ptr<connection> conn)
	{
		m_connection = conn;
	}

	void run(const IAppModuleBuilder*  app)
	{
		try
		{
			while (!shutdown)
			{
				if (m_connection)
				{
					boost::system::error_code ec;
					tcp::endpoint endpoint = m_connection->socket().remote_endpoint(ec);
					boost::shared_ptr<IAppModule> mod(((IAppModuleBuilder*)app)->createSessionModule(endpoint, m_connection.get(), SERVER_TYPE_CPT));
					m_connection->setModule(mod);
					if (mod->checkHost(hostCheckName))
					{
						m_connection->sendConnectAccept();
						m_connection->start(); //It does not return, unless a connection is close. 
					}
					m_connection.reset();
				}
				//TODO A used thread -- it is all held. 
				//The number of maintenance is decided and it is made not to hold any more.
				mutex::scoped_lock lck(m_mutex); 
				++g_waitThread;
				condition.wait(lck, bind(&worker::resume, this));
				--g_waitThread;
			}
		}
		catch(bzs::rtl::exception &e)
		{
			if (server::erh)
			{
				if(const std::string *msg = getMsg(e))
				{
					std::string s = "Cpt server " + *msg;
					server::erh->printError(s.c_str());
				}else
					server::erh->printError(boost::diagnostic_information(e).c_str());
			}
		}
		catch(...)
		{
			if (server::erh)
				server::erh->printError("Cpt server Unknown");
		}
		//An end of this thread will delete self.
		delete this;
	}

};

bool worker::shutdown = false;
const char* worker::hostCheckName;
boost::condition_variable worker::condition;
boost::mutex worker::m_mutex;	
std::vector< boost::shared_ptr<boost::thread> > worker::m_threads;
std::vector<worker*>  worker::m_workers;

// ---------------------------------------------------------------------------
//		listener
// ---------------------------------------------------------------------------

class listener
{
	boost::shared_ptr<IAppModuleBuilder> m_app;	
	boost::asio::ip::tcp::acceptor m_acceptor;
	boost::shared_ptr<connection> m_newConnection;
	server* m_srv;
public:
	listener(server* srv, 
			shared_ptr<IAppModuleBuilder> app, const std::string& address, const std::string& port)
			:m_srv(srv), m_app(app),m_acceptor(srv->ios())
	{

		m_newConnection.reset(new connection());
		tcp::resolver resolver(m_newConnection->ios());
		tcp::resolver::query query(address, port, resolver_query_base::numeric_service);
		tcp::endpoint endpoint = *resolver.resolve(query);
	
		m_acceptor.open(endpoint.protocol());
		m_acceptor.set_option(tcp::acceptor::reuse_address(true));
		m_acceptor.bind(endpoint);
		m_acceptor.listen((int)m_srv->maxConnections());
	}
	
	void startAsyncAccept()
	{
		if (m_srv->checkConnections())
			m_acceptor.async_accept(m_newConnection->socket(), boost::bind(&listener::onAccept, this, placeholders::error));
	}

	void onAccept(const boost::system::error_code& e)
	{
		//connection is passed to a thread and it resumes.
		if (!e)
		{
			worker* w = worker::get(m_app.get());
			w->setConnection(m_newConnection);
			worker::condition.notify_all();
			m_newConnection.reset(new connection());
			startAsyncAccept();
		}
	}

	void cancel()
	{
		boost::system::error_code  ec;
		m_acceptor.cancel(ec);
	}
};


// ---------------------------------------------------------------------------
//		server
// ---------------------------------------------------------------------------
inotifyHandler* server::erh=NULL;

/** server
 *	If it starts, a server will create the exclusive thread for accpter 
 *	and will go into an infinite loop. 
 */
server::server(std::size_t max_connections, const char* hostCheckName)
		: m_maxConnections(max_connections)	,m_timer(m_ios),m_stopped(false)
{
	worker::hostCheckName = hostCheckName;
}

server::~server()
{
	m_listeners.clear();
}

void server::addApplication(boost::shared_ptr<IAppModuleBuilder> app, const std::string& address
		, const std::string& port)
{
	m_listeners.push_back(boost::shared_ptr<listener>(new listener(this, app, address, port)));
}

/** Start the server
 */
void server::start()
{
	shared_ptr<boost::thread> t(new boost::thread(bind(&server::run, this)));
	worker::registThread(t);
}

bool server::checkConnections()
{
	while (connection::connections.size() > m_maxConnections)
	{
		Sleep(100*MCRTOMM);
		if (m_stopped)
			return false;
	}
	return true;
}

void server::startAsyncAccept()
{
	for (size_t i = 0;i < m_listeners.size();i++)
		m_listeners[i]->startAsyncAccept();
}

/** Strat the time.
 *  If the timer times out then call shoutdown ceck function
 */
void server::startTimer()
{
	m_timer.expires_from_now(posix_time::seconds(10));
	m_timer.async_wait(bind(&server::onCheckInternlShutdown, this, placeholders::error));

}

void server::run()
{
	if (erh)
		erh->printInfo("Start Cpt server.");
	
	startAsyncAccept();
	startTimer();
	m_ios.run();
}

void server::onCheckInternlShutdown(const boost::system::error_code& e)
{
	bool shutdown = false;
	{
		mutex::scoped_lock lck(modulesMutex);
		for (size_t i=0;i<modules.size();i++)
		{
			IAppModule* mod = modules[i];
			if (mod->isShutDown())
			{
				shutdown = true;
				break;
			}
		}	
	}
	
	if (shutdown)
		doStop();
	else
		startTimer();
}

void server::doStop()
{
	if (!m_stopped)
	{
		m_stopped = true;
		if (erh)
			erh->printInfo("Stopping Cpt server ...");

		for (size_t i = 0;i < m_listeners.size();i++)
			m_listeners[i]->cancel();
		worker::shutdown = true;
		connection::stop();
		m_ios.stop();
		worker::condition.notify_all();
	}
}

void server::stop()
{
	doStop();
	worker::join();

}


}//namespace cpt
}//namespace server
}//namesapce netsvc
}//namespace bzs