/*
 * This file is part of the Flowee project
 * Copyright (C) 2020-2025 Tom Zander
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */
#include "ConnectionManager.h"
#include "DownloadManager.h"
#include "P2PNetInterface.h"
#include "Peer.h"
#include "PrivacySegment.h"
#include "BroadcastTxData.h"

// NOTE(review): the targets of the six includes below are missing from the
// text under review (only the bare directive survived). Restore them from
// version control before building.
#include
#include
#include
#include
#include
#include

/**
 * Set up the connection manager.
 *
 * Registers the P2P message-id to command-name table with the network layer,
 * applies the testnet4 network magic and default port when \a parent reports
 * that chain, arms the 20 second cron timer on the download manager's strand
 * and loads the peer-address database from \a basedir.
 *
 * @param context the io_context used for the timer and the network manager.
 * @param basedir directory the peer-address database is loaded from / saved to.
 * @param parent  the owning DownloadManager; it is dereferenced here, so it must not be null.
 */
ConnectionManager::ConnectionManager(boost::asio::io_context &context, const boost::filesystem::path &basedir, DownloadManager *parent)
    : m_shuttingDown(false),
    m_ioContext(context),
    m_cronTimer(m_ioContext),
    m_peerAddressDb(this),
    m_network(m_ioContext),
    m_dlManager(parent),
    m_basedir(basedir)
{
    // The nonce is used in the status message to allow detection of connect-to-self.
    m_appNonce = GetRand(-1);

    // NOTE(review): the template arguments of this map are not visible in the
    // text under review; <int, std::string> is assumed. Confirm against the
    // declaration of setMessageIdLookup().
    std::map<int, std::string> table;
    table.insert(std::make_pair(Api::P2P::Version, "version"));
    table.insert(std::make_pair(Api::P2P::VersionAck, "verack"));
    table.insert(std::make_pair(Api::P2P::Ping, "ping"));
    table.insert(std::make_pair(Api::P2P::Pong, "pong"));
    table.insert(std::make_pair(Api::P2P::PreferHeaders, "sendheaders"));
    table.insert(std::make_pair(Api::P2P::GetHeaders, "getheaders"));
    table.insert(std::make_pair(Api::P2P::GetBlocks, "getblocks"));
    table.insert(std::make_pair(Api::P2P::Headers, "headers"));
    table.insert(std::make_pair(Api::P2P::RejectData, "reject"));
    table.insert(std::make_pair(Api::P2P::Inventory, "inv"));
    table.insert(std::make_pair(Api::P2P::GetAddr, "getaddr"));
    table.insert(std::make_pair(Api::P2P::Addresses, "addr"));
    table.insert(std::make_pair(Api::P2P::Data_Transaction, "tx"));
    table.insert(std::make_pair(Api::P2P::Data_MerkleBlock, "merkleblock"));
    table.insert(std::make_pair(Api::P2P::Data_DSProof, "dsproof-beta"));
    table.insert(std::make_pair(Api::P2P::FilterLoad, "filterload"));
    table.insert(std::make_pair(Api::P2P::FilterClear, "filterclear"));
    table.insert(std::make_pair(Api::P2P::GetData, "getdata"));
    table.insert(std::make_pair(Api::P2P::Mempool, "mempool"));
    // specially added to detect not-bitcoincash clients:
    table.insert(std::make_pair(Api::P2P::AVAHello, "avahello"));
    table.insert(std::make_pair(Api::P2P::AlertMessage, "alert"));
    table.insert(std::make_pair(Api::P2P::ProtoConf, "protoconf"));
    m_network.setMessageIdLookup(table);

    // network selection
    if (parent->chain() == P2PNet::Testnet4Chain) {
        // NOTE(review): element type assumed to be uint8_t (not visible in the
        // text under review). Confirm against setLegacyNetworkId().
        std::vector<uint8_t> magic(4);
        magic[0] = 0xe2;
        magic[1] = 0xb7;
        magic[2] = 0xda;
        magic[3] = 0xaf;
        m_network.setLegacyNetworkId(magic);
        m_peerAddressDb.setDefaultPortNr(28333);
    }

    // first cron run in 20 seconds, executed on the download manager's strand.
    m_cronTimer.expires_from_now(boost::posix_time::seconds(20));
    m_cronTimer.async_wait(boost::asio::bind_executor(parent->strand(),
            std::bind(&ConnectionManager::cron, this, std::placeholders::_1)));

    m_userAgent = "Flowee-P2PNet-based app";
    m_peerAddressDb.loadDatabase(m_basedir);
}

