Files
thehub/libs/networkmanager/NetworkManager_p.h
tomFlowee 21a6b1bef4 Cap reassembled native messages.
Implement old TODO; set the maximum size of a sequenced message to 64MiB
by default.
We use a variable, so it should be easy to make a setter, but without
a use case we'll wait to write code that would be unused.
2026-05-19 18:46:35 +02:00

566 lines
20 KiB
C++

/*
* This file is part of the Flowee project
* Copyright (C) 2016-2026 Tom Zander <tom@flowee.org>
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef FLOWEE_NETWORKMANAGER_P_H
#define FLOWEE_NETWORKMANAGER_P_H
/*
* WARNING USAGE OF THIS HEADER IS RESTRICTED.
* This Header file is part of the private API and is meant to be used solely by the NetworkManager component.
*
* Usage of this API will likely mean your code will break in interesting ways in the future,
* or even stop compiling.
*
* YOU HAVE BEEN WARNED!!
*/
#include <boost/version.hpp>
#if BOOST_VERSION > 107400
# define FLOWEE_NET_SSL 1
# include <boost/asio/registered_buffer.hpp>
# include <boost/asio/ssl.hpp>
#endif
#include "NetworkEndPoint.h"
#include "NetworkConnection.h"
#include <Message.h>
#include <streaming/BufferPool.h>
#include <list>
#include <deque>
#include <mutex>
#include <atomic>
#include <limits>
#include <stdexcept>
#include <boost/date_time/posix_time/ptime.hpp>
#include <boost/asio.hpp>
class NetworkServiceBase;
class NetworkManagerPrivate;
using boost::asio::ip::tcp;
/**
 * RingBuffer is specially crafted for the NetworkManager to have a buffer that does
 * not allocate after construction: all slots are created up front and reused.
 *
 * The special part about this ringbuffer is that it has a separate read cursor.
 * You fetch the next item with unreadTip() and then mark it as read with markRead().
 * Marking an item as read only affects which item unreadTip() returns next; the item
 * itself stays stored in the buffer (especially useful if the item is a shared-pointer).
 *
 * Items are removed at a later point, either one at a time with removeTip() or
 * all the already-read ones at once with removeAllRead().
 *
 * Items are stored by assignment, so V needs to be default-constructible and
 * copy-assignable. A removed slot is reset to a default-constructed V.
 */
template<class V>
class RingBuffer
{
public:
    /**
     * @param size the number of items the buffer can hold, has to be larger than zero.
     *
     * One extra slot is allocated internally; it always stays unused and allows
     * telling a full buffer apart from an empty one.
     */
    RingBuffer(size_t size) : m_array(size + 1), NumItems(static_cast<int>(size + 1)) {
        assert(size > 0);
        assert(size < static_cast<size_t>(std::numeric_limits<int>::max()));
    }

    /**
     * Add an item at the end of the buffer, the item starts out as unread.
     * @throws std::runtime_error when the buffer is full (also asserts in debug builds).
     */
    void append(const V &v) {
        assert(!isFull());
        if (isFull())
            throw std::runtime_error("RingBuffer is full");
        m_array[m_next] = v;
        if (++m_next >= NumItems)
            m_next = 0;
        assert(m_next != m_first);
    }

    /// total amount of space in this ringbuffer
    inline int reserved() const { return NumItems - 1; }

    /// Amount of items filled
    inline int count() const {
        return m_next - m_first + (m_next < m_first /* we went circular */ ? NumItems : 0);
    }

    /// reserved minus usage
    int slotsAvailable() const {
        return reserved() - count();
    }

    /// alias for count()
    inline int size() const { return count(); }

    inline bool isEmpty() const { return m_first == m_next; }

    /// the tip is the first inserted, but not yet removed item.
    inline const V &tip() const {
        assert(m_first != m_next); // aka empty
        return m_array[m_first];
    }

