/*
 * This file is part of the Flowee project
 * Copyright (C) 2016-2026 Tom Zander
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program. If not, see .
 */
#include "NetworkManager.h"
#include "NetworkManager_p.h"
#include "NetworkQueueFullError.h"
#include "NetworkServiceBase.h"

// NOTE(review): the targets of the ten include directives below, and the
// template arguments further down (e.g. `std::make_shared(10240)`), are missing
// in this copy of the file. They look like they were lost in extraction;
// restore them from the upstream source before building.
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include

// #define DEBUG_CONNECTIONS

// Initial size handed to m_receiveStream for connections created from an EndPoint.
constexpr int RECEIVE_STREAM_SIZE = 200000;
// Message bodies larger than this are split into chunks by runMessageQueue().
constexpr int CHUNK_SIZE = 8000;
// Upper bound for one packet (header plus body) on the wire.
constexpr int MAX_MESSAGE_SIZE = 9000;
// Size of the fixed header of a legacy P2P message.
constexpr int LEGACY_HEADER_SIZE = 24;
constexpr size_t DEFAULT_MAX_INCOMING_MESSAGE_SIZE = 64 * 1024 * 1024; // (64MiB)

namespace {

// Per-thread scratch buffer, used only really for message-headers.
thread_local std::shared_ptr m_buffer = std::make_shared(10240);

/// Returns the per-thread scratch buffer after reserving \a reserveSize bytes in it.
inline std::shared_ptr pool(int reserveSize)
{
    m_buffer->reserve(reserveSize);
    return m_buffer;
}

/**
 * Back-off delay for reconnect attempt number \a step.
 * The caller (onConnectComplete) interprets the result as seconds.
 * For step < 15 this is (2 * step * step) / 10 rounded to nearest, with a
 * minimum of 1; from step 15 onwards it is a flat 44.
 */
int reconnectTimeoutForStep(short step)
{
    if (step < 15) {
        int val = 2 * step * step;
        return std::max(1, (val + 5 )/ 10);
    }
    return 44;
}

/**
 * Builds the header-only system-service message used for keep-alive.
 * The message carries a Ping tag when \a outgoingConnection is true and a
 * Pong tag otherwise.
 */
Message buildPingMessage(bool outgoingConnection)
{
    Streaming::MessageBuilder builder(Streaming::HeaderOnly, 10);
    builder.add(Network::ServiceId, Network::SystemServiceId);
    if (outgoingConnection) // outgoing connections ping
        builder.add(Network::Ping, true);
    else
        builder.add(Network::Pong, true);
    builder.add(Network::HeaderEnd, true);
    return builder.message();
}
}

/// Creates the manager and its shared private state on top of \a context.
NetworkManager::NetworkManager(boost::asio::io_context &context)
    : d(std::make_shared(context))
{
}
/**
 * Shuts the manager down: under d->mutex it flags the private state as closing,
 * shuts down every server and connection, drops all connections and detaches
 * every registered service from this manager.
 */
NetworkManager::~NetworkManager()
{
    std::lock_guard lock(d->mutex);
    d->isClosingDown = true;
    for (auto server : d->servers) {
        server->shutdown();
    }
    for (auto it = d->connections.begin(); it != d->connections.end(); ++it) {
        it->second->shutdown();
    }
    d->connections.clear(); // invalidate NetworkConnection references
    for (auto service : d->services) {
        service->setManager(nullptr);
    }
    d->services.clear();
    d->unusedConnections.clear();
}

/**
 * Looks up, and optionally creates, the connection for \a remote.
 *
 * \a remote is only usable when it has an IP address or hostname plus a
 * non-zero announcePort; otherwise an empty NetworkConnection is returned.
 * An existing connection matches when hostname and IP (each only if set on
 * \a remote), the announce port (0 on either side acts as a wildcard) and the
 * encrypted flag agree.
 * When nothing matches and \a connect is AutoCreate a new connection object is
 * registered under a fresh connection id.
 */
NetworkConnection NetworkManager::connection(const EndPoint &remote, ConnectionEnum connect)
{
    const bool hasHostname = (!remote.ipAddress.is_unspecified() || !remote.hostname.empty()) && remote.announcePort > 0;

    if (hasHostname) {
        std::lock_guard lock(d->mutex);
        for (auto iter1 = d->connections.begin(); iter1 != d->connections.end(); ++iter1) {
            const EndPoint &ep = iter1->second->endPoint();
            if (!remote.hostname.empty() && ep.hostname != remote.hostname)
                continue;
            if (!remote.ipAddress.is_unspecified() && ep.ipAddress != remote.ipAddress)
                continue;
            if (!(ep.announcePort == 0 || ep.announcePort == remote.announcePort || remote.announcePort == 0))
                continue;
            if (ep.encrypted != remote.encrypted)
                continue;
            return NetworkConnection(this, iter1->first);
        }

        if (connect == AutoCreate) {
            EndPoint ep(remote);
            if (ep.ipAddress.is_unspecified()) // try to see if hostname is an IP. If so, bypass DNS lookup
                try { ep.ipAddress = boost::asio::ip::make_address(ep.hostname); } catch (...) {}
            ep.peerPort = ep.announcePort; // outgoing connections always have those the same.
// Continuation of NetworkManager::connection(): still inside the AutoCreate branch, d->mutex held.
            ep.connectionId = ++d->lastConnectionId;
            if (d->unusedConnections.empty()) {
                d->connections.insert(std::make_pair(ep.connectionId, std::make_shared(d, ep)));
            } else {
                // recycle a previously used connection object instead of allocating a new one.
                auto con = d->unusedConnections.front();
                d->unusedConnections.pop_front();
                con->setEndPoint(ep);
                d->connections.insert(std::make_pair(ep.connectionId, con));
            }
            return NetworkConnection(this, ep.connectionId);
        }
    }
    return NetworkConnection();
}

/// Returns a handle for every known connection whose endpoint IP equals \a ipAddress.
std::list NetworkManager::connectionsFrom(boost::asio::ip::address ipAddress)
{
    assert(!ipAddress.is_unspecified()); // an 'unspecified' address is a joker like 0.0.0.0
    std::list answer;
    std::lock_guard lock(d->mutex);
    for (auto iter = d->connections.begin(); iter != d->connections.end(); ++iter) {
        const EndPoint &ep = iter->second->endPoint();
        if (ep.ipAddress == ipAddress) {
            answer.push_back(NetworkConnection(this, iter->first));
        }
    }
    return answer;
}

/// Returns the endpoint of connection \a remoteId, or a default EndPoint when the id is unknown.
EndPoint NetworkManager::endPoint(int remoteId) const
{
    std::lock_guard lock(d->mutex);
    auto i = d->connections.find(remoteId);
    if (i == d->connections.end())
        return EndPoint();
    NetworkManagerConnection *con = i->second.get();
    assert(con);
    return con->endPoint();
}

/// Forwards to NetworkManagerPrivate::punishNode().
void NetworkManager::punishNode(int remoteId, int punishment)
{
    d->punishNode(remoteId, punishment);
}

/// Starts listening on \a endpoint with a plain socket; \a callback is passed on to the server object.
void NetworkManager::bind(const tcp::endpoint &endpoint, const std::function &callback)
{
    d->bind(NetworkManagerPrivate::PlainSocket, d, endpoint, callback);
}

/// Same as above, using NetworkManagerPrivate::alwaysConnectingNewConnectionHandler as the callback.
void NetworkManager::bind(const tcp::endpoint &endpoint)
{
    d->bind(NetworkManagerPrivate::PlainSocket, d, endpoint, std::bind(&NetworkManagerPrivate::alwaysConnectingNewConnectionHandler, d, std::placeholders::_1));
}

/**
 * Starts listening on \a endpoint with an SSL socket.
 * \a certificateChain and \a privateKey must not be empty; \a dhTemp is only
 * applied when it is not empty.
 * The whole body is compiled out when FLOWEE_NET_SSL is not defined, in which
 * case this call does nothing.
 */
void NetworkManager::bindSsl(const boost::asio::ip::tcp::endpoint &endpoint, const Streaming::ConstBuffer &certificateChain, const Streaming::ConstBuffer &privateKey, const Streaming::ConstBuffer &dhTemp, const std::function &callback)
{
#ifdef FLOWEE_NET_SSL
    assert(!certificateChain.isEmpty());
    assert(!privateKey.isEmpty());
    auto *server = d->bind(NetworkManagerPrivate::SSLSocket,
            d, endpoint, callback);
    assert(server);
    auto sslServer = dynamic_cast(server);
    assert(sslServer);
    sslServer->setCertificateChain(certificateChain);
    sslServer->setPrivateKey(privateKey);
    if (!dhTemp.isEmpty())
        sslServer->setDHTemp(dhTemp);
    sslServer->start();
#endif
}