/**
 * Forward an 'inv' message to DownloadManager::parseInvMessage().
 * The call is posted on the download manager's strand; nothing is posted
 * once shutdown has started.
 */
void ConnectionManager::addInvMessage(const Message &message, int sourcePeerId)
{
    if (m_shuttingDown)
        return;
    m_dlManager->strand().post(std::bind(&DownloadManager::parseInvMessage, m_dlManager,
            message, sourcePeerId), std::allocator<void>());
}

/**
 * Forward a received transaction to DownloadManager::parseTransaction().
 * The call is posted on the download manager's strand; nothing is posted
 * once shutdown has started.
 */
void ConnectionManager::addTransaction(const Tx &message, int sourcePeerId)
{
    if (m_shuttingDown)
        return;
    m_dlManager->strand().post(std::bind(&DownloadManager::parseTransaction, m_dlManager,
            message, sourcePeerId), std::allocator<void>());
}

/**
 * Start connecting to \a address, unless we are shutting down, are not in
 * NormalPower mode, or already track a peer for the same connection id.
 *
 * The address receives 100 punishment points up front; connectionEstablished()
 * gives them back. Listeners get newConnection() for the new peer.
 */
void ConnectionManager::connect(PeerAddress &address)
{
    if (m_shuttingDown)
        return;
    if (m_dlManager->powerMode() != P2PNet::NormalPower)
        return;

    auto con = m_network.connection(address.peerAddress());
    std::unique_lock lock(m_lock);
    // first check if we already have a Peer for this endpoint
    if (m_peers.find(con.connectionId()) == m_peers.end()) {
        address.punishPeer(100); // when the connection succeeds, we remove the 100 again.
        con.setOnError(std::bind(&ConnectionManager::handleError, this,
                std::placeholders::_1, std::placeholders::_2));
        con.setMessageQueueSizes(m_queueSize, 3);
        auto p = std::make_shared<Peer>(this, address);
        p->connect(std::move(con));
        m_peers.insert(std::make_pair(p->connectionId(), p));
        for (auto iface : m_dlManager->p2pNetListeners()) {
            iface->newConnection(p);
        }
    }
}

/**
 * Tell the listeners \a peer is lost, then shut it down and stop tracking it.
 * Does nothing for a null peer or while shutting down.
 */
void ConnectionManager::disconnect(const std::shared_ptr<Peer> &peer)
{
    if (m_shuttingDown || peer.get() == nullptr)
        return;
    // listeners are notified before we take the lock.
    for (auto iface : m_dlManager->p2pNetListeners()) {
        iface->lostPeer(peer);
    }

    std::unique_lock lock(m_lock);
    assert(m_peers.find(peer->connectionId()) != m_peers.end());
    removePeer(peer);
}

/// Returns the services bitfield last set with setServicesBitfield().
uint64_t ConnectionManager::servicesBitfield() const
{
    return m_servicesBitfield;
}

/// Stores the services bitfield returned by servicesBitfield().
void ConnectionManager::setServicesBitfield(const uint64_t &servicesBitfield)
{
    m_servicesBitfield = servicesBitfield;
}

/// Returns the block height last set with setBlockHeight().
int ConnectionManager::blockHeight() const
{
    return m_blockHeight;
}

/// Stores the current block height; \a blockHeight must not be negative.
void ConnectionManager::setBlockHeight(int blockHeight)
{
    assert(blockHeight >= 0);
    m_blockHeight = blockHeight;
}

/// Returns what the download manager's blockchain reports as height for \a blockId.
int ConnectionManager::blockHeightFor(const uint256 &blockId)
{
    return m_dlManager->blockchain().blockHeightFor(blockId);
}

/// Returns the hash created from the blockchain's block at \a height.
uint256 ConnectionManager::blockHashFor(int height)
{
    return m_dlManager->blockchain().block(height).createHash();
}

/// Returns the random nonce picked in the constructor.
uint64_t ConnectionManager::appNonce() const
{
    return m_appNonce;
}

/**
 * Called for a peer that finished connecting.
 *
 * Gives back the 100 punishment points taken in connect(). A peer that lacks
 * bloom support, network support or transaction relay is removed again.
 * Otherwise listeners get newPeer(), the peer is recorded as connected and,
 * under the conditions checked below, asked for headers.
 */