    /**
     * remove the tip, moving the tip to the next item.
     * Should the removed item not have been marked read yet, the read cursor
     * moves along so it keeps pointing at a stored item (or at the end).
     */
    inline void removeTip() {
        assert(m_first != m_next); // aka empty
        // The read cursor always lives in the range [m_first, m_next] (modulo wrapping),
        // so the only way removing the tip can invalidate it is when it points at the tip.
        // Comparing indexes after the increment is not reliable when m_first wraps to zero.
        const bool tipWasUnread = (m_readIndex == m_first);
        m_array[m_first] = V();
        if (++m_first >= NumItems)
            m_first = 0;
        if (tipWasUnread)
            m_readIndex = m_first;
    }

    /// an item just inserted is unread, we read in the same order as insertion.
    inline void markRead(int count = 1) {
        assert(count < NumItems);
        assert(count > 0);
        m_readIndex += count;
        while (m_readIndex >= NumItems)
            m_readIndex -= NumItems;
        // the read cursor may not move past the last inserted item.
        assert((m_first < m_next && m_readIndex >= m_first && m_readIndex <= m_next)
               || (m_first > m_next && (m_readIndex >= m_first || m_readIndex <= m_next)));
    }

    /// remove all items that have been marked as read.
    inline void removeAllRead() {
        while (m_first != m_readIndex) {
            m_array[m_first] = V();
            if (++m_first >= NumItems)
                m_first = 0;
        }
    }

    /// first not yet read item.
    inline const V &unreadTip() const {
        // there has to be an unread item for this to be a valid call.
        assert((m_first < m_next && m_readIndex >= m_first && m_readIndex < m_next)
               || (m_first > m_next && (m_readIndex >= m_first || m_readIndex < m_next)));
        return m_array[m_readIndex];
    }

    /// returns true, like isEmpty(), when there are no unread items.
    inline bool isRead() const { return m_readIndex == m_next; }

    /// Return true if there are items inserted, but not yet marked read (inverse of isRead())
    inline bool hasUnread() const { return m_readIndex != m_next; }

    /// returns true when no further item can be appended.
    inline bool isFull() const { return (m_next + 1) % NumItems == m_first; }

    /// returns true when at least one stored item has been marked read and not yet removed.
    inline bool hasItemsMarkedRead() const {
        return m_readIndex != m_first;
    }

    /// move the read cursor back to the tip, making every stored item unread again.
    inline void markAllUnread() {
        m_readIndex = m_first;
    }

    /// clear all data.
    inline void clear() {
        const int end = count() + m_first;
        for (int i = m_first; i < end; ++i) {
            m_array[i % NumItems] = V();
        }
        m_first = 0;
        m_readIndex = 0;
        m_next = 0;
    }

private:
    /* We append at 'next', increasing it to point to the first unused one.
     * As this is a FIFO, we move the m_first and m_readIndex as we process (and remove) items,
     * if we reach the 'next' position we have an empty buffer.
     *
     *   Write      |   variable     |  Read
     *
     *                  m_first     -> tip()
     *                     |
     *  markRead()  -> m_readIndex  -> unreadTip()
     *                     |
     *                  last-item
     *  append()    ->  m_next      -> nullptr
     */
    std::vector<V> m_array;
    int m_first = 0;
    int m_readIndex = 0;
    int m_next = 0; // last plus one
    const int NumItems;
};
/**
 * Abstract interface over the socket a connection reads from and writes to.
 * This header declares two implementations; NetworkManagerBasicSocketProxy for a
 * plain tcp socket and (when FLOWEE_NET_SSL is defined) NetworkManagerSSLSocketProxy
 * for an ssl-wrapped one.
 */
