Files
thehub/libs/p2p/ConnectionManager.cpp
T
tomFlowee 5f6db1de18 Add feature: p2pnet power save mode.
Setting the "LowPower" mode on the main DownloadManager class will
cause it to halt all network activity and jobs will die.
You can revive it by changing it to NormalPower and optionally calling
the start() method to restart all jobs.
2025-03-07 14:31:02 +01:00

570 lines
19 KiB
C++

/*
* This file is part of the Flowee project
* Copyright (C) 2020-2025 Tom Zander <tom@flowee.org>
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "ConnectionManager.h"
#include "DownloadManager.h"
#include "P2PNetInterface.h"
#include "Peer.h"
#include "PrivacySegment.h"
#include "BroadcastTxData.h"
#include <random.h>
#include <streaming/BufferPool.h>
#include <streaming/BufferPools.h>
#include <streaming/P2PBuilder.h>
#include <APIProtocol.h>
#include <boost/asio/bind_executor.hpp>
// Builds the connection manager: registers the message-id to command-name
// table on the network manager, applies the testnet4 network settings when
// the parent runs on that chain, arms the cron timer on the parent's strand
// and loads the peer address database from basedir.
ConnectionManager::ConnectionManager(boost::asio::io_context &context, const boost::filesystem::path &basedir, DownloadManager *parent)
    : m_shuttingDown(false),
    m_ioContext(context),
    m_cronTimer(m_ioContext),
    m_peerAddressDb(this),
    m_network(m_ioContext),
    m_dlManager(parent),
    m_basedir(basedir)
{
    // The nonce is used in the status message to allow detection of connect-to-self.
    m_appNonce = GetRand(-1);

    // Maps our internal message ids onto the command names used on the wire.
    std::map<int, std::string> table = {
        { Api::P2P::Version, "version" },
        { Api::P2P::VersionAck, "verack" },
        { Api::P2P::Ping, "ping" },
        { Api::P2P::Pong, "pong" },
        { Api::P2P::PreferHeaders, "sendheaders" },
        { Api::P2P::GetHeaders, "getheaders" },
        { Api::P2P::GetBlocks, "getblocks" },
        { Api::P2P::Headers, "headers" },
        { Api::P2P::RejectData, "reject" },
        { Api::P2P::Inventory, "inv" },
        { Api::P2P::GetAddr, "getaddr" },
        { Api::P2P::Addresses, "addr" },
        { Api::P2P::Data_Transaction, "tx" },
        { Api::P2P::Data_MerkleBlock, "merkleblock" },
        { Api::P2P::Data_DSProof, "dsproof-beta" },
        { Api::P2P::FilterLoad, "filterload" },
        { Api::P2P::FilterClear, "filterclear" },
        { Api::P2P::GetData, "getdata" },
        { Api::P2P::Mempool, "mempool" },
        // specially added to detect not-bitcoincash clients:
        { Api::P2P::AVAHello, "avahello" },
        { Api::P2P::AlertMessage, "alert" },
        { Api::P2P::ProtoConf, "protoconf" }
    };
    m_network.setMessageIdLookup(table);

    // network selection: testnet4 gets its own network magic and default port.
    if (parent->chain() == P2PNet::Testnet4Chain) {
        std::vector<uint8_t> magic = { 0xe2, 0xb7, 0xda, 0xaf };
        m_network.setLegacyNetworkId(magic);
        m_peerAddressDb.setDefaultPortNr(28333);
    }

    // First cron run happens 20 seconds from now, executed on the parent's strand.
    m_cronTimer.expires_from_now(boost::posix_time::seconds(20));
    m_cronTimer.async_wait(boost::asio::bind_executor(parent->strand(),
            std::bind(&ConnectionManager::cron, this, std::placeholders::_1)));

    m_userAgent = "Flowee-P2PNet-based app";
    m_peerAddressDb.loadDatabase(m_basedir);
}
// Queues DownloadManager::parseInvMessage(message, sourcePeerId) on the
// download manager's strand. Nothing is queued once shutdown has started.
void ConnectionManager::addInvMessage(const Message &message, int sourcePeerId)
{
    if (!m_shuttingDown) {
        auto job = std::bind(&DownloadManager::parseInvMessage, m_dlManager, message, sourcePeerId);
        m_dlManager->strand().post(std::move(job), std::allocator<void>());
    }
}
// Queues DownloadManager::parseTransaction(message, sourcePeerId) on the
// download manager's strand. Nothing is queued once shutdown has started.
void ConnectionManager::addTransaction(const Tx &message, int sourcePeerId)
{
    if (!m_shuttingDown) {
        auto job = std::bind(&DownloadManager::parseTransaction, m_dlManager, message, sourcePeerId);
        m_dlManager->strand().post(std::move(job), std::allocator<void>());
    }
}
// Starts a connection to the given address, unless we are shutting down,
// not in normal power mode, or already have a Peer for that endpoint.
// Listeners are told about the new connection via newConnection().
void ConnectionManager::connect(PeerAddress &address)
{
    if (m_shuttingDown || m_dlManager->powerMode() != P2PNet::NormalPower)
        return;

    auto con = m_network.connection(address.peerAddress());
    std::lock_guard<std::mutex> guard(m_lock);
    // Check whether we already have a Peer for this endpoint.
    if (m_peers.find(con.connectionId()) != m_peers.end())
        return;

    // Charged up-front; when the connection succeeds we remove the 100 again.
    address.punishPeer(100);
    con.setOnError(std::bind(&ConnectionManager::handleError, this, std::placeholders::_1, std::placeholders::_2));
    con.setMessageQueueSizes(m_queueSize, 3);

    auto newPeer = std::make_shared<Peer>(this, address);
    newPeer->connect(std::move(con));
    m_peers.insert(std::make_pair(newPeer->connectionId(), newPeer));
    for (auto listener : m_dlManager->p2pNetListeners()) {
        listener->newConnection(newPeer);
    }
}
void ConnectionManager::disconnect(const std::shared_ptr<Peer> &peer)
{
if (m_shuttingDown || peer.get() == nullptr)
return;
for (auto iface : m_dlManager->p2pNetListeners()) {
iface->lostPeer(peer);
}
std::unique_lock<std::mutex> lock(m_lock);
assert(m_peers.find(peer->connectionId()) != m_peers.end());
removePeer(peer);
}
uint64_t ConnectionManager::servicesBitfield() const
{
return m_servicesBitfield;
}
void ConnectionManager::setServicesBitfield(const uint64_t &servicesBitfield)
{
m_servicesBitfield = servicesBitfield;
}
int ConnectionManager::blockHeight() const
{
return m_blockHeight;
}
void ConnectionManager::setBlockHeight(int blockHeight)
{
assert(blockHeight >= 0);
m_blockHeight = blockHeight;
}
// Forwards to the download manager's blockchain to look up the height
// of the block identified by blockId.
int ConnectionManager::blockHeightFor(const uint256 &blockId)
{
    auto &chain = m_dlManager->blockchain();
    return chain.blockHeightFor(blockId);
}
// Fetches the block at the given height from the download manager's
// blockchain and returns the hash created from it.
uint256 ConnectionManager::blockHashFor(int height)
{
    auto &chain = m_dlManager->blockchain();
    return chain.block(height).createHash();
}
uint64_t ConnectionManager::appNonce() const
{
return m_appNonce;
}
void ConnectionManager::connectionEstablished(const std::shared_ptr<Peer> &peer)
{
if (m_shuttingDown)
return;
assert(peer);
assert(peer->peerAddress().isValid());
peer->peerAddress().punishPeer(-100); // this mirrors the 100 when we started connecting.
std::unique_lock<std::mutex> lock(m_lock);
// don't use if the client doesn't support any usable services.
if (!peer->supplies_bloom() || !peer->supplies_network() || !peer->relaysTransactions()) {
logWarning() << "Rejecting. Need BLOOM and NETWORK and tx-relay. Peer:" << peer->connectionId() << peer->userAgent()
<< peer->peerAddress();
removePeer(peer);
return;
}
for (auto iface : m_dlManager->p2pNetListeners()) {
iface->newPeer(peer);
}
m_connectedPeers.insert(peer->connectionId());
// we use headers to verify that the peer is on the same chain as us. Every
// peer will have to answer the request at least once, and we add repeat requests
// after a certain amount of time.
if (std::abs(m_dlManager->blockchain().expectedBlockHeight() - m_blockHeight) < 800
&& (peer->peerAddress().lastReceivedGoodHeaders() == 0
|| (time(nullptr) - peer->peerAddress().lastReceivedGoodHeaders() > 3600 * 24 * 14))) {
// check if this peer is using the same chain as us.
// notice that we picked 800 above because the default requestHeaders
// will work correcctly with us being behind up to 1000 headers.
requestHeaders(peer, 9);
}
}
// Queues Blockchain::processBlockHeaders(message, sourcePeerId) on the
// download manager's strand. Nothing is queued once shutdown has started.
void ConnectionManager::addBlockHeaders(const Message &message, int sourcePeerId)
{
    if (!m_shuttingDown) {
        // TODO if downloadmanager triggered this
        // then update metadata on the speed of this peer.
        auto job = std::bind(&Blockchain::processBlockHeaders,
                &m_dlManager->blockchain(), message, sourcePeerId);
        m_dlManager->strand().post(std::move(job), std::allocator<void>());
    }
}
// Queues PeerAddressDB::processAddressMessage(message, sourcePeerId) for our
// address database on the download manager's strand. Nothing is queued once
// shutdown has started.
void ConnectionManager::addAddresses(const Message &message, int sourcePeerId)
{
    if (!m_shuttingDown) {
        auto job = std::bind(&PeerAddressDB::processAddressMessage,
                &m_peerAddressDb, message, sourcePeerId);
        m_dlManager->strand().post(std::move(job), std::allocator<void>());
    }
}
bool ConnectionManager::punish(const std::shared_ptr<Peer> &peer, int amount)
{
assert(peer);
if (m_shuttingDown)
return false;
auto address = peer->peerAddress();
short total = PUNISHMENT_MAX;
short previous = total;
if (address.isValid()) {
previous = address.punishment();
total = address.punishPeer(amount);
for (auto iface : m_dlManager->p2pNetListeners()) {
iface->punishmentChanged(peer);
}
}
if (total >= PUNISHMENT_MAX) { // too much punishment leads to a ban
logWarning() << "Ban peer:" << peer->connectionId() << previous << "=>" << total
<< "Address:" << peer->peerAddress();
for (auto iface : m_dlManager->p2pNetListeners()) {
iface->lostPeer(peer);
}
std::unique_lock<std::mutex> lock(m_lock);
removePeer(peer);
return true;
}
return false;
}
bool ConnectionManager::punish(int connectionId, int amount)
{
std::shared_ptr<Peer> p;
{
std::unique_lock<std::mutex> lock(m_lock);
auto peerIter = m_peers.find(connectionId);
if (peerIter == m_peers.end())
return false;
p = peerIter->second;
}
return punish(p, amount);
}
// Sends the peer a get-headers request created by the blockchain (with
// skipHeaders passed through) and marks on the peer that headers were
// requested. Ignored for a null peer or while shutting down.
void ConnectionManager::requestHeaders(const std::shared_ptr<Peer> &peer, int skipHeaders)
{
    if (m_shuttingDown || !peer)
        return;

    auto request = m_dlManager->blockchain().createGetHeadersRequest(skipHeaders);
    peer->setRequestedHeader(true);
    peer->sendMessage(request);
}
std::deque<std::shared_ptr<Peer>> ConnectionManager::connectedPeers() const
{
std::unique_lock<std::mutex> lock(m_lock);
std::deque<std::shared_ptr<Peer>> answer;
if (m_shuttingDown)
return answer;
for (auto i : m_connectedPeers) {
auto p = m_peers.find(i);
assert(p != m_peers.end());
answer.push_back(p->second);
}
return answer;
}
int ConnectionManager::unconnectedPeerCount() const
{
std::unique_lock<std::mutex> lock(m_lock);
int answer = 0;
if (m_shuttingDown)
return answer;
for (const auto &i : m_peers) {
if (i.second.get() && m_connectedPeers.find(i.first) == m_connectedPeers.end())
++answer;
}
return answer;
}
std::shared_ptr<Peer> ConnectionManager::peer(int connectionId) const
{
std::unique_lock<std::mutex> lock(m_lock);
auto i = m_peers.find(connectionId);
if (i == m_peers.end())
return nullptr;
return i->second;
}
// Appends a privacy segment to the list of known segments.
// Passing a null segment, or one that was added before, is a programming
// error that is caught by asserts in debug builds.
void ConnectionManager::addPrivacySegment(const std::shared_ptr<PrivacySegment> &ps)
{
    assert(ps);
#ifndef NDEBUG
    // don't add it twice, please.
    for (const auto &known : m_segments) {
        assert (known != ps);
    }
#endif
    m_segments.push_back(ps);
}
void ConnectionManager::removePrivacySegment(const std::shared_ptr<PrivacySegment> &ps)
{
assert(ps);
for (auto s = m_segments.begin(); s != m_segments.end(); ++s) {
if (ps == *s) {
m_segments.erase(s);
// shutdown peers that still point to this segment.
auto copy = m_peers; // avoid iterator invalidation due to modification
for (auto i = copy.begin(); i != copy.end(); ++i) {
auto peersPs = i->second->privacySegment().lock();
if (peersPs == ps)
disconnect(i->second);
}
break;
}
}
}
void ConnectionManager::setUserAgent(const std::string &userAgent)
{
m_userAgent = userAgent;
}
void ConnectionManager::broadcastTransaction(const std::shared_ptr<BroadcastTxData> &txOwner)
{
const auto id = txOwner->privSegment();
std::unique_lock<std::mutex> lock(m_lock);
for (auto iter = m_peers.begin(); iter != m_peers.end(); ++iter) {
Peer *peer = iter->second.get();
auto privacySegment = peer->privacySegment().lock();
if (privacySegment && privacySegment->segmentId() == id) {
peer->sendTx(txOwner);
}
}
// store it here so when the SPV action connects new peers to a segment,
// it can send those transactions to them too.
m_transactionsToBroadcast.push_back(txOwner);
}
// Returns the number of entries in m_peers, connected or not.
int ConnectionManager::peerCount() const
{
    const auto count = m_peers.size();
    assert(count <= INT_MAX);
    return int(count);
}
// Periodic maintenance, run every 20 seconds via m_cronTimer.
//
// - Kicks peers that take too long to connect or to finish the version
//   handshake, and logs a status line for all others.
// - Drops entries from m_transactionsToBroadcast whose owner has expired.
//
// The timer is not re-armed when the wait reported an error (for instance
// because the timer was cancelled), while shutting down, or when not in
// normal power mode.
void ConnectionManager::cron(const boost::system::error_code &error)
{
    if (error)
        return;
    if (m_shuttingDown)
        return;
    if (m_dlManager->powerMode() != P2PNet::NormalPower)
        return;

    m_cronTimer.expires_from_now(boost::posix_time::seconds(20));
    m_cronTimer.async_wait(boost::asio::bind_executor(m_dlManager->strand(),
            std::bind(&ConnectionManager::cron, this, std::placeholders::_1)));
    logDebug() << "Cron";
    int now = static_cast<uint32_t>(time(nullptr));

    // check for connections that don't seem to connect.
    std::unique_lock<std::mutex> lock(m_lock);
    for (auto iter = m_peers.begin(); iter != m_peers.end();) {
        // Hold a strong reference: erasing the map entry below may drop the
        // last owner, and we still need the peer afterwards for shutdown().
        const std::shared_ptr<Peer> peer = iter->second;
        bool kick = peer->status() != Peer::Connected || peer->protocolVersion() == 0;
        if (peer->connectTime() == 0) // not connected yet
            kick &= now - peer->timeOffset() > 10; // no more than 10 sec to try to connect
        else
            kick &= now - peer->connectTime() > 20; // no more than 20 seconds for version handshake.

        if (kick) {
            auto peerAddress = peer->peerAddress();
            logInfo() << "peer:" << iter->first << "kicking. Address:" << peerAddress;
            // NOTE(review): unlike removePeer() this does not touch
            // m_connectedPeers; presumably a kicked peer is never listed
            // there - confirm.
            iter = m_peers.erase(iter);
            peer->shutdown();
        }
        else {
            auto log = logInfo() << "peer:" << iter->first;
            if (peer->status() == Peer::Connecting)
                log << "Address:" << peer->peerAddress() << "[connecting]";
            else
                log << peer->userAgent();
            auto privacySegment = peer->privacySegment().lock();
            if (privacySegment)
                log << "Wallet:" << privacySegment->segmentId();
            if (peer->connectTime() > 0)
                log.nospace() << "(" << now - peer->connectTime() << "s)";
            if (peer->peerHeight() > 0)
                log.nospace() << " @" << peer->peerHeight();
            ++iter;
        }
    }

    // forget about broadcast requests whose owner no longer exists.
    auto iter = m_transactionsToBroadcast.begin();
    while (iter != m_transactionsToBroadcast.end()) {
        auto locked = iter->lock();
        if (!locked.get()) { // expired
            logDebug() << "Transaction broadcast struct has expired.";
            iter = m_transactionsToBroadcast.erase(iter);
        } else {
            ++iter;
        }
    }
}
// Error callback for connections (installed in connect()). The actual
// handling is deferred to handleError_impl(), queued on the download
// manager's strand.
void ConnectionManager::handleError(int remoteId, const boost::system::error_code &error)
{
    auto job = std::bind(&ConnectionManager::handleError_impl, this, remoteId, error);
    m_dlManager->strand().post(std::move(job), std::allocator<void>());
}
// Translates a connection error into a punishment for the peer and, for the
// error classes we recognise, removes the peer. Errors we do not recognise
// only punish the peer (which may still ban and thereby remove it).
void ConnectionManager::handleError_impl(int peerId, const boost::system::error_code &error)
{
    if (m_shuttingDown)
        return;

    const bool networkIssue = error == boost::asio::error::host_unreachable
            || error == boost::asio::error::network_unreachable
            || error.value() == EADDRNOTAVAIL;
    const bool unknownHost = error == boost::asio::error::host_not_found;
    const bool connectionLost = error == boost::asio::error::connection_refused
            || error == boost::asio::error::connection_aborted
            || error == boost::asio::error::connection_reset;

    bool remove = true;
    int punishment;
    if (networkIssue) {
        punishment = 45; // Typically structural networking issue. Could be fixed later, though.
    }
    else if (unknownHost) {
        punishment = 450; // faulty DNS name.
    }
    else if (connectionLost) {
        punishment = -90; // almost give back the 100, but not entirely, to down-prioritize it on random connects
    }
    else {
        remove = false;
        punishment = 180; // unknown error
    }

    auto remotePeer = peer(peerId);
    if (!remotePeer)
        return;
    logWarning().nospace() << "Peer: " << peerId << " " << remotePeer->peerAddress()
                           << " got error. (" << error.value() << "=" << error.message()
                           << ") Punishment: " << punishment;

    // punish() reports whether the resulting ban already removed the peer.
    const bool removed = punish(remotePeer, punishment);
    if (!remove || removed)
        return;

    logDebug() << "removing" << peerId;
    for (auto listener : m_dlManager->p2pNetListeners()) {
        listener->lostPeer(remotePeer);
    }
    std::lock_guard<std::mutex> guard(m_lock);
    removePeer(remotePeer);
}
// Shuts the peer down and drops it from our bookkeeping.
//
// When the peer was listed in m_connectedPeers the download manager is told
// about the disconnect. The callers in this file hold m_lock while calling
// this.
void ConnectionManager::removePeer(const std::shared_ptr<Peer> &p)
{
    const int id = p->connectionId();
    p->shutdown();

    auto i = m_connectedPeers.find(id);
    if (i != m_connectedPeers.end()) {
        m_connectedPeers.erase(i);
        m_dlManager->peerDisconnected(id);
    }

    auto iter = m_peers.find(id);
    assert (iter != m_peers.end());
    // The assert is compiled out in release builds; erasing end() would be
    // undefined behaviour there, so a missing entry is skipped instead.
    if (iter != m_peers.end())
        m_peers.erase(iter);
}
// Returns the certainty value last stored with setBlockHeightCertainty().
P2PNet::NetworkCertainty ConnectionManager::blockHeightCertainty() const
{
    return this->m_blockHeightCertainty;
}
// Stores a new block-height certainty and notifies all listeners.
// Setting the value it already has is a no-op (no notifications).
void ConnectionManager::setBlockHeightCertainty(P2PNet::NetworkCertainty certainty)
{
    if (certainty != m_blockHeightCertainty) {
        m_blockHeightCertainty = certainty;
        for (auto listener : m_dlManager->p2pNetListeners()) {
            listener->newBlockHeightCertainty(certainty);
        }
    }
}
std::deque<std::weak_ptr<BroadcastTxData> > ConnectionManager::transactionsToBroadcast() const
{
std::unique_lock<std::mutex> lock(m_lock);
return m_transactionsToBroadcast;
}
std::deque<std::shared_ptr<PrivacySegment> > ConnectionManager::segments() const
{
return m_segments;
}
void ConnectionManager::shutdown()
{
std::unique_lock<std::mutex> lock(m_lock);
if (m_shuttingDown)
return;
m_shuttingDown = true;
m_cronTimer.cancel();
auto copy(m_peers);
for (const auto &peer : copy) {
removePeer(peer.second);
}
assert(m_peers.empty());
saveData();
}
// Writes the peer address database to the base directory it was loaded from.
void ConnectionManager::saveData()
{
    const auto &targetDir = m_basedir;
    m_peerAddressDb.saveDatabase(targetDir);
}
// Sets the message queue size that connect() passes to new connections.
//
// The value is stored as a short, so it has to lie in the range 1..0x7FFF.
// Values outside that range are a programming error (asserted in debug
// builds).
void ConnectionManager::setMessageQueueSize(int size)
{
    assert(size >= 1);
    // 0x7FFF is the largest value a short is guaranteed to hold; anything
    // bigger would not survive the cast below.
    assert(size <= 0x7FFF);
    m_queueSize = static_cast<short>(size);
}
void ConnectionManager::onPowerModeChanged()
{
switch (m_dlManager->powerMode()) {
case P2PNet::NormalPower:
m_cronTimer.cancel();
m_cronTimer.expires_from_now(boost::posix_time::seconds(20));
m_cronTimer.async_wait(boost::asio::bind_executor(m_dlManager->strand(),
std::bind(&ConnectionManager::cron, this, std::placeholders::_1)));
break;
case P2PNet::LowPower: {
m_cronTimer.cancel();
auto copy(m_peers);
for (const auto &peer : copy) {
disconnect(peer.second);
}
break;
}
}
}