void ConnectionManager::connectionEstablished(const std::shared_ptr<Peer> &peer)
{
    if (m_shuttingDown)
        return;
    assert(peer);
    assert(peer->peerAddress().isValid());
    peer->peerAddress().punishPeer(-100); // this mirrors the 100 when we started connecting.

    std::unique_lock lock(m_lock);
    // don't use if the client doesn't support any usable services.
    if (!peer->supplies_bloom() || !peer->supplies_network() || !peer->relaysTransactions()) {
        logWarning() << "Rejecting. Need BLOOM and NETWORK and tx-relay. Peer:" << peer->connectionId()
                     << peer->userAgent() << peer->peerAddress();
        removePeer(peer);
        return;
    }

    for (auto iface : m_dlManager->p2pNetListeners()) {
        iface->newPeer(peer);
    }
    m_connectedPeers.insert(peer->connectionId());

    // we use headers to verify that the peer is on the same chain as us. Every
    // peer will have to answer the request at least once, and we add repeat requests
    // after a certain amount of time.
    if (std::abs(m_dlManager->blockchain().expectedBlockHeight() - m_blockHeight) < 800
            && (peer->peerAddress().lastReceivedGoodHeaders() == 0
                || (time(nullptr) - peer->peerAddress().lastReceivedGoodHeaders() > 3600 * 24 * 14))) {
        // check if this peer is using the same chain as us.
        // notice that we picked 800 above because the default requestHeaders
        // will work correctly with us being behind up to 1000 headers.
        requestHeaders(peer, 9);
    }
}

/**
 * Forward a 'headers' message to Blockchain::processBlockHeaders(), posted on
 * the download manager's strand. Nothing is posted once shutdown has started.
 */
void ConnectionManager::addBlockHeaders(const Message &message, int sourcePeerId)
{
    if (m_shuttingDown)
        return;
    // TODO if downloadmanager triggered this
    // then update metadata on the speed of this peer.
    m_dlManager->strand().post(std::bind(&Blockchain::processBlockHeaders, &m_dlManager->blockchain(),
            message, sourcePeerId), std::allocator<void>());
}

/**
 * Forward an 'addr' message to PeerAddressDB::processAddressMessage(), posted
 * on the download manager's strand. Nothing is posted once shutdown has started.
 */
void ConnectionManager::addAddresses(const Message &message, int sourcePeerId)
{
    if (m_shuttingDown)
        return;
    m_dlManager->strand().post(std::bind(&PeerAddressDB::processAddressMessage, &m_peerAddressDb,
            message, sourcePeerId), std::allocator<void>());
}

/**
 * Add \a amount punishment points to the address of \a peer.
 *
 * When the address is valid the listeners get punishmentChanged(). When the
 * resulting total reaches PUNISHMENT_MAX the peer is banned: listeners get
 * lostPeer() and the peer is removed. A peer whose address is not valid is
 * treated as having reached PUNISHMENT_MAX.
 *
 * @return true when the peer got removed, false otherwise (also while shutting down).
 */
bool ConnectionManager::punish(const std::shared_ptr<Peer> &peer, int amount)
{
    assert(peer);
    if (m_shuttingDown)
        return false;
    auto address = peer->peerAddress();
    short total = PUNISHMENT_MAX;
    short previous = total;
    if (address.isValid()) {
        previous = address.punishment();
        total = address.punishPeer(amount);
        for (auto iface : m_dlManager->p2pNetListeners()) {
            iface->punishmentChanged(peer);
        }
    }

    if (total >= PUNISHMENT_MAX) { // too much punishment leads to a ban
        logWarning() << "Ban peer:" << peer->connectionId() << previous << "=>" << total
                     << "Address:" << peer->peerAddress();
        for (auto iface : m_dlManager->p2pNetListeners()) {
            iface->lostPeer(peer);
        }

        std::unique_lock lock(m_lock);
        removePeer(peer);
        return true;
    }
    return false;
}

/**
 * Overload that looks the peer up by \a connectionId first.
 * @return false when no such peer is tracked, otherwise the result of punish(peer, amount).
 */
bool ConnectionManager::punish(int connectionId, int amount)
{
    std::shared_ptr<Peer> p;
    {
        // only hold the lock for the lookup, the other overload locks on its own.
        std::unique_lock lock(m_lock);
        auto peerIter = m_peers.find(connectionId);
        if (peerIter == m_peers.end())
            return false;
        p = peerIter->second;
    }
    return punish(p, amount);
}