/// Registers \a service and points it at this manager. A null \a service is ignored.
void NetworkManager::addService(NetworkServiceBase *service)
{
    assert(service);
    if (!service) return;
    std::lock_guard lock(d->mutex);
    d->services.push_back(service);
    service->setManager(this);
}

/// Unregisters \a service and clears its manager pointer. A null \a service is ignored.
void NetworkManager::removeService(NetworkServiceBase *service)
{
    assert(service);
    if (!service) return;
    std::lock_guard lock(d->mutex);
    d->services.remove(service);
    service->setManager(nullptr);
}

/// Replaces the message-id lookup table and rebuilds the reverse (value to key) table from it.
void NetworkManager::setMessageIdLookup(const std::map &table)
{
    // NOTE(review): unlike addService()/removeService() this does not take d->mutex — confirm callers only use it during setup.
    d->messageIds = table;
    d->messageIdsReverse.clear();
    for (auto iter = table.begin(); iter != table.end(); ++iter) {
        d->messageIdsReverse.insert(std::make_pair(iter->second, iter->first));
    }
}

/// Copies the four bytes of \a magic into d->networkId. \a magic must hold exactly 4 entries.
void NetworkManager::setLegacyNetworkId(const std::vector &magic)
{
    assert(magic.size() == 4);
    for (size_t i = 0; i < 4; ++i)
        d->networkId[i] = magic[i];
}

/// Gives access to the private state without extending its lifetime.
std::weak_ptr NetworkManager::priv()
{
    return d;
}


/////////////////////////////////////


NetworkManagerPrivate::NetworkManagerPrivate(boost::asio::io_context &context)
    : ioContext(context),
     lastConnectionId(0),
     isClosingDown(false),
     m_cronHourly(context),
     m_maxIncomingMessageSize(DEFAULT_MAX_INCOMING_MESSAGE_SIZE)
{
}

NetworkManagerPrivate::~NetworkManagerPrivate()
{
    // stop the hourly timer; its handler is bound to a raw `this` pointer.
    m_cronHourly.cancel();
}

/**
 * Adds \a punishScore to the punishment of connection \a connectionId.
 * Unknown ids are ignored. Reaching 1000 points bans the endpoint for 24 hours.
 */
void NetworkManagerPrivate::punishNode(int connectionId, int punishScore)
{
    std::lock_guard lock(mutex);
    auto con = connections.find(connectionId);
    if (con == connections.end())
        return;
    con->second->m_punishment += punishScore;

    if (con->second->m_punishment >= 1000) {
        BannedNode bn;
        bn.endPoint = con->second->endPoint();
        bn.banTimeout = boost::posix_time::second_clock::universal_time() + boost::posix_time::hours(24);
        logInfo(Log::NWM) << "Banned node for 24 hours due to excessive bad behavior" << bn.endPoint.hostname;
banned.push_back(bn); connections.erase(connectionId); con->second->shutdown(); } } void NetworkManagerPrivate::cronHourly(const boost::system::error_code &error) { if (error) return; logDebug(Log::NWM) << "cronHourly"; std::lock_guard lock(mutex); if (isClosingDown) return; const auto now = boost::posix_time::second_clock::universal_time(); std::list::iterator bannedNode = banned.begin(); // clean out banned nodes while (bannedNode != banned.end()) { if (bannedNode->banTimeout < now) bannedNode = banned.erase(bannedNode); else ++bannedNode; } for (const auto &connection : connections) { connection.second->m_punishment = std::max(0, connection.second->m_punishment - 100); // logDebug(Log::NWM) << "peer ban scrore;" << connection.second->m_punishment; } m_cronHourly.expires_after(boost::asio::chrono::hours(1)); m_cronHourly.async_wait(std::bind(&NetworkManagerPrivate::cronHourly, this, std::placeholders::_1)); } NetworkManagerServerBase* NetworkManagerPrivate::bind(ListenType type, std::shared_ptr me, const boost::asio::ip::tcp::endpoint &endpoint, const std::function &callback) { std::lock_guard lock(mutex); try { NetworkManagerServerBase *server; if (type == PlainSocket) server = new NetworkManagerServerSimple(me, endpoint, callback); #ifdef FLOWEE_NET_SSL else if (type == SSLSocket) server = new NetworkManagerServerSSL(me, endpoint, callback); #endif else throw std::runtime_error(""); // invalid enum value servers.push_back(server); } catch (std::exception &ex) { logWarning(Log::NWM) << "Creating NetworkMangerServer failed with" << ex.what(); throw std::runtime_error("Failed to bind to endpoint"); } if (servers.size() == 1) // start cron cronHourly(boost::system::error_code()); return servers.back(); } ///////////////////////////////////// NetworkManagerConnection::NetworkManagerConnection(const std::shared_ptr &parent, NetworkManagerSocketProxyBase *socketProxy, int connectionId) : m_strand(parent->ioContext), d(parent), m_socketProxy(socketProxy), // we take 
ownership m_resolver(parent->ioContext), m_lastCallbackId(1), m_isClosingDown(false), m_isConnected(true), m_reconnectDelay(parent->ioContext), m_pingTimer(parent->ioContext), m_sendTimer(parent->ioContext), m_chunkedMessageBuffer(0) { assert(m_socketProxy); m_socketProxy->q = this; const auto ep = m_socketProxy->remote_endpoint(); m_remote.ipAddress = ep.address(); m_remote.announcePort = ep.port(); m_remote.hostname = m_remote.ipAddress.to_string(); m_remote.peerPort = 0; m_remote.connectionId = connectionId; m_remote.encrypted = m_socketProxy->isEncrypted(); } NetworkManagerConnection::NetworkManagerConnection(const std::shared_ptr &parent, const EndPoint &remote) : m_strand(parent->ioContext), d(parent), m_remote(remote), m_resolver(parent->ioContext), m_receiveStream(RECEIVE_STREAM_SIZE), m_lastCallbackId(1), m_isClosingDown(false), m_isConnected(false), m_reconnectDelay(parent->ioContext), m_pingTimer(parent->ioContext), m_sendTimer(parent->ioContext) { if (m_remote.peerPort == 0) m_remote.peerPort = m_remote.announcePort; #ifdef FLOWEE_NET_SSL if (remote.encrypted) m_socketProxy = new NetworkManagerSSLSocketProxy(parent->ioContext); else #endif m_socketProxy = new NetworkManagerBasicSocketProxy(parent->ioContext); m_socketProxy->q = this; } NetworkManagerConnection::~NetworkManagerConnection() { delete m_socketProxy; } void NetworkManagerConnection::connect() { m_isClosingDown.store(false); runOnStrand(std::bind(&NetworkManagerConnection::connect_priv, shared_from_this())); } void NetworkManagerConnection::connect_priv() { assert(m_strand.running_in_this_thread()); assert(m_remote.announcePort == m_remote.peerPort); // its an outgoing connection if (m_isConnecting) return; if (m_isClosingDown) return; if (m_isConnected) return; m_isConnecting = true; allocateBuffers(); if (m_remote.ipAddress.is_unspecified()) { auto handler = boost::asio::bind_executor(m_strand, std::bind(&NetworkManagerConnection::onAddressResolveComplete, shared_from_this(), 
std::placeholders::_1, std::placeholders::_2)); m_resolver.async_resolve(m_remote.hostname, boost::lexical_cast(m_remote.announcePort), handler); } else { if (m_remote.hostname.empty()) m_remote.hostname = m_remote.ipAddress.to_string(); boost::asio::ip::tcp::endpoint endpoint(m_remote.ipAddress, m_remote.announcePort); std::lock_guard lock(d->connectionMutex); m_socketProxy->newConnection(endpoint, shared_from_this(), boost::asio::bind_executor(m_strand, std::bind(&NetworkManagerConnection::onConnectComplete, shared_from_this(), std::placeholders::_1))); } } void NetworkManagerConnection::onAddressResolveComplete(const boost::system::error_code &error, boost::asio::ip::tcp::resolver::results_type results) { if (m_isClosingDown) return; if (error) { logWarning(Log::NWM).nospace() << "connect[" << m_remote << "] " << error.message() << " (" << error.value() << ")"; m_isConnecting = false; m_reconnectDelay.expires_after(boost::asio::chrono::seconds(45)); m_reconnectDelay.async_wait(boost::asio::bind_executor(m_strand, std::bind(&NetworkManagerConnection::reconnectWithCheck, shared_from_this(), std::placeholders::_1))); errorDetected(error); return; } assert(m_strand.running_in_this_thread()); // Notice that we always only use the first reported DNS entry. Which is likely Ok. 
// Continuation of NetworkManagerConnection::onAddressResolveComplete(): the lookup succeeded.
    auto iter = results.cbegin();
    if (iter != results.cend()) {
        m_remote.ipAddress = iter->endpoint().address();
        logInfo(Log::NWM) << m_remote.connectionId << "Outgoing connection to" << m_remote.hostname
                  << "resolved to:" << m_remote.ipAddress.to_string();
        std::lock_guard lock(d->connectionMutex);
        m_socketProxy->newConnection(iter->endpoint(), shared_from_this(),
                boost::asio::bind_executor(m_strand,
                    std::bind(&NetworkManagerConnection::onConnectComplete, shared_from_this(), std::placeholders::_1)));
    } else {
        // NOTE(review): unlike the error path above, no retry is scheduled here — confirm that is intended.
        logWarning(Log::NWM) << "DNS resolve failed, no results for" << m_remote.hostname;
        m_isConnecting = false;
    }
}

