Files
thehub/libs/p2p/SyncSPVAction.cpp
T
tomFlowee f8e9b45229 Be more vigilant in getting peers.
This changes the SPV action to not actually exit when sync
is completed, but instead keep running in the background
so it will detect when a wallet loses a peer and reinstate one.
2023-06-15 14:30:29 +02:00

359 lines
17 KiB
C++

/*
* This file is part of the Flowee project
* Copyright (C) 2020-2023 Tom Zander <tom@flowee.org>
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "SyncSPVAction.h"
#include "PrivacySegment.h"
#include "DownloadManager.h"
#include "Peer.h"
#include <set>
SyncSPVAction::SyncSPVAction(DownloadManager *parent)
: Action(parent),
MinPeersPerWallet(parent->chain() == P2PNet::MainChain ? 3 : 1)
{
}
namespace {
struct WalletInfo {
PrivacySegment *segment;
/*
* peers that we connect to are remembered between runs and in that state
* we also store the segment-id.
* So an unassigned peer can actually be associated to one wallet and such
* peers are stored in this struct as opposed to a 'global' one (scope=execute() method).
*/
std::set<std::shared_ptr<Peer> > unassignedPeers;
std::set<std::shared_ptr<Peer> > assignedPeers;
std::shared_ptr<Peer> downloader;
};
}
void SyncSPVAction::execute(const boost::system::error_code &error)
{
if (error)
return;
/*
* the bloom filters and the 'mempool' call.
*
* Every peer that we connect to should get the bloom filter set and it should
* also get the 'mempool' call sent once which will make that peer respond with
* all matches of the bloom filters which are in the mempool.
*
* The tricky part is that we should have the latest bloom filter set on each
* of those peers before the mempool is sent, since they work together.
* Also if we are behind, the mempool should not be sent. (It can cause problems
* in the wallet if we receive unconfirmed transactions before mined transactions)
*
* So, when a peer starts downloading merkle blocks, the bloom filter of all the
* other peers becomes invalid the moment a match is found by an actual wallet.
* This is Ok on one peer because the merkleblock automatically updates the
* filter on match on the server side, but obviously not on the other peers
* we have for that wallet.
*
* The approach we follow is that as soon as a sync-run is done on a single peer
* (we do a main and also a backup sync), we tell the wallet to re-build its
* bloom filter for _all_ peers and if the sync that peer did leads us to be at
* the chain-tip, we also send each peer the mempool command.
*/
bool didSomething = false;
const int currentBlockHeight = m_dlm->blockHeight();
// fill the 'wallets' struct with the current state of things.
std::vector<WalletInfo> wallets;
std::set<std::shared_ptr<Peer> > unassignedPeers;
for (auto *segment : m_dlm->connectionManager().segments()) {
if (!segment->enabled())
continue;
WalletInfo info;
info.segment = segment;
for (const auto &peer : m_dlm->connectionManager().connectedPeers()) {
if (peer->privacySegment() == segment) {
info.assignedPeers.insert(peer);
if (peer->merkleDownloadInProgress())
info.downloader = peer;
}
else if (peer->privacySegment() == nullptr
&& peer->peerAddress().segment() == segment->segmentId()) {
info.unassignedPeers.insert(peer);
}
else if (peer->privacySegment() == nullptr && peer->peerAddress().segment() == 0) {
unassignedPeers.insert(peer);
}
}
wallets.push_back(info);
}
// sort by prio
std::sort(wallets.begin(), wallets.end(), [](const WalletInfo &a, const WalletInfo &b) {
return a.segment->priority() < b.segment->priority();
});
// Half of the work of this action is finding appropriate peers for a privacy-segment (aka wallet).
// We open connections to peers
// We wait for them to have 'received' recent headers, which indicates the peer is on our chain.
// We then assign a wallet to them.
// The other half is managing those peers actually downloading the merkle blocks to get our SPV based data.
const auto now = boost::posix_time::microsec_clock::universal_time();
const uint32_t nowInSec = time(nullptr);
const uint32_t headersOkTime = 3600 * 24 * 7 * 3; // 3 weeks
//-- find peers that are unassigned and which are verified to be on our chain.
for (auto wallet = wallets.begin(); wallet != wallets.end(); ++wallet) {
for (auto peer : wallet->unassignedPeers) {
if (peer->receivedHeaders()
|| nowInSec - peer->peerAddress().lastReceivedGoodHeaders() < headersOkTime) {
logInfo(2025) << "Peer previously used is still Ok, assigning it to wallet" << wallet->segment->segmentId();
// assign peer and send old transactions
peer->setPrivacySegment(wallet->segment);
didSomething = true;
if (peer->relaysTransactions()) {
for (const auto &weakTx : m_dlm->connectionManager().transactionsToBroadcast()) {
logInfo(2025) << "broadcasting transactions from wallet to new peer";
auto tx = weakTx.lock();
if (tx && peer->privacySegment() == wallet->segment)
peer->sendTx(tx);
}
}
}
}
}
// check if our unassigned peers (without segment-id) can be assigned
for (auto peer : unassignedPeers) {
if (peer->receivedHeaders() || nowInSec - peer->peerAddress().lastReceivedGoodHeaders() < headersOkTime) {
// can be assigned!
for (auto wallet = wallets.begin(); wallet != wallets.end(); ++wallet) { // are sorted by prio
if (wallet->segment->priority() >= PrivacySegment::OnlyManual)
break;
if (static_cast<int>(wallet->assignedPeers.size() + wallet->unassignedPeers.size()) < MinPeersPerWallet) {
logInfo(2025) << "Found free floating peer, assigning it to wallet" << wallet->segment->segmentId() << "(count:" << wallet->assignedPeers.size() + 1 << ")";
peer->setPrivacySegment(wallet->segment);
if (peer->relaysTransactions()) {
for (const auto &weakTx : m_dlm->connectionManager().transactionsToBroadcast()) {
auto tx = weakTx.lock();
if (tx && peer->privacySegment() == wallet->segment)
peer->sendTx(tx);
}
}
wallet->assignedPeers.insert(peer);
peer->peerAddress().setSegment(wallet->segment->segmentId()); // remember this assignment
didSomething = true;
break;
}
}
}
}
for (auto wallet = wallets.begin(); wallet != wallets.end(); ++wallet) {
if (wallet->segment->priority() >= PrivacySegment::OnlyManual)
break;
int peerCount = wallet->unassignedPeers.size() + wallet->assignedPeers.size();
if (peerCount < MinPeersPerWallet) {
// start some new connections
if (wallet->segment->firstBlock() == -1 || wallet->segment->firstBlock() > currentBlockHeight)
continue;
// start new connections
while (peerCount < MinPeersPerWallet) {
auto address = m_dlm->connectionManager().peerAddressDb()
.findBest(/*network and bloom*/ 1 | 4, wallet->segment->segmentId());
if (!address.isValid())
break;
logInfo(2025) << "Creating a new connection for PrivacySegment" << wallet->segment->segmentId() << "(count:" << peerCount + 1 << ")";
didSomething = true;
m_dlm->connectionManager().connect(address);
++peerCount;
}
}
}
// ----
// Now we focus on what those connected peers can do for our node. Downloading merkle blocks
for (auto wallet = wallets.begin(); wallet != wallets.end(); ++wallet) {
if (wallet->segment->priority() >= PrivacySegment::OnlyManual)
break;
if (m_quietCount == 5) // we skip this phase when there is nothing to do.
break;
auto infoIter = m_segmentInfos.find(wallet->segment);
if (infoIter == m_segmentInfos.end()) {
m_segmentInfos.insert(std::make_pair(wallet->segment, Info()));
infoIter = m_segmentInfos.find(wallet->segment);
}
assert(infoIter != m_segmentInfos.end());
Info &info = infoIter->second;
if (!info.previousDownloads.empty()) {
PeerDownloadInfo &pdi = info.previousDownloads.back();
if (pdi.toBlock == 0 // started download
&& wallet->downloader == nullptr ) { // finished download
// So it started a download and nobody is downloading anymore. Lets
// update the PeerDownloadInfo with the last blockheight we downloaded.
if (pdi.fromBlock == wallet->segment->backupSyncHeight() + 1)
pdi.toBlock = wallet->segment->lastBlockSynched();
else
pdi.toBlock = wallet->segment->backupSyncHeight();
if (pdi.fromBlock > pdi.toBlock) // download aborted
pdi.fromBlock = 0;
assert(pdi.toBlock >= pdi.fromBlock);
// after a range of blocks finished downloading, we assume that it found something and thus the
// filter on the other peers need to have the new utxos added to be able to act on them.
logInfo(2025) << "A peer stopped downloading, broadcasting filter to its wallet-peers" << wallet->segment->segmentId();
wallet->segment->rebuildFilter(pdi.toBlock == currentBlockHeight
? PrivacySegment::FilterAtTIp : PrivacySegment::NotAtTip);
}
}
if (wallet->assignedPeers.empty()) // nobody to assign the download to, no work to do
continue;
auto privSegment = wallet->segment;
logInfo(2025) << "WalletID:" << privSegment->segmentId()
<< "origStart:" << privSegment->firstBlock()
<< "lastBlockSynched:" << privSegment->lastBlockSynched()
<< "backupSyncHeight:" << privSegment->backupSyncHeight();
// wallet is in need of downloading more merkle blocks ?
if (privSegment->firstBlock() > 1
&& currentBlockHeight > privSegment->firstBlock()
&& (privSegment->lastBlockSynched() < currentBlockHeight
|| privSegment->backupSyncHeight() < currentBlockHeight)) {
// is behind. Is some peer downloading?
if (wallet->downloader) {
didSomething = true; // as long as there is a need to download, we continue.
auto curPeer = wallet->downloader;
// remember the downloader so we avoid asking the same peer to download the backup.
// lets see if the peer is making progress.
const int64_t timePassed = (now - info.lastCheckedTime).total_milliseconds();;
const int32_t blocksDone = curPeer->lastReceivedMerkle() - info.lastHeight;
logInfo(2025) << " +- downloading using peer" << curPeer->connectionId()
<< "prevHeight:" << info.lastHeight
<< "curHeight:" << curPeer->lastReceivedMerkle();
if (timePassed > 4200) {
if (blocksDone < timePassed / 1000) {
// peer is stalling. I expect at least 1 block a second.
// we give them 2 segments try, or only one if they never started a single merkle dl
if (info.slowPunishment++ > 2 || curPeer->lastReceivedMerkle() == 0) {
logWarning(2025) << "SyncSPV disconnects peer"
<< curPeer->connectionId()
<< "that is stalling download of merkle-blocks";
m_dlm->connectionManager().punish(curPeer, PUNISHMENT_MAX);
wallet->downloader.reset();
curPeer = nullptr;
}
} else if (blocksDone > 20) {
info.slowPunishment = 0;
}
if (curPeer) { // start new section every couple of seconds
info.lastHeight = curPeer->lastReceivedMerkle();
info.lastCheckedTime = now;
}
}
}
/*
* lets assign a downloader.
* A 'wallet' (aka PrivacySegment) needs at least one pass by a peer to download all
* merkle blocks. We do this by picking a peer and calling startMerkleDownload() on it.
*
* This code also takes into consideration the fact that we should not trust
* a single random node on the Internet. So when we finished asking one peer
* we go and assign a second peer to be our backup. With probably the same
* result, but maybe with more transactions.
*/
if (wallet->downloader == nullptr && !wallet->assignedPeers.empty()) {
logDebug(2025) << " +- looking for a new downloader";
logDebug(2025) << " += chain:" << m_dlm->blockHeight()
<< "wallet:" << privSegment->lastBlockSynched()
<< "wallet-backup:" << privSegment->backupSyncHeight();
int from;
if (m_dlm->blockHeight() == privSegment->lastBlockSynched())
from = privSegment->backupSyncHeight();
else
from = privSegment->lastBlockSynched();
for (auto &p : wallet->assignedPeers) {
// logDebug(2025) << " + " << p->connectionId();
bool ok = true;
for (auto pdi : info.previousDownloads) {
if (pdi.peerId == p->connectionId()) {
if (from < pdi.toBlock) {// this one already downloaded for us
// logDebug(2025) << "Skipping peer because it downloaded before";
ok = false;
break;
}
if (p->peerHeight() <= from) {
// logDebug(2025) << "Skipping peer because its behind";
ok = false;
break;
}
}
if (!ok)
break;
}
if (!ok)
continue;
wallet->downloader = p;
break;
}
if (wallet->downloader) {
logDebug(2025) << " +- wallet merkle-download started on peer" << wallet->downloader->connectionId()
<< privSegment->lastBlockSynched()
<< privSegment->backupSyncHeight();
didSomething = true;
wallet->downloader->startMerkleDownload(from + 1); // +1 because we start one after the last downloaded
info.previousDownloads.push_back({wallet->downloader->connectionId(), from + 1, 0});
info.lastHeight = from;
info.lastCheckedTime = now;
}
}
}
}
bool suppliedAllPeers = true;
for (auto wallet = wallets.begin(); wallet != wallets.end(); ++wallet) {
if (wallet->segment->priority() >= PrivacySegment::OnlyManual)
continue;
if (static_cast<int>(wallet->assignedPeers.size()) < MinPeersPerWallet) {
suppliedAllPeers = false;
break;
}
}
if (!suppliedAllPeers || didSomething) {
m_quietCount = 0;
} else if (m_quietCount < 5) {
// we print that we are done.
// notice that we do NOT exit.
// This action will continue to monitor if each wallet has enough peers.
if (++m_quietCount == 5)
logInfo(2025) << "==== SyncSPVAction ==== DONE";
}
again();
}