/**
 * Send \a peer the get-headers request the blockchain creates for
 * \a skipHeaders and mark the peer as having a header request outstanding.
 * Does nothing for a null peer or while shutting down.
 */
void ConnectionManager::requestHeaders(const std::shared_ptr<Peer> &peer, int skipHeaders)
{
    if (m_shuttingDown || peer.get() == nullptr)
        return;
    auto message = m_dlManager->blockchain().createGetHeadersRequest(skipHeaders);
    peer->setRequestedHeader(true);
    peer->sendMessage(message);
}

/// Returns the peers recorded as connected; empty while shutting down.
std::deque<std::shared_ptr<Peer>> ConnectionManager::connectedPeers() const
{
    std::unique_lock lock(m_lock);
    std::deque<std::shared_ptr<Peer>> answer;
    if (m_shuttingDown)
        return answer;
    for (auto i : m_connectedPeers) {
        auto p = m_peers.find(i);
        assert(p != m_peers.end());
        answer.push_back(p->second);
    }
    return answer;
}

/// Returns the number of tracked peers that are not recorded as connected; 0 while shutting down.
int ConnectionManager::unconnectedPeerCount() const
{
    std::unique_lock lock(m_lock);
    int answer = 0;
    if (m_shuttingDown)
        return answer;
    for (const auto &i : m_peers) {
        if (i.second.get() && m_connectedPeers.find(i.first) == m_connectedPeers.end())
            ++answer;
    }
    return answer;
}

/// Returns the tracked peer for \a connectionId, or a null pointer when there is none.
std::shared_ptr<Peer> ConnectionManager::peer(int connectionId) const
{
    std::unique_lock lock(m_lock);
    auto i = m_peers.find(connectionId);
    if (i == m_peers.end())
        return nullptr;
    return i->second;
}

/// Registers \a ps. Adding the same segment twice is a programming error (checked in debug builds).
void ConnectionManager::addPrivacySegment(const std::shared_ptr<PrivacySegment> &ps)
{
    assert(ps);
#ifndef NDEBUG
    // don't add it twice, please.
    for (auto s : m_segments) {
        assert (s != ps);
    }
#endif
    m_segments.push_back(ps);
}

/**
 * Unregister \a ps and disconnect every peer that still refers to it.
 * Nothing happens when \a ps was not registered.
 */
void ConnectionManager::removePrivacySegment(const std::shared_ptr<PrivacySegment> &ps)
{
    assert(ps);
    for (auto s = m_segments.begin(); s != m_segments.end(); ++s) {
        if (ps == *s) {
            m_segments.erase(s);

            // shutdown peers that still point to this segment.
            auto copy = m_peers; // avoid iterator invalidation due to modification
            for (auto i = copy.begin(); i != copy.end(); ++i) {
                auto peersPs = i->second->privacySegment().lock();
                if (peersPs == ps)
                    disconnect(i->second);
            }
            break; // the iterator 's' is no longer valid after the erase above.
        }
    }
}

/// Sets the user-agent string.
void ConnectionManager::setUserAgent(const std::string &userAgent)
{
    m_userAgent = userAgent;
}

/**
 * Send \a txOwner to every tracked peer whose privacy segment has the id
 * returned by txOwner->privSegment(), and remember it for later.
 * \a txOwner is dereferenced, so it must not be null.
 */
void ConnectionManager::broadcastTransaction(const std::shared_ptr<BroadcastTxData> &txOwner)
{
    const auto id = txOwner->privSegment();
    std::unique_lock lock(m_lock);
    for (auto iter = m_peers.begin(); iter != m_peers.end(); ++iter) {
        auto peer = iter->second;
        auto privacySegment = peer->privacySegment().lock();
        if (privacySegment && privacySegment->segmentId() == id) {
            peer->sendTx(txOwner);
        }
    }

    // store it here so when the SPV action connects new peers to a segment,
    // it can send those transactions to them too.
    m_transactionsToBroadcast.push_back(txOwner);
}

/// Returns the number of tracked peers, connected or not.
int ConnectionManager::peerCount() const
{
    assert(m_peers.size() <= INT_MAX);
    return int(m_peers.size());
}

/**
 * Periodic maintenance, invoked by the cron timer.
 *
 * Re-arms itself for 20 seconds later, kicks peers that take too long to
 * connect or to finish the version handshake, logs a status line for the
 * others and forgets broadcast records that have expired.
 *
 * The timer is not re-armed when \a error is set, while shutting down, or
 * when not in NormalPower mode (onPowerModeChanged() arms it again).
 */