class NetworkManagerSocketProxyBase
{
public:
NetworkManagerSocketProxyBase() = default;
virtual ~NetworkManagerSocketProxyBase() = default;
/// the endpoint of the remote side of the socket.
virtual boost::asio::ip::tcp::endpoint remote_endpoint() const = 0;
/// start connecting to \a to, the \a callback receives the resulting error code.
virtual void newConnection(const tcp::endpoint &to, const std::shared_ptr<NetworkManagerConnection> &owner,
const std::function<void(const boost::system::error_code &error)> &callback) = 0;
/// write the buffers in \a data, the \a callback receives the error code and the number of bytes transferred.
virtual void async_write(const std::vector<Streaming::ConstBuffer> &data,
const std::function<void(const boost::system::error_code &error,std::size_t bytes_transferred)> &callback) = 0;
/// receive into \a pool, the \a callback receives the error code and the number of bytes transferred.
virtual void async_receive(Streaming::BufferPool &pool,
const std::function<void(const boost::system::error_code &error,std::size_t bytes_transferred)> &callback) = 0;
virtual bool is_open() const = 0;
virtual void close() = 0;
// should reflect the EndPoint.encrypted boolean
virtual bool isEncrypted() const = 0;
// NOTE(review): presumably the connection using this proxy; it is assigned outside of this header - confirm.
NetworkManagerConnection *q = nullptr;
};
/**
 * The private, shared-pointer managed, state of a single network connection.
 * It holds the remote EndPoint, the outgoing message queues, the registered callbacks
 * and the timers used for reconnecting and pinging.
 * Various methods are documented as needing to be called on the strand, see m_strand
 * and runOnStrand().
 */