/**
 * Completion handler of the socket connect.
 *
 * On failure (other than operation_aborted) of an outgoing connection a retry
 * is scheduled with the back-off from reconnectTimeoutForStep() and the error
 * is passed to errorDetected().
 * On success the optional login message is written first, the onConnected
 * callbacks are invoked, receiving is started and, for FloweeNative headers,
 * the ping timer is armed.
 */
void NetworkManagerConnection::onConnectComplete(const boost::system::error_code& error)
{
    m_isConnecting = false;
    if (m_isClosingDown)
        return;
    if (m_isConnected)
        return;
    if (error) {
        if (error == boost::asio::error::operation_aborted)
            return;
        logWarning(Log::NWM) << "connect to" << m_remote << "error:" << error.message();
        if (m_remote.peerPort != m_remote.announcePort) // incoming connection
            return;

        m_reconnectDelay.expires_after(boost::asio::chrono::seconds(reconnectTimeoutForStep(m_reconnectStep++)));
        if (m_reconnectStep < 0) // if after not connecting for almost 17 days, we wrap around...
            m_reconnectStep = 20;
        m_reconnectDelay.async_wait(boost::asio::bind_executor(m_strand,
                std::bind(&NetworkManagerConnection::reconnectWithCheck,shared_from_this(), std::placeholders::_1)));
        errorDetected(error);
        return;
    }
    m_isConnected = true;
    assert(m_strand.running_in_this_thread());
    logInfo(Log::NWM) << m_remote.connectionId << "Successfully made TCP connection to" << m_remote.hostname.c_str() << m_remote.announcePort;

    // send login message
    if (m_loginMessageCreator) {
        auto message = m_loginMessageCreator();
        std::vector socketQueue;
        if (!message.hasHeader()) {
            const Streaming::ConstBuffer constBuf = createHeader(message);
            socketQueue.push_back(constBuf);
            // keep a reference to this one until sending is completed.
            m_sendQHeaders->append(constBuf);
        }
        auto data = message.rawData();
        m_sendQHeaders->append(data); // keep refcount up
        socketQueue.push_back(data);
        m_socketProxy->async_write(socketQueue,
                std::bind(&NetworkManagerConnection::sentSomeBytes, shared_from_this(),
                      std::placeholders::_1, std::placeholders::_2));
    }

    for (auto it = m_onConnectedCallbacks.begin(); it != m_onConnectedCallbacks.end(); ++it) {
        try {
            it->second(m_remote);
        } catch (const std::exception &ex) {
            logWarning(Log::NWM) << "onConnected threw exception, ignoring:" << ex.what();
        }
    }

    // with a login message the queue is started from sentSomeBytes() once that write completed.
    if (! m_loginMessageCreator) {
        runMessageQueue();
    }
    requestMoreBytes(); // setup a callback for receiving.

    // for outgoing connections, ping.
    if (m_messageHeaderType == FloweeNative) {
        m_lastPong = boost::posix_time::second_clock::universal_time();
        // ping every 30 seconds while the remote will disconnect after 90 seconds of no pings.
        m_pingTimer.expires_after(boost::asio::chrono::seconds(30));
        m_pingTimer.async_wait(boost::asio::bind_executor(m_strand,
                std::bind(&NetworkManagerConnection::sendPing, shared_from_this(), std::placeholders::_1)));
    }
}

/**
 * Builds the wire header for \a message in the per-thread scratch buffer.
 * Messages of the Api::LegacyP2P service get the legacy layout written here:
 * 4 bytes network id, 12 bytes zero-padded command name, 4 bytes body size and
 * 4 bytes checksum. All other messages get a header made with MessageBuilder.
 */
Streaming::ConstBuffer NetworkManagerConnection::createHeader(const Message &message)
{
    assert(message.serviceId() >= 0);
    if (message.serviceId() == Api::LegacyP2P) {
        const auto body = message.body();
        auto sendHelperBuffer = pool(4 + 12 + 4 + 4);
        memcpy(sendHelperBuffer->data(), d->networkId, 4);
        sendHelperBuffer->markUsed(4);
        auto m = d->messageIds.find(message.messageId());
        std::string messageId;
        if (m != d->messageIds.end())
            messageId = m->second;
        else
            logCritical() << m_remote.connectionId << "createHeader[legacy]: P2P message Id unknown:" << message.messageId();
        assert(messageId.size() <= 12);
        // presumably data() points at the first unused byte, i.e. just past what markUsed() consumed — TODO confirm in BufferPool.
        memcpy(sendHelperBuffer->data(), messageId.c_str(), messageId.size());
        for (size_t i = messageId.size(); i < 12; ++i) { // rest of version is zero-filled
            sendHelperBuffer->data()[i] = 0;
        }
        sendHelperBuffer->markUsed(12);
        const uint32_t messageSize = body.size();
// Continuation of NetworkManagerConnection::createHeader(), legacy P2P branch: body size and checksum.
        WriteLE32(reinterpret_cast(sendHelperBuffer->data()), messageSize);
        sendHelperBuffer->markUsed(4);
        // the checksum is the first 4 bytes of Hash() over the body.
        uint256 hash = Hash(body.begin(), body.end());
        unsigned int checksum = 0;
        memcpy(&checksum, &hash, 4);
        WriteLE32(reinterpret_cast(sendHelperBuffer->data()), checksum);
        return sendHelperBuffer->commit(4);
    }
    else {
        const auto map = message.headerData();
        auto sendHelperBuffer = pool(10 * static_cast(map.size()) + 1);
        Streaming::MessageBuilder builder(sendHelperBuffer, Streaming::HeaderOnly);
        auto iter = map.begin();
        while (iter != map.end()) {
            assert(iter->first >= 0);
            builder.add(static_cast(iter->first), iter->second);
            ++iter;
        }
        builder.add(Network::HeaderEnd, true);
        // the size stored in the header covers header plus message.
        assert(sendHelperBuffer->size() + message.size() < MAX_MESSAGE_SIZE);
        builder.setMessageSize(sendHelperBuffer->size() + message.size());
        logDebug(Log::NWM) << m_remote.connectionId << "createHeader of message of length;" << sendHelperBuffer->size() << '+' << message.size();
        return builder.buffer();
    }
}

/**
 * Reports \a error to every registered onError callback.
 * "No error" and operation_aborted are not reported. The callbacks are copied
 * into a local vector before they are invoked, and exceptions they throw are
 * logged and swallowed.
 */
void NetworkManagerConnection::errorDetected(const boost::system::error_code &error)
{
    m_isConnecting = false;
    if (error == boost::asio::error::operation_aborted || !error) // no need to push those up the stack
        return;

    std::vector > callbacks;
    callbacks.reserve(m_onErrorCallbacks.size());
    for (auto it = m_onErrorCallbacks.begin(); it != m_onErrorCallbacks.end(); ++it) {
        callbacks.push_back(it->second);
    }
    for (const auto &callback : callbacks) {
        try {
            callback(m_remote.connectionId, error);
        } catch (const std::exception &e) {
            logCritical(Log::NWM) << "Callback 'onError' threw with" << e;
        }
    }
}

/// Sets the client certificate on the SSL socket proxy. Compiled to a no-op without FLOWEE_NET_SSL.
void NetworkManagerConnection::setCertificate(const Streaming::ConstBuffer &cert)
{
#ifdef FLOWEE_NET_SSL
    assert(m_remote.encrypted);
    assert(m_socketProxy);
    auto *ssl = dynamic_cast(m_socketProxy);
    assert(ssl);
    ssl->setClientCertificate(cert);
#endif
}

/**
 * Collects queued messages into one scatter/gather write; must run on the strand.
 * Returns early when a write is in flight, when both queues are fully read or
 * when not connected. The priority queue is drained before the normal queue.
 */