void ConnectionManager::cron(const boost::system::error_code &error)
{
    if (error)
        return;
    if (m_shuttingDown)
        return;
    if (m_dlManager->powerMode() != P2PNet::NormalPower)
        return;

    m_cronTimer.expires_from_now(boost::posix_time::seconds(20));
    m_cronTimer.async_wait(boost::asio::bind_executor(m_dlManager->strand(),
            std::bind(&ConnectionManager::cron, this, std::placeholders::_1)));

    logDebug() << "Cron";
    int now = static_cast<int>(time(nullptr));

    // check for connections that don't seem to connect.
    std::unique_lock lock(m_lock);
    for (auto iter = m_peers.begin(); iter != m_peers.end();) {
        // Keep our own reference. Erasing the map entry below may release the
        // last owner of the Peer while we still have to call shutdown() on it.
        const std::shared_ptr<Peer> peer = iter->second;
        bool kick = peer->status() != Peer::Connected || peer->protocolVersion() == 0;
        if (peer->connectTime() == 0) // not connected yet
            kick &= now - peer->timeOffset() > 10; // no more than 10 sec to try to connect
        else
            kick &= now - peer->connectTime() > 20; // no more than 20 seconds for version handshake.
        if (kick) {
            auto peerAddress = peer->peerAddress();
            logInfo() << "peer:" << iter->first << "kicking. Address:" << peerAddress;
            const auto id = iter->first;
            iter = m_peers.erase(iter);

            // Same bookkeeping as removePeer(): an id must never stay behind in
            // m_connectedPeers without a matching entry in m_peers.
            auto connected = m_connectedPeers.find(id);
            if (connected != m_connectedPeers.end()) {
                m_connectedPeers.erase(connected);
                m_dlManager->peerDisconnected(id);
            }
            peer->shutdown();
        } else {
            auto log = logInfo() << "peer:" << iter->first;
            if (peer->status() == Peer::Connecting)
                log << "Address:" << peer->peerAddress() << "[connecting]";
            else
                log << peer->userAgent();
            auto privacySegment = peer->privacySegment().lock();
            if (privacySegment)
                log << "Wallet:" << privacySegment->segmentId();
            if (peer->connectTime() > 0)
                log.nospace() << "(" << now - peer->connectTime() << "s)";
            if (peer->peerHeight() > 0)
                log.nospace() << " @" << peer->peerHeight();
            ++iter;
        }
    }

    // drop the broadcast records nobody holds on to anymore.
    auto iter = m_transactionsToBroadcast.begin();
    while (iter != m_transactionsToBroadcast.end()) {
        auto locked = iter->lock();
        if (!locked.get()) { // expired
            logDebug() << "Transaction broadcast struct has expired.";
            iter = m_transactionsToBroadcast.erase(iter);
        } else {
            ++iter;
        }
    }
}

/// Error callback of a connection; posts handleError_impl() on the download manager's strand.
void ConnectionManager::handleError(int remoteId, const boost::system::error_code &error)
{
    m_dlManager->strand().post(std::bind(&ConnectionManager::handleError_impl, this,
            remoteId, error), std::allocator<void>());
}

/**
 * Map a connection error to a punishment for the peer and, for the errors
 * recognised below, remove the peer (unless the punishment already did so).
 * Errors that are not recognised cost 180 points and do not remove the peer
 * by themselves.
 */
void ConnectionManager::handleError_impl(int peerId, const boost::system::error_code &error)
{
    if (m_shuttingDown)
        return;
    bool remove = false;
    int punishment = 180; // unknown error
    if (error == boost::asio::error::host_unreachable
            || error == boost::asio::error::network_unreachable
            || error.value() == EADDRNOTAVAIL) {
        remove = true;
        punishment = 45; // Typically structural networking issue. Could be fixed later, though.
    }
    else if (error == boost::asio::error::host_not_found) {
        remove = true;
        punishment = 450; // faulty DNS name.
    }
    else if (error == boost::asio::error::connection_refused
             || error == boost::asio::error::connection_aborted
             || error == boost::asio::error::connection_reset) {
        remove = true;
        punishment = -90; // almost give back the 100, but not entirely to down-prioritize it on random connects
    }

    auto remotePeer = peer(peerId);
    if (!remotePeer)
        return;
    logWarning().nospace() << "Peer: " << peerId << " " << remotePeer->peerAddress() << " got error. ("
                           << error.value() << "=" << error.message() << ") Punishment: " << punishment;
    bool removed = punish(remotePeer, punishment);
    if (remove && !removed) {
        logDebug() << "removing" << peerId;
        for (auto iface : m_dlManager->p2pNetListeners()) {
            iface->lostPeer(remotePeer);
        }
        std::unique_lock lock(m_lock);
        removePeer(remotePeer);
    }
}

