// // chat_server.cpp // ~~~~~~~~~~~~~~~ // // Copyright (c) 2003-2020 Christopher M. Kohlhoff (chris at kohlhoff dot com) // // Distributed under the Boost Software License, Version 1.0. (See accompanying // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) // #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include using asio::ip::tcp; using asio::awaitable; using asio::co_spawn; using asio::detached; using asio::redirect_error; using asio::use_awaitable; //---------------------------------------------------------------------- class chat_participant { public: virtual ~chat_participant() {} virtual void deliver(const std::string& msg) = 0; }; typedef std::shared_ptr chat_participant_ptr; //---------------------------------------------------------------------- class chat_room { public: void join(chat_participant_ptr participant) { participants_.insert(participant); for (auto msg: recent_msgs_) participant->deliver(msg); } void leave(chat_participant_ptr participant) { participants_.erase(participant); } void deliver(const std::string& msg) { recent_msgs_.push_back(msg); while (recent_msgs_.size() > max_recent_msgs) recent_msgs_.pop_front(); for (auto participant: participants_) participant->deliver(msg); } private: std::set participants_; enum { max_recent_msgs = 100 }; std::deque recent_msgs_; }; //---------------------------------------------------------------------- class chat_session : public chat_participant, public std::enable_shared_from_this { public: chat_session(tcp::socket socket, chat_room& room) : socket_(std::move(socket)), timer_(socket_.get_executor()), room_(room) { timer_.expires_at(std::chrono::steady_clock::time_point::max()); } void start() { room_.join(shared_from_this()); co_spawn(socket_.get_executor(), [self = shared_from_this()]{ return self->reader(); }, detached); co_spawn(socket_.get_executor(), [self = shared_from_this()]{ return self->writer(); }, detached); } void deliver(const std::string& msg) { write_msgs_.push_back(msg); timer_.cancel_one(); } private: awaitable reader() { try { for (std::string read_msg;;) { std::size_t n = co_await asio::async_read_until(socket_, asio::dynamic_buffer(read_msg, 1024), "\n", use_awaitable); room_.deliver(read_msg.substr(0, n)); read_msg.erase(0, n); } } catch (std::exception&) { stop(); } } awaitable writer() { try { while (socket_.is_open()) { if (write_msgs_.empty()) { asio::error_code ec; co_await timer_.async_wait(redirect_error(use_awaitable, ec)); } else { co_await asio::async_write(socket_, asio::buffer(write_msgs_.front()), use_awaitable); write_msgs_.pop_front(); } } } catch (std::exception&) { stop(); } } void stop() { room_.leave(shared_from_this()); socket_.close(); timer_.cancel(); } tcp::socket socket_; asio::steady_timer timer_; chat_room& room_; std::deque write_msgs_; }; //---------------------------------------------------------------------- awaitable listener(tcp::acceptor acceptor) { chat_room room; for (;;) { std::make_shared( co_await acceptor.async_accept(use_awaitable), room )->start(); } } //---------------------------------------------------------------------- int main(int argc, char* argv[]) { try { if (argc < 2) { std::cerr << "Usage: chat_server [ ...]\n"; return 1; } asio::io_context io_context(1); for (int i = 1; i < argc; ++i) { unsigned short port = std::atoi(argv[i]); co_spawn(io_context, [&io_context, port] { return listener(tcp::acceptor(io_context, {tcp::v4(), port})); }, detached); } asio::signal_set signals(io_context, SIGINT, SIGTERM); signals.async_wait([&](auto, auto){ io_context.stop(); }); io_context.run(); } catch (std::exception& e) { std::cerr << "Exception: " << e.what() << "\n"; } return 0; }