void NetworkManagerConnection::runMessageQueue()
{
    assert(m_strand.running_in_this_thread());
    if (m_sendingInProgress || (m_messageQueue->isRead() &&
                m_priorityMessageQueue->isRead()) || !isConnected())
        return;

    m_sendingInProgress = true;

    /*
     * This method will schedule sending of data.
     * The data to send is pushed async to the network stack and the callback will come in essentially
     * the moment the network stack has accepted the data. This is not at all any confirmation that
     * the other side accepted it!
     * But at the same time, the network stack has limited buffers and will only push to the network
     * an amount based on the TCP window size. So at minimum we know that the speed with which we
     * send stuff is indicative of the throughput.
     *
     * The idea here is to send a maximum amount of 250KB at a time. Which should be enough to avoid
     * delays. The speed limiter here mean we still allow messages that were pushed to the front of the
     * queue to be handled at a good speed.
     */
    int bytesLeft = 250*1024;
    std::vector socketQueue; // the stuff we will send over the socket

    while (m_priorityMessageQueue->hasUnread()) {
        const Message &message = m_priorityMessageQueue->unreadTip();
        if (m_sendQHeaders->isFull())
            break;
        int headerSize;
        if (message.hasHeader()) {
            headerSize = message.header().size();
        } else { // build a simple header
            const Streaming::ConstBuffer constBuf = createHeader(message);
            headerSize = constBuf.size();
            bytesLeft -= headerSize;
            socketQueue.push_back(constBuf);
            m_sendQHeaders->append(constBuf);
        }
        // priority messages are never chunked, so they have to fit in one packet.
        assert(message.body().size() + headerSize < MAX_MESSAGE_SIZE);
        socketQueue.push_back(message.rawData());
        bytesLeft -= message.rawData().size();
        m_priorityMessageQueue->markRead();
        if (bytesLeft <= 0)
            break;
    }

    while (m_messageQueue->hasUnread()) {
        if (bytesLeft <= 0)
            break;
        if (m_sendQHeaders->isFull())
            break;
        const Message &message = m_messageQueue->unreadTip();
        if (message.rawData().size() > CHUNK_SIZE && message.serviceId() != Api::LegacyP2P) {
            assert(!message.hasHeader()); // should have been blocked from entering in queueMessage();

            /*
             * The maximum size of a message is 9KB. This helps a lot with memory allocations and zero-copy ;)
             * A large message is then split into smaller ones and send with individual headers
             * to the other side where they can be re-connected.
             */
            Streaming::ConstBuffer body(message.body());
            // m_messageBytesSend remembers how far a previous call got with this message.
            const char *begin = body.begin() + m_messageBytesSend;
            const char *end = body.end();
            Streaming::ConstBuffer chunkHeader;// the first and last are different, but all the ones in the middle are duplicates.
            bool first = m_messageBytesSend == 0;
            while (begin < end) {
                const char *p = begin + CHUNK_SIZE;
                if (p > end)
                    p = end;
                m_messageBytesSend += p - begin;
                Streaming::ConstBuffer bodyChunk(body.internal_buffer(), begin, p);
                begin = p;

                Streaming::ConstBuffer header;
                if (first || begin == end || !chunkHeader.isValid()) {
                    const auto headerData = message.headerData();
                    auto sendHelperBuffer = pool(20 + 8 * headerData.size());
                    Streaming::MessageBuilder headerBuilder(sendHelperBuffer, Streaming::HeaderOnly);
                    headerBuilder.add(Network::ServiceId, message.serviceId());
                    if (first) {
                        // only the first chunk carries the user header fields and the total size.
                        for (auto iter = headerData.begin(); iter != headerData.end(); ++iter) {
                            if (iter->first == Network::ServiceId) // forced to be first.
                                continue;
                            headerBuilder.add(iter->first, iter->second);
                        }
                        headerBuilder.add(Network::SequenceStart, body.size());
                    }
                    else if (message.messageId() >= 0) {
                        headerBuilder.add(Network::MessageId, message.messageId());
                    }
                    headerBuilder.add(Network::LastInSequence, (begin == end));
                    headerBuilder.add(Network::HeaderEnd, true);
                    assert(sendHelperBuffer->size() + bodyChunk.size() < MAX_MESSAGE_SIZE);
                    headerBuilder.setMessageSize(sendHelperBuffer->size() + bodyChunk.size());

                    header = headerBuilder.buffer();
                    if (!first)
                        chunkHeader = header;
                    first = false;
                } else {
                    header = chunkHeader;
                }
                bytesLeft -= header.size();
                socketQueue.push_back(header);
                m_sendQHeaders->append(header);

                socketQueue.push_back(bodyChunk);
                bytesLeft -= bodyChunk.size();

                // out of budget; the remaining chunks go out in a later call.
                if (bytesLeft <= 0)
                    break;
            }
            if (begin >= end) { // done with message.
// Continuation of NetworkManagerConnection::runMessageQueue(): a chunked message was sent completely.
                m_messageBytesSend = 0;
                m_messageQueue->markRead();
            }
        }
        else {
            // small (or legacy P2P) message, sent in one piece.
            if (!message.hasHeader()) { // build a simple header
                const Streaming::ConstBuffer constBuf = createHeader(message);
                bytesLeft -= constBuf.size();
                socketQueue.push_back(constBuf);
                m_sendQHeaders->append(constBuf);
            }
            socketQueue.push_back(message.rawData());
            bytesLeft -= message.rawData().size();
            m_messageQueue->markRead();
        }
    }
    assert(m_messageBytesSend >= 0);

    m_socketProxy->async_write(socketQueue,
            std::bind(&NetworkManagerConnection::sentSomeBytes, shared_from_this(),
                  std::placeholders::_1, std::placeholders::_2));
}

/**
 * Completion handler of the write started by runMessageQueue() or by the login
 * message in onConnectComplete().
 * On error the send bookkeeping is reset, every queued message is marked
 * unread again and connect() is scheduled.
 * On success the messages that were read are removed from both queues, the
 * next batch is started and, when the receive buffer already holds a complete
 * packet, receive processing is resumed.
 */
void NetworkManagerConnection::sentSomeBytes(const boost::system::error_code& error, std::size_t bytes_transferred)
{
    if (m_isClosingDown)
        return;
    m_sendingInProgress = false;
    if (error) {
        logWarning(Log::NWM) << "send received error" << error.message();
        m_messageBytesSend = 0;
        m_sendQHeaders->clear();
        m_messageQueue->markAllUnread();
        m_priorityMessageQueue->markAllUnread();
        runOnStrand(std::bind(&NetworkManagerConnection::connect, shared_from_this()));
        return;
    }
    assert(m_strand.running_in_this_thread());
    if (!m_socketProxy->is_open())
        return;
    logDebug(Log::NWM) << m_remote.connectionId << "Managed to send" << bytes_transferred << "bytes";
    // a successful send resets the reconnect back-off.
    m_reconnectStep = 0;

    m_messageQueue->removeAllRead();
    m_priorityMessageQueue->removeAllRead();
    m_sendQHeaders->clear();
    runMessageQueue();

    // if we interrupted the received-message-processing, resume that now.
    if (m_receiveStream.size() > 4) {
        const unsigned int rawHeader = *(reinterpret_cast(m_receiveStream.begin()));
        const int packetLength = (rawHeader & 0xFFFF);
        if (packetLength <= m_receiveStream.size()) {
            logDebug(Log::NWM) << m_remote.connectionId
                               << "Resuming processing. ReceiveStream-size:" << m_receiveStream.size()
                               << "holds packet:" << packetLength
                               << "Message Queue now:" << m_messageQueue->size();
            receivedSomeBytes(boost::system::error_code(), 0);
        }
    }
}

/**
 * Completion handler of async_receive(); also called with 0 bytes by
 * sentSomeBytes() to resume processing.
 *
 * Marks \a bytes_transferred as used in m_receiveStream and then handles every
 * complete packet found there. The first packet is checked against the
 * expected protocol (FloweeNative or LegacyP2P). Processing pauses when the
 * outgoing queue exceeds m_forceSendLimit. Afterwards the next read is
 * requested through requestMoreBytes_callback().
 */