class NetworkManagerConnection : public std::enable_shared_from_this<NetworkManagerConnection>
{
public:
/// The envelope format used for messages on this connection.
enum MessageHeaderType {
FloweeNative,
LegacyP2P
};
/// Constructor taking an already existing socket proxy and a connection-id.
NetworkManagerConnection(const std::shared_ptr<NetworkManagerPrivate> &parent, NetworkManagerSocketProxyBase *socketProxy, int connectionId);
/// Constructor taking the remote endpoint this connection is for.
NetworkManagerConnection(const std::shared_ptr<NetworkManagerPrivate> &parent, const EndPoint &remote);
~NetworkManagerConnection();
/// Connects to remote (async)
void connect();
int nextCallbackId();
/// unregister a NetworkConnection. Calls have to be from the strand.
void removeAllCallbacksFor(int id);
void queueMessage(const Message &message, NetworkConnection::MessagePriority priority);
inline bool isConnected() const {
return m_isConnected;
}
inline const EndPoint &endPoint() const {
return m_remote;
}
void setEndPoint(const EndPoint &ep) {
m_remote = ep;
}
/// add callback, calls have to be on the strand.
void addOnConnectedCallback(int id, std::function<void(const EndPoint&)> callback);
/// add callback, calls have to be on the strand.
void addOnDisconnectedCallback(int id, std::function<void(const EndPoint&)> callback);
/// add callback, calls have to be on the strand.
void addOnIncomingMessageCallback(int id, std::function<void(const Message&)> callback);
/// add callback, calls have to be on the strand.
void addOnError(int id, std::function<void(int,const boost::system::error_code&)> callback);
void setLoginCreator(const std::function<Message()> &creator);
/// forcibly shut down the connection, soon you should no longer reference this object
void shutdown();
/// only incoming connections need accepting.
void accept(NetworkConnection::AcceptLimit limit);
/// close the connection without reconnecting and drop all messages still queued.
inline void disconnect() {
close(false);
// the queues are only cleared when they have been allocated.
if (m_priorityMessageQueue)
m_priorityMessageQueue->clear();
if (m_messageQueue)
m_messageQueue->clear();
}
void recycleConnection();
boost::asio::io_context::strand m_strand;
/// move a call to the thread that the strand represents
void runOnStrand(const std::function<void()> &function);
inline bool acceptedConnection() const {
return m_acceptedConnection;
}
void setMessageHeaderType(MessageHeaderType messageHeaderType);
void punish(int amount);
void setMessageQueueSizes(int main, int priority);
void close(bool reconnect = true); // close down connection
void setCertificate(const Streaming::ConstBuffer &cert);
short m_punishment = 0; // aka ban-score
// used to check incoming messages being actually for us
MessageHeaderType m_messageHeaderType = FloweeNative;
std::shared_ptr<NetworkManagerPrivate> d;
// called from helper classes...
void errorDetected(const boost::system::error_code &error);
void requestMoreBytes();
private:
EndPoint m_remote;
void onAddressResolveComplete(const boost::system::error_code& error, boost::asio::ip::tcp::resolver::results_type results);
void onConnectComplete(const boost::system::error_code& error);
/// call this to start sending messages to remote. Will do nothing if we are already sending messages.
void runMessageQueue();
void sentSomeBytes(const boost::system::error_code& error, std::size_t bytes_transferred);
void requestMoreBytes_callback(const boost::system::error_code &error);
void receivedSomeBytes(const boost::system::error_code& error, std::size_t bytes_transferred);
bool processPacket(const std::shared_ptr<char> &buffer, const char *data);
bool processLegacyPacket(const std::shared_ptr<char> &buffer, const char *data);
void connect_priv(); // thread-unsafe version of connect
void reconnectWithCheck(const boost::system::error_code& error); // called from the m_reconnectDelay timer
void finalShutdown();
void sendPing(const boost::system::error_code& error);
void pingTimeout(const boost::system::error_code& error);
void allocateBuffers();
void callOnDisconnectedCallbacks();
/// returns true when the announce port and the peer port of the remote endpoint are equal.
// NOTE(review): presumably the two ports only differ for connections the remote initiated - confirm.
inline bool isOutgoing() const {
return m_remote.announcePort == m_remote.peerPort;
}
Streaming::ConstBuffer createHeader(const Message &message);
// the registered callbacks, the key is the callback-id passed to the matching add-method.
std::map<int, std::function<void(const EndPoint&)> > m_onConnectedCallbacks;
std::map<int, std::function<void(const EndPoint&)> > m_onDisConnectedCallbacks;
std::map<int, std::function<void(const Message&)> > m_onIncomingMessageCallbacks;
std::map<int, std::function<void(int,const boost::system::error_code&)> > m_onErrorCallbacks;
std::function<Message()> m_loginMessageCreator;
NetworkManagerSocketProxyBase *m_socketProxy = nullptr;
tcp::resolver m_resolver;
// the queues are held by pointer and can be null, see disconnect()
std::unique_ptr<RingBuffer<Message> > m_messageQueue;
std::unique_ptr<RingBuffer<Message> > m_priorityMessageQueue;
std::unique_ptr<RingBuffer<Streaming::ConstBuffer> > m_sendQHeaders;
int m_messageBytesSend = 0;
Streaming::BufferPool m_receiveStream;
mutable std::atomic<int> m_lastCallbackId;
std::atomic<bool> m_isClosingDown;
bool m_firstPacket = true;
/// The connection that we used to notify of a new incoming connection is the only
/// one that will get the first message, likely for some authentication feature.
bool m_firstMessageIsForAcceptConnection = false;
bool m_isConnecting = false;
// NOTE(review): no in-class initializer here, presumably set by the constructors - confirm.
bool m_isConnected;
bool m_sendingInProgress = false;
bool m_acceptedConnection = false;
int m_queueSizeMain = 2000; // config setting for the ringbuffers sizes
int m_priorityQueueSize = 20; // ditto
// flow-control limits
/*
* When, while processing the received messages, the send queue reaches this size
* we interrupt and do a quick send.
*/
int m_forceSendLimit = 750;
/*
* A send-message queue size limit that determines
* if we throttle waiting on receive, and by how much.
* Below L1 we don't wait.
* Below L2 we wait 10ms, below L3 we wait 50ms.
*/
int m_throttleReceiveAtSendLimitL1 = 1000;
int m_throttleReceiveAtSendLimitL2 = 1500;
int m_throttleReceiveAtSendLimitL3 = 1900;
short m_reconnectStep = 0;
boost::asio::system_timer m_reconnectDelay;
// for these I write 'ping' but its 'pong' for server (incoming) connections.
boost::asio::system_timer m_pingTimer;
boost::posix_time::ptime m_lastPong; // timestamp of the last pong seen by the outgoing connection
boost::asio::steady_timer m_sendTimer;
Message m_pingMessage;
// chunked messages can be recombined.
Streaming::BufferPool m_chunkedMessageBuffer;
int m_chunkedServiceId = -1;
int m_chunkedMessageId = -1;
std::map<int, int> m_chunkedHeaderData;
};
// NetworkManagerServerBase and its two implementations are the server side part
// that listens on a network interface and spawns a new connection.
// The two subclasses mark the difference between a raw socket connection and an ssl-wrapped one.
class NetworkManagerServerBase
{
public:
/**
* Constructor.
* @param parent the link to the NetworkManager
* @param endpoint which interface to listen on.
* @param callback to call on creating a connection.
*/
NetworkManagerServerBase(const std::shared_ptr<NetworkManagerPrivate> &parent, tcp::endpoint endpoint, const std::function<void(NetworkConnection&)> &callback);
virtual ~NetworkManagerServerBase() = default;
/// the callback when a connection comes in, will call createNewConnection()
void acceptConnection(boost::asio::ip::tcp::socket && socket);
/// stop the server, implemented by the subclasses.
virtual void shutdown() = 0;
protected:
/// implemented by the subclasses, NetworkManagerServerSSL::start() calls this.
virtual void setupCallback() = 0;
/// implemented by the subclasses to create the connection object for a just accepted \a socket.
virtual std::shared_ptr<NetworkManagerConnection> createNewConnection(int connectionId, boost::asio::ip::tcp::socket && socket) = 0;
// a weak pointer, so it has to be locked before usage.
std::weak_ptr<NetworkManagerPrivate> d;
tcp::acceptor m_acceptor;
tcp::endpoint m_endpoint;
std::function<void(NetworkConnection&)> onIncomingConnection; // callback
};
#ifdef FLOWEE_NET_SSL
/// Implements a server that binds and handles ssl connection setup.
/// Only available when FLOWEE_NET_SSL is defined.
class NetworkManagerServerSSL : public NetworkManagerServerBase
{
public:
NetworkManagerServerSSL(const std::shared_ptr<NetworkManagerPrivate> &parent, tcp::endpoint endpoint, const std::function<void(NetworkConnection&)> &callback);
/// public entry point that forwards to the protected setupCallback()
void start() {
setupCallback();
}
void shutdown() override;
// setters for the ssl configuration, each takes its data as a buffer.
void setCertificateChain(const Streaming::ConstBuffer &certificateChain);
void setPrivateKey(const Streaming::ConstBuffer &privateKey);
void setDHTemp(const Streaming::ConstBuffer &dhTemp);
protected:
void setupCallback() override;
std::shared_ptr<NetworkManagerConnection> createNewConnection(int connectionId, boost::asio::ip::tcp::socket && socket) override;
private:
// the ssl context of this server
boost::asio::ssl::context m_context;
};
/// The socket proxy for ssl-wrapped sockets, isEncrypted() returns true.
/// Only available when FLOWEE_NET_SSL is defined.
class NetworkManagerSSLSocketProxy : public NetworkManagerSocketProxyBase
{
public:
/// Constructor taking an existing ssl stream.
NetworkManagerSSLSocketProxy(boost::asio::ssl::stream<tcp::socket> socket);
/// Constructor taking only the io_context.
NetworkManagerSSLSocketProxy(boost::asio::io_context &ioContext);
void setClientCertificate(const Streaming::ConstBuffer &data);
tcp::endpoint remote_endpoint() const override;
void newConnection(const tcp::endpoint &to, const std::shared_ptr<NetworkManagerConnection> &owner,
const std::function<void (const boost::system::error_code &e)> &callback) override;
void async_write(const std::vector<Streaming::ConstBuffer> &data,
const std::function<void (const boost::system::error_code &e, std::size_t)> &callback) override;
void async_receive(Streaming::BufferPool &pool,
const std::function<void (const boost::system::error_code &e, std::size_t bytesReceived)> &callback) override;
bool is_open() const override;
void close() override;
bool isEncrypted() const override {
return true;
}
void doASyncHandshake(const std::shared_ptr<NetworkManagerConnection> &owner);
private:
// for outgoing connections we might need to make a new socket
// we need the context for that.
// Declared before m_socket, so it is constructed first.
boost::asio::ssl::context m_context;
// the socket we work on.
boost::asio::ssl::stream<tcp::socket> m_socket;
};
#endif
/// The NetworkManagerServerBase implementation for raw (not ssl-wrapped) socket connections.
class NetworkManagerServerSimple : public NetworkManagerServerBase
{
public:
NetworkManagerServerSimple(const std::shared_ptr<NetworkManagerPrivate> &parent, tcp::endpoint endpoint, const std::function<void(NetworkConnection&)> &callback);
void shutdown() override;
protected:
void setupCallback() override;
std::shared_ptr<NetworkManagerConnection> createNewConnection(int connectionId, boost::asio::ip::tcp::socket && socket) override;
private:
// NOTE(review): presumably the socket the next incoming connection is accepted into - confirm in the implementation.
tcp::socket m_socket;
};
/// The socket proxy for plain tcp sockets, isEncrypted() returns false.
class NetworkManagerBasicSocketProxy : public NetworkManagerSocketProxyBase
{
public:
/// Constructor taking only the io_context.
NetworkManagerBasicSocketProxy(boost::asio::io_context &io_context);
/// Constructor taking over an existing socket.
NetworkManagerBasicSocketProxy(tcp::socket && socket);
tcp::endpoint remote_endpoint() const override;
void newConnection(const tcp::endpoint &to, const std::shared_ptr<NetworkManagerConnection> &owner,
const std::function<void (const boost::system::error_code &e)> &callback) override;
void async_write(const std::vector<Streaming::ConstBuffer> &data,
const std::function<void (const boost::system::error_code &e, std::size_t)> &callback) override;
void async_receive(Streaming::BufferPool &pool,
const std::function<void (const boost::system::error_code &e, std::size_t bytesReceived)> &callback) override;
bool is_open() const override;
void close() override;
bool isEncrypted() const override {
return false;
}
private:
// the socket we work on.
tcp::socket m_socket;
};
/// An entry of the NetworkManagerPrivate::banned list; an endpoint together with a timestamp.
struct BannedNode
{
EndPoint endPoint;
// NOTE(review): presumably the moment the ban ends - confirm against the code using it.
boost::posix_time::ptime banTimeout;
};
/**
 * The private data of the NetworkManager.
 * Holds the connections, the listening servers, the list of banned nodes and the
 * registered services. All members are public, the class is only meant to be used
 * by the NetworkManager component.
 */