/**
 * Shut \a p down and drop it from our administration.
 *
 * When the peer was recorded as connected, the download manager is told it
 * disconnected. Every caller in this file holds m_lock while calling this.
 */
void ConnectionManager::removePeer(const std::shared_ptr<Peer> &p)
{
    const int id = p->connectionId();
    p->shutdown();
    auto i = m_connectedPeers.find(id);
    if (i != m_connectedPeers.end()) {
        m_connectedPeers.erase(i);
        m_dlManager->peerDisconnected(id);
    }

    auto iter = m_peers.find(id);
    assert (iter != m_peers.end());
    // erase(end()) is undefined behaviour; keep builds without asserts safe
    // should a peer ever get removed twice.
    if (iter != m_peers.end())
        m_peers.erase(iter);
}

/// Returns the certainty last set with setBlockHeightCertainty().
P2PNet::NetworkCertainty ConnectionManager::blockHeightCertainty() const
{
    return m_blockHeightCertainty;
}

/// Stores \a certainty and, only when it differs from the current value, notifies the listeners.
void ConnectionManager::setBlockHeightCertainty(P2PNet::NetworkCertainty certainty)
{
    if (certainty == m_blockHeightCertainty)
        return;
    m_blockHeightCertainty = certainty;
    for (auto iface : m_dlManager->p2pNetListeners()) {
        iface->newBlockHeightCertainty(certainty);
    }
}

/// Returns a copy of the remembered broadcast records; entries may have expired.
std::deque<std::weak_ptr<BroadcastTxData> > ConnectionManager::transactionsToBroadcast() const
{
    std::unique_lock lock(m_lock);
    return m_transactionsToBroadcast;
}

/// Returns a copy of the registered privacy segments.
std::deque<std::shared_ptr<PrivacySegment> > ConnectionManager::segments() const
{
    return m_segments;
}

/**
 * Stop the cron timer, remove all peers and save the peer-address database.
 * Only the first call does anything.
 */
void ConnectionManager::shutdown()
{
    std::unique_lock lock(m_lock);
    if (m_shuttingDown)
        return;
    m_shuttingDown = true;

    m_cronTimer.cancel();
    auto copy(m_peers); // removePeer() modifies m_peers, iterate over a copy.
    for (const auto &peer : copy) {
        removePeer(peer.second);
    }
    assert(m_peers.empty());
    saveData();
}

/// Saves the peer-address database to the base directory given to the constructor.
void ConnectionManager::saveData()
{
    m_peerAddressDb.saveDatabase(m_basedir);
}

/**
 * Set the message queue size passed to connections created by connect().
 * @param size must be in the range 1 .. 0xeFFF.
 */
void ConnectionManager::setMessageQueueSize(int size)
{
    assert(size >= 1);
    // NOTE(review): 0xeFFF (61439) does not fit a signed 16 bit integer. If
    // m_queueSize is a short this bound may have been meant as 0x7FFF.
    // Confirm against the header.
    assert(size <= 0xeFFF);
    // cast to whatever type the member is declared as.
    m_queueSize = static_cast<decltype(m_queueSize)>(size);
}

/**
 * React to a change of the download manager's power mode.
 * NormalPower re-arms the cron timer; LowPower stops it and disconnects
 * all peers.
 */
void ConnectionManager::onPowerModeChanged()
{
    switch (m_dlManager->powerMode()) {
    case P2PNet::NormalPower:
        m_cronTimer.cancel();
        m_cronTimer.expires_from_now(boost::posix_time::seconds(20));
        m_cronTimer.async_wait(boost::asio::bind_executor(m_dlManager->strand(),
                std::bind(&ConnectionManager::cron, this, std::placeholders::_1)));
        break;
    case P2PNet::LowPower: {
        m_cronTimer.cancel();
        auto copy(m_peers); // disconnect() modifies m_peers, iterate over a copy.
        for (const auto &peer : copy) {
            disconnect(peer.second);
        }
        break;
    }
    }
}