void NetworkManagerConnection::receivedSomeBytes(const boost::system::error_code& error, std::size_t bytes_transferred)
{
    if (m_isClosingDown)
        return;
    if (error || m_receiveStream.begin() == nullptr) {
        logDebug(Log::NWM) << "receivedSomeBytes errored:" << error.message() << "connectionId:" << m_remote.connectionId;
        close();
        return;
    }
    assert(m_strand.running_in_this_thread());
    assert(static_cast(bytes_transferred) <= m_receiveStream.capacity());
    m_receiveStream.markUsed(static_cast(bytes_transferred)); // move write pointer

    while (true) { // get all packets out
        const size_t blockSize = static_cast(m_receiveStream.size());
        if (blockSize < 4) // need more data
            break;
        // Check ring buffer capacity and send if low.
        if (m_messageQueue->size() > m_forceSendLimit) {
            logDebug(Log::NWM) << m_remote.connectionId << "Waiting with the processing of receive, too much outgoing queued";
            logDebug(Log::NWM) << " + Leaving" << m_receiveStream.size() << "bytes for later processing";
            // sentSomeBytes() picks the receive processing up again.
            runMessageQueue();
            return;
        }
        Streaming::ConstBuffer data = m_receiveStream.createBufferSlice(m_receiveStream.begin(), m_receiveStream.end());

        if (m_firstPacket) {
            m_firstPacket = false;
            if (m_messageHeaderType == FloweeNative) {
                if (data.begin()[2] != 8) { // Positive integer (0) and Network::ServiceId (1 << 3)
                    logWarning(Log::NWM) << m_remote.connectionId << "receive; Data error from remote - this is NOT an NWM peer. Disconnecting" << m_remote.hostname;
                    disconnect();
                    return;
                }
            } else {
                assert(m_messageHeaderType == LegacyP2P);
                // a legacy stream starts with the 4 byte network id.
                bool ok = true;
                for (size_t i = 0; ok && i < 4; ++i)
                    ok = static_cast(data.begin()[i]) == d->networkId[i];
                if (!ok) {
                    logWarning(Log::NWM) << m_remote.connectionId << "receive; Data error from remote - this is NOT an P2P server. Disconnecting" << m_remote.hostname;
                    disconnect();
                    return;
                }
            }
        }

        if (m_messageHeaderType == LegacyP2P) {
            if (data.size() < LEGACY_HEADER_SIZE) // wait for entire header
                break;
            // the body length is stored little-endian at offset 16 of the legacy header.
            const uint32_t bodyLength = ReadLE32(reinterpret_cast(data.begin() + 16));
            if (bodyLength > 32000000) {
                logWarning(Log::NWM).nospace() << "receive; Data error from server - stream is corrupt ("
                                               << "bl=" << bodyLength << ")";
                close(false);
                return;
            }
            if (data.size() < LEGACY_HEADER_SIZE + static_cast(bodyLength)) // do we have all data for this one?
                break;
            if (!processLegacyPacket(m_receiveStream.internal_buffer(), data.begin()))
                return;
            m_receiveStream.forget(bodyLength + LEGACY_HEADER_SIZE);
        }
        else {
            // native packets carry their total length in the lower 16 bits of the first word.
            const unsigned int rawHeader = *(reinterpret_cast(data.begin()));
            const int packetLength = (rawHeader & 0xFFFF);
            logDebug(Log::NWM) << m_remote.connectionId << "Processing incoming packet. Size" << packetLength;
            if (packetLength < 2 || packetLength > MAX_MESSAGE_SIZE) {
                logWarning(Log::NWM).nospace() << m_remote.connectionId
                        << "receive; Data error from server - stream is corrupt ("
                        << "pl=" << packetLength << ")";
                close();
                return;
            }
            if (data.size() < packetLength) // do we have all data for this one?
                break;
            if (!processPacket(m_receiveStream.internal_buffer(), data.begin()))
                return;
            m_receiveStream.forget(packetLength);
        }
    }
    requestMoreBytes_callback(boost::system::error_code());
}

/*
 * when we generate more messages than can be sent, we start throttling the incoming
 * message flow. The basic thought is that more incoming messages means more outgoing
 * messages will be generated.
 * As such it makes sense to start slowing down what we sent in order to avoid memory buffers
 * for send-queues growing out of proportion.
 *
 * Below the L1 backlog limit the next read is requested right away. Above it,
 * while connected, the read is postponed by 2, 10 (above L2) or 30 (above L3)
 * milliseconds and the send queue is run in the meantime.
 */
void NetworkManagerConnection::requestMoreBytes_callback(const boost::system::error_code &error)
{
    if (error)
        return;

    const int backlog = m_messageQueue->size() + m_priorityMessageQueue->size();
    if (backlog < m_throttleReceiveAtSendLimitL1)
        requestMoreBytes();
    else if (isConnected()) {
        int wait = 2;
        if (backlog > m_throttleReceiveAtSendLimitL3)
            wait = 30;
        else if (backlog > m_throttleReceiveAtSendLimitL2)
            wait = 10;
        m_sendTimer.expires_after(boost::asio::chrono::milliseconds(wait));
        m_sendTimer.async_wait(boost::asio::bind_executor(m_strand,
                std::bind(&NetworkManagerConnection::requestMoreBytes_callback, shared_from_this(), std::placeholders::_1)));
        runMessageQueue();
    }
}

/// Makes room for one more packet in m_receiveStream and starts an async read that ends in receivedSomeBytes().
void NetworkManagerConnection::requestMoreBytes()
{
    m_receiveStream.reserve(MAX_MESSAGE_SIZE);
    assert(m_receiveStream.capacity() > 0);
    m_socketProxy->async_receive(m_receiveStream,
            std::bind(&NetworkManagerConnection::receivedSomeBytes, shared_from_this(),
                std::placeholders::_1, std::placeholders::_2));
}

