/* * This file is part of the Flowee project * Copyright (C) 2016-2026 Tom Zander * * This program is free software: you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by * the Free Software Foundation, either version 3 of the License, or * (at your option) any later version. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License * along with this program. If not, see . */ #ifndef FLOWEE_NETWORKMANAGER_P_H #define FLOWEE_NETWORKMANAGER_P_H /* * WARNING USAGE OF THIS HEADER IS RESTRICTED. * This Header file is part of the private API and is meant to be used solely by the NetworkManager component. * * Usage of this API will likely mean your code will break in interesting ways in the future, * or even stop to compile. * * YOU HAVE BEEN WARNED!! */ #include #if BOOST_VERSION > 107400 # define FLOWEE_NET_SSL 1 # include # include #endif #include "NetworkEndPoint.h" #include "NetworkConnection.h" #include #include #include #include #include #include #include #include #include #include class NetworkServiceBase; class NetworkManagerPrivate; using boost::asio::ip::tcp; /** * Ringbuffer is specially crafted for the NetworkManager to have a zero-alloc buffer. * The special part about this ringbuffer is that it allows you to have a read pointer * that you use to get items out which you then mark as read. This marking it as read * only affects your ability to read the next one. * Marking something as read does not mark that item as deletable. We still keep a reference * to the item! Especially useful if the item is a shared-pointer. * Then at some later point we allow removing of items. Either using removeTip() or using * removeAllRead() */ template class RingBuffer { public: RingBuffer(size_t size) : m_array(size + 1), NumItems(static_cast(size + 1)) { assert(size > 0); assert(size < static_cast(std::numeric_limits::max())); } void append(const V &v) { assert(!isFull()); if (isFull()) throw std::runtime_error("RingBuffer is full"); m_array[m_next] = v; if (++m_next >= NumItems) m_next = 0; assert(m_next != m_first); } /// total amount of space in this ringbuffer inline int reserved() const { return NumItems - 1; } /// Amount of items filled inline int count() const { return m_next - m_first + (m_next < m_first /* we went circular */ ? NumItems : 0); } /// reserved minus usage int slotsAvailable() const { return reserved() - count(); } /// alias for count() inline int size() const { return count(); } inline bool isEmpty() const { return m_first == m_next; } /// the tip is the first inserted, but not yet removed item. inline const V &tip() const { assert(m_first != m_next); // aka empty return m_array[m_first]; } /// remove the tip, moving the tip to the next item. inline void removeTip() { assert(m_first != m_next); // aka empty m_array[m_first] = V(); if (++m_first >= NumItems) m_first = 0; if (m_first <= m_next) // standard linear list m_readIndex = std::max(m_readIndex, m_first); else if (m_readIndex < m_first && m_readIndex > m_next) // circular list state m_readIndex = m_first; } /// an item just inserted is unread, we read in the same order as insertion. inline void markRead(int count = 1) { assert(count < NumItems); assert(count > 0); m_readIndex += count; while (m_readIndex >= NumItems) m_readIndex -= NumItems; assert(m_first < m_next && m_readIndex >= m_first && m_readIndex <= m_next || (m_first > m_next && (m_readIndex >= m_first || m_readIndex <= m_next))); } inline void removeAllRead() { while (m_first != m_readIndex) { m_array[m_first] = V(); if (++m_first >= NumItems) m_first = 0; } } /// first not yet read item. inline const V &unreadTip() const { assert(m_first < m_next && m_readIndex >= m_first && m_readIndex < m_next || (m_first > m_next && (m_readIndex >= m_first || m_readIndex < m_next))); return m_array[m_readIndex]; } /// returns true, like isEmpty(), when there are no unread items. inline bool isRead() const { return m_readIndex == m_next; } /// Return true if there are items inserted, but not yet marked read (inverse of isRead()) inline bool hasUnread() const { return m_readIndex != m_next; } inline bool isFull() const { return (m_next + 1) % NumItems == m_first; } inline bool hasItemsMarkedRead() const { return m_readIndex != m_first; } inline void markAllUnread() { m_readIndex = m_first; } /// clear all data. inline void clear() { const int end = count() + m_first; for (int i = m_first; i < end; ++i) { m_array[i % NumItems] = V(); } m_first = 0; m_readIndex = 0; m_next = 0; } private: /* We append at 'next', increasing it to point to the first unused one. * As this is a FIFO, we move the m_first and m_readIndex as we process (and remove) items, * if we reach the 'next' position we have an empty buffer. * * Write | variable | Read * * m_first -> tip() * | * markRead() -> m_readIndex -> unreadTip() * | * last-item * append() -> m_next -> nullptr */ std::vector m_array; int m_first = 0; int m_readIndex = 0; int m_next = 0; // last plus one const int NumItems; }; class NetworkManagerSocketProxyBase { public: NetworkManagerSocketProxyBase() = default; virtual ~NetworkManagerSocketProxyBase() = default; virtual boost::asio::ip::tcp::endpoint remote_endpoint() const = 0; virtual void newConnection(const tcp::endpoint &to, const std::shared_ptr &owner, const std::function &callback) = 0; virtual void async_write(const std::vector &data, const std::function &callback) = 0; virtual void async_receive(Streaming::BufferPool &pool, const std::function &callback) = 0; virtual bool is_open() const = 0; virtual void close() = 0; // should reflect the EndPoint.encrypted boolean virtual bool isEncrypted() const = 0; NetworkManagerConnection *q = nullptr; }; class NetworkManagerConnection : public std::enable_shared_from_this { public: enum MessageHeaderType { FloweeNative, LegacyP2P }; NetworkManagerConnection(const std::shared_ptr &parent, NetworkManagerSocketProxyBase *socketProxy, int connectionId); NetworkManagerConnection(const std::shared_ptr &parent, const EndPoint &remote); ~NetworkManagerConnection(); /// Connects to remote (async) void connect(); int nextCallbackId(); /// unregister a NetworkConnection. Calls have to be from the strand. void removeAllCallbacksFor(int id); void queueMessage(const Message &message, NetworkConnection::MessagePriority priority); inline bool isConnected() const { return m_isConnected; } inline const EndPoint &endPoint() const { return m_remote; } void setEndPoint(const EndPoint &ep) { m_remote = ep; } /// add callback, calls have to be on the strand. void addOnConnectedCallback(int id, std::function callback); /// add callback, calls have to be on the strand. void addOnDisconnectedCallback(int id, std::function callback); /// add callback, calls have to be on the strand. void addOnIncomingMessageCallback(int id, std::function callback); /// add callback, calls have to be on the strand. void addOnError(int id, std::function callback); void setLoginCreator(const std::function &creator); /// forcably shut down the connection, soon you should no longer reference this object void shutdown(); /// only incoming connections need accepting. void accept(NetworkConnection::AcceptLimit limit); inline void disconnect() { close(false); if (m_priorityMessageQueue) m_priorityMessageQueue->clear(); if (m_messageQueue) m_messageQueue->clear(); } void recycleConnection(); boost::asio::io_context::strand m_strand; /// move a call to the thread that the strand represents void runOnStrand(const std::function &function); inline bool acceptedConnection() const { return m_acceptedConnection; } void setMessageHeaderType(MessageHeaderType messageHeaderType); void punish(int amount); void setMessageQueueSizes(int main, int priority); void close(bool reconnect = true); // close down connection void setCertificate(const Streaming::ConstBuffer &cert); short m_punishment = 0; // aka ban-sore // used to check incoming messages being actually for us MessageHeaderType m_messageHeaderType = FloweeNative; std::shared_ptr d; // called from helper classes... void errorDetected(const boost::system::error_code &error); void requestMoreBytes(); private: EndPoint m_remote; void onAddressResolveComplete(const boost::system::error_code& error, boost::asio::ip::tcp::resolver::results_type results); void onConnectComplete(const boost::system::error_code& error); /// call then to start sending messages to remote. Will do noting if we are already sending messages. void runMessageQueue(); void sentSomeBytes(const boost::system::error_code& error, std::size_t bytes_transferred); void requestMoreBytes_callback(const boost::system::error_code &error); void receivedSomeBytes(const boost::system::error_code& error, std::size_t bytes_transferred); bool processPacket(const std::shared_ptr &buffer, const char *data); bool processLegacyPacket(const std::shared_ptr &buffer, const char *data); void connect_priv(); // thread-unsafe version of connect void reconnectWithCheck(const boost::system::error_code& error); // called from the m_reconectDelay timer void finalShutdown(); void sendPing(const boost::system::error_code& error); void pingTimeout(const boost::system::error_code& error); void allocateBuffers(); void callOnDisconnectedCallbacks(); inline bool isOutgoing() const { return m_remote.announcePort == m_remote.peerPort; } Streaming::ConstBuffer createHeader(const Message &message); std::map > m_onConnectedCallbacks; std::map > m_onDisConnectedCallbacks; std::map > m_onIncomingMessageCallbacks; std::map > m_onErrorCallbacks; std::function m_loginMessageCreator; NetworkManagerSocketProxyBase *m_socketProxy = nullptr; tcp::resolver m_resolver; std::unique_ptr > m_messageQueue; std::unique_ptr > m_priorityMessageQueue; std::unique_ptr > m_sendQHeaders; int m_messageBytesSend = 0; Streaming::BufferPool m_receiveStream; mutable std::atomic m_lastCallbackId; std::atomic m_isClosingDown; bool m_firstPacket = true; /// The connection that we used to notify of a new incoming connection is the only /// one that will get the first message, likely for some authentication feature. bool m_firstMessageIsForAcceptConnection = false; bool m_isConnecting = false; bool m_isConnected; bool m_sendingInProgress = false; bool m_acceptedConnection = false; int m_queueSizeMain = 2000; // config setting for the ringbuffers sizes int m_priorityQueueSize = 20; // ditto // flow-control limits /* * When on processing the received messages, the send queue reached this size * we interrupt and do a quick send. */ int m_forceSendLimit = 750; /* * A send-message queue size limit that determines * if we throttle waiting on receive, and by how much. * Below L1 we don't wait. * Below L2 we wait 10ms, below L3 we wait 50ms. */ int m_throttleReceiveAtSendLimitL1 = 1000; int m_throttleReceiveAtSendLimitL2 = 1500; int m_throttleReceiveAtSendLimitL3 = 1900; short m_reconnectStep = 0; boost::asio::system_timer m_reconnectDelay; // for these I write 'ping' but its 'pong' for server (incoming) connections. boost::asio::system_timer m_pingTimer; boost::posix_time::ptime m_lastPong; // timestamp of the last pong seen by the outgoing connection boost::asio::steady_timer m_sendTimer; Message m_pingMessage; // chunked messages can be recombined. Streaming::BufferPool m_chunkedMessageBuffer; int m_chunkedServiceId = -1; int m_chunkedMessageId = -1; std::map m_chunkedHeaderData; }; // NetworkManagerServerBase and its two implementations are the server side part // that listens on a network interface and spawning a new connection. // The two subclasses mark the difference between a raw socket connection and an ssl-wrapped one. class NetworkManagerServerBase { public: /** * Constructor. * @param parent the link to the NetworkManager * @param endpoint which interface to listen on. * @param callback to call on creating a connection. */ NetworkManagerServerBase(const std::shared_ptr &parent, tcp::endpoint endpoint, const std::function &callback); virtual ~NetworkManagerServerBase() = default; /// the callback when a connection comes in, will call createNewConnection() void acceptConnection(boost::asio::ip::tcp::socket && socket); virtual void shutdown() = 0; protected: virtual void setupCallback() = 0; virtual std::shared_ptr createNewConnection(int connectionId, boost::asio::ip::tcp::socket && socket) = 0; std::weak_ptr d; tcp::acceptor m_acceptor; tcp::endpoint m_endpoint; std::function onIncomingConnection; // callback }; #ifdef FLOWEE_NET_SSL /// Implements a server that binds and handles ssl connection setup. class NetworkManagerServerSSL : public NetworkManagerServerBase { public: NetworkManagerServerSSL(const std::shared_ptr &parent, tcp::endpoint endpoint, const std::function &callback); void start() { setupCallback(); } void shutdown() override; void setCertificateChain(const Streaming::ConstBuffer &certificateChain); void setPrivateKey(const Streaming::ConstBuffer &privateKey); void setDHTemp(const Streaming::ConstBuffer &dhTemp); protected: void setupCallback() override; std::shared_ptr createNewConnection(int connectionId, boost::asio::ip::tcp::socket && socket) override; private: boost::asio::ssl::context m_context; }; class NetworkManagerSSLSocketProxy : public NetworkManagerSocketProxyBase { public: NetworkManagerSSLSocketProxy(boost::asio::ssl::stream socket); NetworkManagerSSLSocketProxy(boost::asio::io_context &ioContext); void setClientCertificate(const Streaming::ConstBuffer &data); tcp::endpoint remote_endpoint() const override; void newConnection(const tcp::endpoint &to, const std::shared_ptr &owner, const std::function &callback) override; void async_write(const std::vector &data, const std::function &callback) override; void async_receive(Streaming::BufferPool &pool, const std::function &callback) override; bool is_open() const override; void close() override; bool isEncrypted() const override { return true; } void doASyncHandshake(const std::shared_ptr &owner); private: // for outgoing connections we might need to make a new socket // we need the context for that. boost::asio::ssl::context m_context; // the socket we work on. boost::asio::ssl::stream m_socket; }; #endif class NetworkManagerServerSimple : public NetworkManagerServerBase { public: NetworkManagerServerSimple(const std::shared_ptr &parent, tcp::endpoint endpoint, const std::function &callback); void shutdown() override; protected: void setupCallback() override; std::shared_ptr createNewConnection(int connectionId, boost::asio::ip::tcp::socket && socket) override; private: tcp::socket m_socket; }; class NetworkManagerBasicSocketProxy : public NetworkManagerSocketProxyBase { public: NetworkManagerBasicSocketProxy(boost::asio::io_context &io_context); NetworkManagerBasicSocketProxy(tcp::socket && socket); tcp::endpoint remote_endpoint() const override; void newConnection(const tcp::endpoint &to, const std::shared_ptr &owner, const std::function &callback) override; void async_write(const std::vector &data, const std::function &callback) override; void async_receive(Streaming::BufferPool &pool, const std::function &callback) override; bool is_open() const override; void close() override; bool isEncrypted() const override { return false; } private: tcp::socket m_socket; }; struct BannedNode { EndPoint endPoint; boost::posix_time::ptime banTimeout; }; class NetworkManagerPrivate { public: NetworkManagerPrivate(boost::asio::io_context &context); ~NetworkManagerPrivate(); inline void alwaysConnectingNewConnectionHandler(NetworkConnection &con) { con.accept(); } enum ListenType { PlainSocket, SSLSocket }; NetworkManagerServerBase *bind(ListenType type, std::shared_ptr me, const boost::asio::ip::tcp::endpoint &endpoint, const std::function &callback); void punishNode(int connectionId, int punishScore); void cronHourly(const boost::system::error_code& error); boost::asio::io_context& ioContext; std::map > connections; std::deque > unusedConnections; int lastConnectionId; std::recursive_mutex mutex; // to lock access to things like the connections map std::mutex connectionMutex; bool isClosingDown; std::vector servers; std::list banned; std::list services; boost::asio::system_timer m_cronHourly; // support for the p2p legacy envelope design uint8_t networkId[4] = { 0xE3, 0xE1, 0xF3, 0xE8}; std::map messageIds; std::map messageIdsReverse; size_t m_maxIncomingMessageSize; }; #endif