class NetworkManagerPrivate
{
public:
NetworkManagerPrivate(boost::asio::io_context &context);
~NetworkManagerPrivate();
/// a new-connection handler that accepts every connection.
inline void alwaysConnectingNewConnectionHandler(NetworkConnection &con) {
con.accept();
}
/// The kind of socket a server made with bind() listens with.
enum ListenType {
PlainSocket,
SSLSocket
};
/// create a server of \a type for \a endpoint, the \a callback is passed on for incoming connections.
NetworkManagerServerBase *bind(ListenType type,
std::shared_ptr<NetworkManagerPrivate> me,
const boost::asio::ip::tcp::endpoint &endpoint, const std::function<void(NetworkConnection&)> &callback);
void punishNode(int connectionId, int punishScore);
void cronHourly(const boost::system::error_code& error);
boost::asio::io_context& ioContext;
std::map<int, std::shared_ptr<NetworkManagerConnection> > connections;
std::deque<std::shared_ptr<NetworkManagerConnection> > unusedConnections;
// NOTE(review): no in-class initializer here, presumably set by the constructor - confirm.
int lastConnectionId;
std::recursive_mutex mutex; // to lock access to things like the connections map
std::mutex connectionMutex;
// NOTE(review): no in-class initializer here, presumably set by the constructor - confirm.
bool isClosingDown;
// raw pointers. NOTE(review): presumably owned by this class and deleted in the destructor - confirm.
std::vector<NetworkManagerServerBase *> servers;
std::list<BannedNode> banned;
std::list<NetworkServiceBase*> services;
boost::asio::system_timer m_cronHourly;
// support for the p2p legacy envelope design
uint8_t networkId[4] = { 0xE3, 0xE1, 0xF3, 0xE8};
// two maps that are each others inverse; from int to string and from string to int.
std::map<int, std::string> messageIds;
std::map<std::string, int> messageIdsReverse;
// NOTE(review): presumably the cap on a reassembled incoming message; the value is set outside of this header - confirm.
size_t m_maxIncomingMessageSize;
};
#endif