// Parses one native packet starting at \a data; the definition continues below.
bool NetworkManagerConnection::processPacket(const std::shared_ptr &buffer, const char *data)
{
    assert(m_strand.running_in_this_thread());
    const unsigned int rawHeader = *(reinterpret_cast(data));
    // packet length is found in the first 2 bytes of the message header
    // (see MessageBuilder::setMessageSize())
    // the packet size is the message header plus the message body.
const int packetLength = (rawHeader & 0xFFFF); logDebug(Log::NWM) << m_remote.connectionId << "Receive packet length" << packetLength; const char *messageStart = data + 2; const char *messageEnd = data + packetLength; Streaming::MessageParser parser(Streaming::ConstBuffer(buffer, messageStart, messageEnd)); Streaming::ParsedType type = parser.next(); int headerSize = 0; int messageId = -1; int serviceId = -1; int lastInSequence = -1; int sequenceSize = -1; bool isPing = false; std::map messageHeaderData; bool inHeader = true; while (inHeader && type == Streaming::FoundTag) { switch (parser.tag()) { case Network::HeaderEnd: headerSize = parser.consumed(); inHeader = false; break; case Network::MessageId: if (!parser.isInt()) { close(); return false; } messageId = parser.intData(); break; case Network::ServiceId: if (!parser.isInt()) { close(); return false; } serviceId = parser.intData(); break; case Network::LastInSequence: if (!parser.isBool()) { close(); return false; } lastInSequence = parser.boolData() ? 1 : 0; break; case Network::SequenceStart: if (!parser.isInt()) { close(); return false; } sequenceSize = parser.intData(); if (sequenceSize < CHUNK_SIZE) { logWarning(Log::NWM) << "Unneeded sequence. Size:" << sequenceSize << "<" << CHUNK_SIZE; close(); return false; } if (sequenceSize > d->m_maxIncomingMessageSize) { logWarning(Log::NWM) << "Sequence limit exceeded:" << sequenceSize << ">" << d->m_maxIncomingMessageSize; close(); return false; } break; case Network::Ping: isPing = true; break; case Network::Pong: m_lastPong = boost::posix_time::second_clock::universal_time(); logDebug() << "got a pong"; break; default: if (parser.isInt() && parser.tag() < 0xFFFFFF) { if (parser.tag() < 10) { // illegal header tag for users. logInfo(Log::NWM) << " header uses illegal tag. 
Malformed: re-connecting"; close(); return false; } messageHeaderData.insert(std::make_pair(static_cast(parser.tag()), parser.intData())); } break; } type = parser.next(); } if (inHeader) { logInfo(Log::NWM) << " header malformed, re-connecting"; close(); return false; } if (headerSize + 2 > packetLength) { // We skipped the message-length 2 bytes logInfo(Log::NWM) << " header extends past packet, re-connecting"; close(); return false; } if (serviceId == -1) { // an obligatory field logWarning(Log::NWM) << "peer sent message without serviceId"; close(); return false; } if (serviceId == Network::SystemServiceId) { // Handle System level messages if (isPing) { if (m_remote.peerPort == m_remote.announcePort) { // we should never get pings from a remote when we initiated the connection. disconnect(); return false; } m_pingTimer.cancel(); if (!m_priorityMessageQueue->isFull()) { // send pong. queueMessage(m_pingMessage, NetworkConnection::HighPriority); m_pingTimer.expires_after(boost::asio::chrono::seconds(90)); m_pingTimer.async_wait( boost::asio::bind_executor(m_strand, std::bind(&NetworkManagerConnection::pingTimeout, shared_from_this(), std::placeholders::_1))); } } return true; } Message message; // we assume they are in sequence (which is Ok with TCP sockets), but we don't assume that // each packet is part of the sequence. if (lastInSequence != -1) { if (sequenceSize != -1) { if (m_chunkedMessageId != -1 || m_chunkedServiceId != -1) { // Didn't finish another. Thats illegal. logWarning(Log::NWM) << "peer sent sequenced message with wrong combination of headers"; close(); return false; } m_chunkedMessageId = messageId; m_chunkedServiceId = serviceId; m_chunkedMessageBuffer = Streaming::BufferPool(sequenceSize); m_chunkedHeaderData = messageHeaderData; } else if (m_chunkedMessageId != messageId || m_chunkedServiceId != serviceId) { // Changed. Thats illegal. 
close(); logWarning(Log::NWM) << "peer sent sequenced message with inconsistent service/messageId"; return false; } const int bodyLength = packetLength - headerSize- 2; if (m_chunkedMessageBuffer.capacity() < bodyLength) { logWarning(Log::NWM) << "peer sent sequenced message with too much data"; return false; } logDebug(Log::NWM) << m_remote.connectionId << "Message received as part of sequence; last:" << lastInSequence << "total-size:" << sequenceSize; std::copy(data + headerSize + 2, data + packetLength, m_chunkedMessageBuffer.data()); m_chunkedMessageBuffer.markUsed(bodyLength); if (lastInSequence == 0) return true; message = Message(m_chunkedMessageBuffer.commit(), m_chunkedServiceId); messageHeaderData = m_chunkedHeaderData; m_chunkedMessageId = -1; m_chunkedServiceId = -1; m_chunkedMessageBuffer.clear(); } else { message = Message(buffer, data + 2, data + 2 + headerSize, data + packetLength); } message.setMessageId(messageId); message.setServiceId(serviceId); for (auto iter = messageHeaderData.begin(); iter != messageHeaderData.end(); ++iter) { message.setHeaderInt(iter->first, iter->second); } message.remote = m_remote.connectionId; // first copy to avoid problems if a callback removes its callback or closes the connection. 
std::vector > callbacks; callbacks.reserve(m_onIncomingMessageCallbacks.size()); for (auto it = m_onIncomingMessageCallbacks.begin(); it != m_onIncomingMessageCallbacks.end(); ++it) { if (!m_firstMessageIsForAcceptConnection || 1 == it->first) // the first NetworkConnection always has callback id 1 callbacks.push_back(it->second); } for (const auto &callback : callbacks) { try { callback(message); } catch (const NetworkQueueFullError &e) { logDebug(Log::NWM) << "connection::onIncomingMessage tried to send, but failed (and didn't catch exception) dropping message" << e; } catch (const std::exception &ex) { logWarning(Log::NWM) << "connection::onIncomingMessage threw exception, ignoring:" << ex; } if (!m_socketProxy->is_open()) break; } std::list servicesCopy; if (m_firstMessageIsForAcceptConnection) { // first message has thus been handled. m_firstMessageIsForAcceptConnection = false; } else { // Services don't work on connection, they don't handle handshake-messages std::lock_guard lock(d->mutex); servicesCopy = d->services; } for (auto service : servicesCopy) { if (!m_socketProxy->is_open()) break; if (service->id() == serviceId) { try { service->onIncomingMessage(message, m_remote); } catch (const std::exception &ex) { logWarning(Log::NWM).nospace() << "service::onIncomingMessage (" << service->id() << ") threw exception, ignoring: " << ex; } } } return m_socketProxy->is_open(); // if the user called disconnect, then stop processing packages } bool NetworkManagerConnection::processLegacyPacket(const std::shared_ptr &buffer, const char *data) { assert(m_strand.running_in_this_thread()); const int bodyLength = ReadLE32(reinterpret_cast(data + 16)); logDebug(Log::NWM) << "Receive legacy-packet Body-length:" << bodyLength; const char *body = data + LEGACY_HEADER_SIZE; const uint32_t expectedChecksum = ReadLE32(reinterpret_cast(data + 20)); const uint256 hash = Hash(body, body + bodyLength); uint32_t actualChecksum = 0; memcpy(&actualChecksum, &hash, 
sizeof(actualChecksum)); if (actualChecksum != expectedChecksum) { logWarning(Log::NWM) << "Incoming legacy message has invalid checksum"; close(false); return false; } char buf[13]; memcpy(buf, data + 4, 12); buf[12] = 0; auto m = d->messageIdsReverse.find(std::string(buf)); if (m == d->messageIdsReverse.end()) { char readableBuf[12]; // sanitized copy for (int i = 0; i < 12; ++i) { readableBuf[i] = data[4 + i]; if (readableBuf[i] == 0) // replace zeros with spaces readableBuf[i] = ' '; } logWarning(Log::NWM) << "Incoming message has unknown type:" << std::string(readableBuf, 12); return true; // skip } Message message(buffer, data, data + LEGACY_HEADER_SIZE, data + LEGACY_HEADER_SIZE + bodyLength); message.setMessageId(m->second); message.setServiceId(Api::LegacyP2P); message.remote = m_remote.connectionId; // first copy to avoid problems if a callback removes its callback or closes the connection. std::vector > callbacks; callbacks.reserve(m_onIncomingMessageCallbacks.size()); for (auto it = m_onIncomingMessageCallbacks.begin(); it != m_onIncomingMessageCallbacks.end(); ++it) { callbacks.push_back(it->second); } for (const auto &callback : callbacks) { try { callback(message); } catch (const NetworkQueueFullError &e) { logDebug(Log::NWM) << "connection::onIncomingMessage tried to send, but failed (and didn't catch exception) dropping message" << e; } catch (const std::exception &ex) { logWarning(Log::NWM) << "connection::onIncomingMessage (LegacyP2P) threw exception, ignoring:" << ex; } if (!m_socketProxy->is_open()) break; } return m_socketProxy->is_open(); // if the user called disconnect, then stop processing packages } void NetworkManagerConnection::addOnConnectedCallback(int id, std::function callback) { assert(m_strand.running_in_this_thread()); m_onConnectedCallbacks.insert(std::make_pair(id, callback)); } void NetworkManagerConnection::addOnDisconnectedCallback(int id, std::function callback) { assert(m_strand.running_in_this_thread()); 
m_onDisConnectedCallbacks.insert(std::make_pair(id, callback)); } void NetworkManagerConnection::addOnIncomingMessageCallback(int id, std::function callback) { assert(m_strand.running_in_this_thread()); m_onIncomingMessageCallbacks.insert(std::make_pair(id, callback)); } void NetworkManagerConnection::addOnError(int id, std::function callback) { assert(m_strand.running_in_this_thread()); m_onErrorCallbacks.insert(std::make_pair(id, callback)); } void NetworkManagerConnection::setLoginCreator(const std::function &creator) { assert(m_strand.running_in_this_thread()); m_loginMessageCreator = creator; } void NetworkManagerConnection::queueMessage(const Message &message, NetworkConnection::MessagePriority priority) { if (!message.hasHeader() && message.serviceId() == -1) throw NetworkException("queueMessage: Can't deliver a message with unset service ID"); if (message.hasHeader() && message.body().size() > CHUNK_SIZE) throw NetworkException("queueMessage: Can't send large message and can't auto-chunk because it already has a header"); if (priority != NetworkConnection::NormalPriority && message.rawData().size() > CHUNK_SIZE) throw NetworkException("queueMessage: Can't send large message in the priority queue"); // we have a chunk size of 8K and a max message size of 9K. The 1000 bytes is for headers and worse case is around // 10 bytes per item plus some extra stuff. So we reject any messages with more than 95 header items. 
if (message.headerData().size() > 95) throw NetworkException("queueMessage: Can't send message with too much header items"); if (m_strand.running_in_this_thread()) { allocateBuffers(); if (priority == NetworkConnection::NormalPriority) { if (m_messageQueue->isFull()) throw NetworkQueueFullError("MessageQueue full"); m_messageQueue->append(message); } else { if (m_priorityMessageQueue->isFull()) throw NetworkQueueFullError("PriorityMessageQueue full"); m_priorityMessageQueue->append(message); } if (isConnected()) runMessageQueue(); else if (m_reconnectStep == 0 && isOutgoing()) connect_priv(); } else { auto self = shared_from_this(); runOnStrand([self, message, priority]() { try { self->queueMessage(message, priority); } catch (const NetworkQueueFullError &e) { logWarning(Log::NWM) << "queueMessage(): dropping message." << e; } catch (const std::exception &e) { logDebug(Log::NWM) << "queueMessage on network strand threw, dropping message:" << e; } }); } } void NetworkManagerConnection::close(bool reconnect) { assert(m_strand.running_in_this_thread()); if (!isOutgoing()) { std::lock_guard lock(d->mutex); shutdown(); d->connections.erase(m_remote.connectionId); return; } if (!reconnect) m_isClosingDown = true; m_receiveStream.clear(); m_chunkedMessageBuffer.clear(); m_chunkedMessageId = -1; m_chunkedServiceId = -1; m_chunkedHeaderData.clear(); m_messageBytesSend = 0; m_reconnectDelay.cancel(); m_resolver.cancel(); m_sendQHeaders->clear(); if (m_isConnected) m_socketProxy->close(); m_pingTimer.cancel(); m_firstPacket = true; m_isConnecting = false; if (m_isConnected) { m_isConnected = false; callOnDisconnectedCallbacks(); } if (reconnect && !m_isClosingDown) { // auto reconnect. if (m_firstPacket) { // this means the network is there, someone is listening. They just don't speak our language. // slow down reconnect due to bad peer. 
m_reconnectDelay.expires_after(boost::asio::chrono::seconds(15)); m_reconnectDelay.async_wait(boost::asio::bind_executor(m_strand, std::bind(&NetworkManagerConnection::reconnectWithCheck, shared_from_this(), std::placeholders::_1))); } else { connect_priv(); } } } void NetworkManagerConnection::sendPing(const boost::system::error_code &error) { if (error) return; logDebug(Log::NWM) << "ping"; if (m_isClosingDown) return; assert (m_messageHeaderType != LegacyP2P); assert(m_strand.running_in_this_thread()); if (!m_socketProxy->is_open()) return; // lets check we are getting pongs, as a TCP connection may have been closed without us being // let in on that. auto duration = boost::posix_time::second_clock::universal_time() - m_lastPong; if (duration.total_seconds() > 80) { logWarning(Log::NWM) << "Didn't receive a pong from peer for too long, disconnecting dead connection"; close(true); } int time = 30; if (m_priorityMessageQueue->isFull()) time = 2; // delay sending ping else queueMessage(m_pingMessage, NetworkConnection::HighPriority); m_pingTimer.expires_after(boost::asio::chrono::seconds(time)); m_pingTimer.async_wait(boost::asio::bind_executor(m_strand, std::bind(&NetworkManagerConnection::sendPing, shared_from_this(), std::placeholders::_1))); } void NetworkManagerConnection::pingTimeout(const boost::system::error_code &error) { // note that this is only for incoming connections. 
if (!error) { logWarning(Log::NWM) << "Didn't receive a ping from peer for too long, disconnecting dead connection"; disconnect(); } } void NetworkManagerConnection::allocateBuffers() { if (m_messageQueue.get() == nullptr || m_messageQueue->reserved() != m_queueSizeMain) { m_messageQueue.reset(new RingBuffer(m_queueSizeMain)); m_priorityMessageQueue.reset(new RingBuffer(m_priorityQueueSize)); m_sendQHeaders.reset(new RingBuffer(m_queueSizeMain)); m_pingMessage = buildPingMessage(m_remote.peerPort == m_remote.announcePort); } } void NetworkManagerConnection::callOnDisconnectedCallbacks() { std::vector > callbacks; callbacks.reserve(m_onDisConnectedCallbacks.size()); for (auto it = m_onDisConnectedCallbacks.begin(); it != m_onDisConnectedCallbacks.end(); ++it) { callbacks.push_back(it->second); } for (const auto &callback : callbacks) { try { callback(m_remote); } catch (const std::exception &ex) { logInfo(Log::NWM) << "onDisconnected caused exception, ignoring:" << ex; } } } void NetworkManagerConnection::reconnectWithCheck(const boost::system::error_code& error) { if (!error) { if (m_isConnected || m_isConnecting) // no need return; m_socketProxy->close(); connect_priv(); } } int NetworkManagerConnection::nextCallbackId() { return m_lastCallbackId.fetch_add(1); } void NetworkManagerConnection::removeAllCallbacksFor(int id) { assert(m_strand.running_in_this_thread()); m_onConnectedCallbacks.erase(id); m_onDisConnectedCallbacks.erase(id); m_onIncomingMessageCallbacks.erase(id); m_onErrorCallbacks.erase(id); } void NetworkManagerConnection::shutdown() { if (!m_isClosingDown && m_isConnected) callOnDisconnectedCallbacks(); m_isClosingDown = true; if (m_strand.running_in_this_thread()) { m_onConnectedCallbacks.clear(); m_onDisConnectedCallbacks.clear(); m_onIncomingMessageCallbacks.clear(); m_onErrorCallbacks.clear(); if (isConnected()) m_socketProxy->close(); m_resolver.cancel(); m_reconnectDelay.cancel(); 
m_strand.post(std::bind(&NetworkManagerConnection::finalShutdown, shared_from_this()), std::allocator()); } else { m_strand.post(std::bind(&NetworkManagerConnection::shutdown, shared_from_this()), std::allocator()); } } void NetworkManagerConnection::accept(NetworkConnection::AcceptLimit limit) { if (m_acceptedConnection) return; m_acceptedConnection = true; if (limit != NetworkConnection::AcceptConnection) m_firstMessageIsForAcceptConnection = true; allocateBuffers(); if (m_remote.encrypted) { #ifdef FLOWEE_NET_SSL // first do the handshake, and only when that completes do we setup the callbacks for receiving auto sslproxy = dynamic_cast(m_socketProxy); assert(sslproxy); // we pass in 'connection' just to make sure that the async call keeps it, and thus the proxy, alive. sslproxy->doASyncHandshake(shared_from_this()); #endif } else { // just setup a callback for receiving. requestMoreBytes(); } // for incoming connections, take action when no ping comes in. // On initial connection after 150 seconds, later refreshes are 90 seconds. m_pingTimer.expires_after(boost::asio::chrono::seconds(150)); m_pingTimer.async_wait(boost::asio::bind_executor(m_strand, std::bind(&NetworkManagerConnection::pingTimeout, shared_from_this(), std::placeholders::_1))); } void NetworkManagerConnection::recycleConnection() { assert(m_strand.running_in_this_thread()); m_onConnectedCallbacks.clear(); m_onDisConnectedCallbacks.clear(); m_onIncomingMessageCallbacks.clear(); m_onErrorCallbacks.clear(); setMessageQueueSizes(2000, 20); // set back to defaults. 
m_punishment = 0; close(false); std::lock_guard lock(d->mutex); // protects connections maps if (d->connections.erase(m_remote.connectionId)) d->unusedConnections.push_back(shared_from_this()); } void NetworkManagerConnection::runOnStrand(const std::function &function) { if (m_isClosingDown) return; m_strand.post(function, std::allocator()); } void NetworkManagerConnection::punish(int amount) { d->punishNode(m_remote.connectionId, amount); } void NetworkManagerConnection::setMessageQueueSizes(int main, int priority) { m_queueSizeMain = main; assert(main > 10); assert(priority > 2); m_priorityQueueSize = std::max(priority, 3); // guard against stupid users // Calculate the limits. We only really use 'main' here // These numbers may be tweaked with some more testing, if someone wants to put the time in. m_forceSendLimit = main / 8 * 3; m_throttleReceiveAtSendLimitL1 = main / 2; m_throttleReceiveAtSendLimitL2 = main / 4 * 3; m_throttleReceiveAtSendLimitL3 = main - (main / 20); } void NetworkManagerConnection::setMessageHeaderType(MessageHeaderType messageHeaderType) { if (m_messageHeaderType == messageHeaderType) return; m_messageHeaderType = messageHeaderType; switch (m_messageHeaderType) { case FloweeNative: if (isOutgoing()) { // ping every 30 seconds while the remote will disconnect after 90 seconds of no pings. 
m_pingTimer.expires_after(boost::asio::chrono::seconds(30)); m_pingTimer.async_wait(boost::asio::bind_executor(m_strand, std::bind(&NetworkManagerConnection::sendPing, shared_from_this(), std::placeholders::_1))); } break; case LegacyP2P: m_pingTimer.cancel(); break; default: assert(false); break; } } void NetworkManagerConnection::finalShutdown() { } NetworkManagerServerBase::NetworkManagerServerBase(const std::shared_ptr &parent, tcp::endpoint endpoint, const std::function &callback) : d(parent), m_acceptor(parent->ioContext, endpoint), m_endpoint(endpoint), onIncomingConnection(callback) { } void NetworkManagerServerBase::acceptConnection(tcp::socket && socket) { logDebug(Log::NWM) << "acceptTcpConnection"; std::shared_ptr priv = d.lock(); if (!priv.get()) return; std::lock_guard lock(priv->mutex); if (priv->isClosingDown) return; try { // catch ENOTCONN (Transport endpoint is not connected) which remote_endpoint() may throw const auto peerAddress = socket.remote_endpoint().address(); for (const BannedNode &bn : priv->banned) { if (bn.endPoint.ipAddress == peerAddress) { if (bn.banTimeout > boost::posix_time::second_clock::universal_time()) { // incoming connection is banned. 
logInfo(Log::NWM) << "acceptTcpConnection; closing incoming connection (banned)" << bn.endPoint.hostname; boost::system::error_code ec; socket.close(ec); // drop only this connection } return; } } const int conId = ++priv->lastConnectionId; logDebug(Log::NWM) << "acceptTcpConnection; creating new connection object" << conId; // Never do a setupCallback until we do a 'std::move' (or disconnect) to avoid an "Already open" error auto connection = createNewConnection(conId, std::move(socket)); priv->connections.insert(std::make_pair(conId, connection)); logDebug(Log::NWM) << "Total connections now;" << priv->connections.size(); NetworkConnection con(connection, conId); try { onIncomingConnection(con); } catch (const std::exception &e) { logCritical(Log::NWM) << "subsystem handling onIncomingConnection threw. Ignoring" << e; } // someone needs to call accept(), if they didn't we shall disconnect if (!connection->acceptedConnection()) connection->m_strand.post(std::bind(&NetworkManagerConnection::disconnect, connection), std::allocator()); } catch (const boost::system::system_error &e) { // remote_endpoint() etc. can throw if the peer vanished immediately logInfo(Log::NWM) << "acceptConnection: peer disappeared pre-handshake:" << e.code().value() << " cat:" << e.code().category().name() << " msg:" << e.code().message() << " what:" << e.what(); boost::system::error_code ec; socket.close(ec); // drop only this connection return; } catch (...) 
{ logWarning(Log::NWM) << "acceptConnection: unexpected exception while handling new socket; keeping acceptor alive"; boost::system::error_code ec; socket.close(ec); // drop only this connection return; } } // ------ NetworkManagerServerSimple::NetworkManagerServerSimple(const std::shared_ptr &parent, tcp::endpoint endpoint, const std::function &callback) : NetworkManagerServerBase(parent, endpoint, callback), m_socket(parent->ioContext) { setupCallback(); } void NetworkManagerServerSimple::shutdown() { m_acceptor.close(); m_socket.close(); } void NetworkManagerServerSimple::setupCallback() { if (!m_acceptor.is_open()) { logCritical(Log::NWM) << "No longer accepting incoming connections on" << m_endpoint.address().to_string(); return; } m_acceptor.async_accept(m_socket, [this](const boost::system::error_code &error) { if (error.value() == boost::asio::error::operation_aborted) return; if (!error) acceptConnection(std::move(m_socket)); setupCallback(); }); } std::shared_ptr NetworkManagerServerSimple::createNewConnection(int connectionId, tcp::socket && socket) { std::shared_ptr priv = d.lock(); assert (priv.get()); return std::make_shared(priv, new NetworkManagerBasicSocketProxy(std::move(socket)), connectionId); } // ------- NetworkManagerBasicSocketProxy::NetworkManagerBasicSocketProxy(boost::asio::io_context &io_context) : m_socket(io_context) { } NetworkManagerBasicSocketProxy::NetworkManagerBasicSocketProxy(tcp::socket && socket) : m_socket(std::move(socket)) { } tcp::endpoint NetworkManagerBasicSocketProxy::remote_endpoint() const { return m_socket.remote_endpoint(); } void NetworkManagerBasicSocketProxy::newConnection(const tcp::endpoint &to, const std::shared_ptr &owner, const std::function &callback) { assert(q); m_socket = boost::asio::ip::tcp::socket(owner->d->ioContext); m_socket.async_connect(to, boost::asio::bind_executor(q->m_strand, callback)); } void NetworkManagerBasicSocketProxy::async_write(const std::vector &data, const std::function &callback) 
{ auto q = this->q; assert(q); boost::asio::async_write(m_socket, data, boost::asio::bind_executor(q->m_strand, [callback](const boost::system::error_code &e, std::size_t s) { callback(e, s); })); } void NetworkManagerBasicSocketProxy::async_receive(Streaming::BufferPool &pool, const std::function &callback) { auto q = this->q; assert(q); m_socket.async_receive(boost::asio::buffer(pool.data(), static_cast(pool.capacity())), boost::asio::bind_executor(q->m_strand, [callback](const boost::system::error_code &e, std::size_t s) { callback(e, s); })); } bool NetworkManagerBasicSocketProxy::is_open() const { return m_socket.is_open(); } void NetworkManagerBasicSocketProxy::close() { boost::system::error_code ignored; m_socket.close(ignored); } // ------- #ifdef FLOWEE_NET_SSL NetworkManagerServerSSL::NetworkManagerServerSSL(const std::shared_ptr &parent, tcp::endpoint endpoint, const std::function &callback) : NetworkManagerServerBase(parent, endpoint, callback), m_context(boost::asio::ssl::context::tlsv13_server) { m_context.set_options( boost::asio::ssl::context::default_workarounds | boost::asio::ssl::context::single_dh_use // use a new dh for each connection. // only sslv3 and tls 1.3 | boost::asio::ssl::context::no_sslv2 | boost::asio::ssl::context::no_tlsv1 | boost::asio::ssl::context::no_tlsv1_1 | boost::asio::ssl::context::no_tlsv1_2); } void NetworkManagerServerSSL::shutdown() { m_acceptor.close(); } void NetworkManagerServerSSL::setCertificateChain(const Streaming::ConstBuffer &certificateChain) { m_context.use_certificate_chain(certificateChain); } void NetworkManagerServerSSL::setPrivateKey(const Streaming::ConstBuffer &privateKey) { m_context.use_private_key(privateKey, boost::asio::ssl::context::pem); // notice that the boost stuff allows setting a password callback for the certificate // we're not at this time exposing that feature. 
} void NetworkManagerServerSSL::setDHTemp(const Streaming::ConstBuffer &dhTemp) { assert(!dhTemp.isEmpty()); m_context.use_tmp_dh(dhTemp); } void NetworkManagerServerSSL::setupCallback() { if (!m_acceptor.is_open()) { logCritical(Log::NWM) << "No longer accepting incoming connections on" << m_endpoint.address().to_string(); return; } m_acceptor.async_accept([this](const boost::system::error_code &error, tcp::socket socket) { if (error.value() == boost::asio::error::operation_aborted) return; if (!error) acceptConnection(std::move(socket)); setupCallback(); }); } std::shared_ptr NetworkManagerServerSSL::createNewConnection(int connectionId, tcp::socket && socket) { std::shared_ptr priv = d.lock(); assert (priv.get()); auto *proxy = new NetworkManagerSSLSocketProxy(boost::asio::ssl::stream(std::move(socket), m_context)); return std::make_shared(priv, proxy, connectionId); } // ---------------------- NetworkManagerSSLSocketProxy::NetworkManagerSSLSocketProxy(boost::asio::ssl::stream socket) : m_context(boost::asio::ssl::context::tlsv13_server), m_socket(std::move(socket)) { } NetworkManagerSSLSocketProxy::NetworkManagerSSLSocketProxy(boost::asio::io_context &ioContext) : m_context(boost::asio::ssl::context::tlsv13_client), m_socket(boost::asio::ssl::stream(ioContext, m_context)) { } void NetworkManagerSSLSocketProxy::setClientCertificate(const Streaming::ConstBuffer &data) { m_context.add_certificate_authority(data); } tcp::endpoint NetworkManagerSSLSocketProxy::remote_endpoint() const { return m_socket.next_layer().remote_endpoint(); } void NetworkManagerSSLSocketProxy::newConnection(const tcp::endpoint &to, const std::shared_ptr &owner, const std::function &callback) { m_socket = boost::asio::ssl::stream(owner->d->ioContext, m_context); m_socket.set_verify_mode(boost::asio::ssl::verify_peer); m_socket.lowest_layer().async_connect(to, boost::asio::bind_executor(owner->m_strand, [callback, to, owner, this](const boost::system::error_code& error) { if (error) { if 
(error == boost::asio::error::operation_aborted) return; logWarning(Log::NWM) << "connect to" << to << "error:" << error.message(); owner->errorDetected(error); } else { m_socket.async_handshake(boost::asio::ssl::stream_base::client, boost::asio::bind_executor(owner->m_strand, [callback, owner](const boost::system::error_code& error) { if (error) { if (error == boost::asio::error::operation_aborted) return; logWarning(Log::NWM) << "SSL handshake failed, pushing error up the stack." << error.message(); owner->errorDetected(error); } else { callback(error); } })); } })); } void NetworkManagerSSLSocketProxy::async_write(const std::vector &data, const std::function &callback) { auto q = this->q; assert(q); boost::asio::async_write(m_socket, data, boost::asio::bind_executor(q->m_strand, [callback](const boost::system::error_code &e, std::size_t s) { callback(e, s); })); } void NetworkManagerSSLSocketProxy::async_receive(Streaming::BufferPool &pool, const std::function &callback) { auto q = this->q; assert(q); m_socket.async_read_some(boost::asio::buffer(pool.data(), static_cast(pool.capacity())), boost::asio::bind_executor(q->m_strand, [callback](const boost::system::error_code &e, std::size_t s) { callback(e, s); })); } bool NetworkManagerSSLSocketProxy::is_open() const { return m_socket.next_layer().is_open(); } void NetworkManagerSSLSocketProxy::close() { boost::system::error_code ignored; m_socket.next_layer().close(ignored); } void NetworkManagerSSLSocketProxy::doASyncHandshake(const std::shared_ptr &owner) { // this is the server, we just got a new incoming connection and we need to validate the ssl part. m_socket.async_handshake(boost::asio::ssl::stream_base::server, boost::asio::bind_executor(owner->m_strand, [owner](const boost::system::error_code &error) { if (error) { owner->errorDetected(error); owner->disconnect(); } else { owner->requestMoreBytes(); } })); } #endif