Files
thehub/libs/p2p/SyncSPVAction.cpp
T
tomFlowee 3a9863dbe3 Simplify the backup sync feature.
The bloom filter includes all the unspent outputs, so we can just
use the latest one if we want to get any transactions the first
peer omitted.
2021-11-26 18:04:38 +01:00

301 lines
13 KiB
C++

/*
* This file is part of the Flowee project
* Copyright (C) 2020-2021 Tom Zander <tom@flowee.org>
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "SyncSPVAction.h"
#include "PrivacySegment.h"
#include "DownloadManager.h"
#include "Peer.h"
#include <set>
SyncSPVAction::SyncSPVAction(DownloadManager *parent)
: Action(parent),
MinPeersPerWallet(parent->chain() == P2PNet::MainChain ? 3 : 1)
{
}
namespace {
struct WalletInfo {
PrivacySegment *segment;
std::set<std::shared_ptr<Peer> > unassignedPeers;
std::set<std::shared_ptr<Peer> > assignedPeers;
std::shared_ptr<Peer> downloader;
};
}
void SyncSPVAction::execute(const boost::system::error_code &error)
{
if (error)
return;
bool didSomething = false;
const int currentBlockHeight = m_dlm->blockHeight();
// fill the 'wallets' struct with the current state of things.
std::vector<WalletInfo> wallets;
std::set<std::shared_ptr<Peer> > unassignedPeers;
for (auto *segment : m_dlm->connectionManager().segments()) {
WalletInfo info;
info.segment = segment;
for (const auto &peer : m_dlm->connectionManager().connectedPeers()) {
if (peer->privacySegment() == segment) {
info.assignedPeers.insert(peer);
if (peer->merkleDownloadInProgress())
info.downloader = peer;
}
else if (peer->privacySegment() == nullptr
&& peer->peerAddress().segment() == segment->segmentId()) {
info.unassignedPeers.insert(peer);
}
else if (peer->privacySegment() == nullptr && peer->peerAddress().segment() == 0) {
unassignedPeers.insert(peer);
}
}
wallets.push_back(info);
}
// sort by prio
std::sort(wallets.begin(), wallets.end(), [](const WalletInfo &a, const WalletInfo &b) {
return a.segment->priority() < b.segment->priority();
});
// Half of the work of this action is finding appropriate peers for a privacy-segment (aka wallet).
// We open connections to peers
// We wait for them to have 'received' recent headers, which indicates the peer is on our chain.
// We then assign a wallet to them.
// The other half is managing those peers actually downloading the merkle blocks to get our SPV based data.
// For stability sake, we take those steps backwards and because we repeat every second or so a peer will
// go through the whole process in steps.
const auto now = boost::posix_time::microsec_clock::universal_time();
const uint32_t nowInSec = time(nullptr);
const uint32_t headersOkTime = 3600 * 40; // 40 hours
//-- find peers that are unassigned and which are verified to be on our chain.
for (auto wallet = wallets.begin(); wallet != wallets.end(); ++wallet) {
for (auto peer : wallet->unassignedPeers) {
if (peer->receivedHeaders()
|| nowInSec - peer->peerAddress().lastReceivedGoodHeaders() < headersOkTime) {
// assign peer and send old transactions
peer->setPrivacySegment(wallet->segment);
didSomething = true;
if (peer->relaysTransactions()) {
for (const auto &weakTx : m_dlm->connectionManager().transactionsToBroadcast()) {
auto tx = weakTx.lock();
if (tx && peer->privacySegment() == wallet->segment)
peer->sendTx(tx);
}
}
}
}
}
// check if our unassigned peers can be assigned
for (auto peer : unassignedPeers) {
if (peer->receivedHeaders() || nowInSec - peer->peerAddress().lastReceivedGoodHeaders() < headersOkTime) {
// can be assigned!
for (auto wallet = wallets.begin(); wallet != wallets.end(); ++wallet) { // are sorted by prio
if (wallet->segment->priority() >= PrivacySegment::OnlyManual)
break;
if (wallet->assignedPeers.size() + wallet->unassignedPeers.size() < MinPeersPerWallet) {
peer->setPrivacySegment(wallet->segment);
if (peer->relaysTransactions()) {
for (const auto &weakTx : m_dlm->connectionManager().transactionsToBroadcast()) {
auto tx = weakTx.lock();
if (tx && peer->privacySegment() == wallet->segment)
peer->sendTx(tx);
}
}
wallet->assignedPeers.insert(peer);
peer->peerAddress().setSegment(wallet->segment->segmentId()); // remember this assignment
didSomething = true;
break;
}
}
}
}
for (auto wallet = wallets.begin(); wallet != wallets.end(); ++wallet) {
if (wallet->segment->priority() >= PrivacySegment::OnlyManual)
break;
int peerCount = wallet->unassignedPeers.size() + wallet->assignedPeers.size();
if (peerCount < MinPeersPerWallet) {
// start some new connections
if (wallet->segment->firstBlock() == -1 || wallet->segment->firstBlock() > currentBlockHeight)
continue;
// start new connections
while (peerCount < MinPeersPerWallet) {
auto address = m_dlm->connectionManager().peerAddressDb()
.findBest(/*network and bloom*/ 1 | 4, wallet->segment->segmentId());
if (!address.isValid())
break;
logInfo() << "creating a new connection for PrivacySegment" << wallet->segment->segmentId();
didSomething = true;
m_dlm->connectionManager().connect(address);
++peerCount;
}
}
}
// ----
// Now we focus on what those connected peers can do for our node. Downloading merkle blocks
for (auto wallet = wallets.begin(); wallet != wallets.end(); ++wallet) {
auto infoIter = m_segmentInfos.find(wallet->segment);
if (infoIter == m_segmentInfos.end()) {
m_segmentInfos.insert(std::make_pair(wallet->segment, Info()));
infoIter = m_segmentInfos.find(wallet->segment);
}
assert(infoIter != m_segmentInfos.end());
Info &info = infoIter->second;
if (!info.previousDownloads.empty()) {
PeerDownloadInfo &pdi = info.previousDownloads.back();
if (pdi.toBlock == 0 // started download
&& wallet->downloader == nullptr ) { // finished download
// So it started a download and nobody is downloading anymore. Lets
// update the PeerDownloadInfo with the last blockheight we downloaded.
if (pdi.fromBlock == wallet->segment->backupSyncHeight() + 1)
pdi.toBlock = wallet->segment->lastBlockSynched();
else
pdi.toBlock = wallet->segment->backupSyncHeight();
if (pdi.fromBlock > pdi.toBlock) // download aborted
pdi.fromBlock = 0;
logDebug() << "registring a download completed for"
<< pdi.peerId << "from" << pdi.fromBlock << "to" << pdi.toBlock;
assert(pdi.toBlock >= pdi.fromBlock);
}
}
if (wallet->assignedPeers.empty())
continue;
auto privSegment = wallet->segment;
logDebug() << "WalletID:" << privSegment->segmentId()
<< "origStart:" << privSegment->firstBlock()
<< "lastBlockSynched:" << privSegment->lastBlockSynched()
<< "backupSyncHeight:" << privSegment->backupSyncHeight();
if (privSegment->firstBlock() > 1
&& currentBlockHeight > privSegment->firstBlock()
&& (privSegment->lastBlockSynched() < currentBlockHeight
|| privSegment->backupSyncHeight() < currentBlockHeight)) {
// is behind. Is someone downloading?
if (wallet->downloader) {
didSomething = true; // as long as there is a need to download, we continue.
auto curPeer = wallet->downloader;
// remember the downloader so we avoid asking the same peer to download the backup.
// lets see if the peer is making progress.
const int64_t timePassed = (now - info.lastCheckedTime).total_milliseconds();;
const int32_t blocksDone = curPeer->lastReceivedMerkle() - info.lastHeight;
logDebug() << "Downloading using peer" << curPeer->connectionId()
<< "prevHeight:" << info.lastHeight
<< "curHeight:" << curPeer->lastReceivedMerkle();
if (timePassed > 4200) {
if (blocksDone < timePassed / 1000) {
// peer is stalling. I expect at least 1 block a second.
// we give them 2 segments try, or only one if they never started a single merkle dl
if (info.slowPunishment++ > 2 || curPeer->lastReceivedMerkle() == 0) {
logWarning() << "SyncSPV disconnects peer"
<< curPeer->connectionId()
<< "that is stalling download of merkle-blocks";
m_dlm->connectionManager().punish(curPeer, PUNISHMENT_MAX);
wallet->downloader.reset();
curPeer = nullptr;
}
} else if (blocksDone > 20) {
info.slowPunishment = 0;
}
if (curPeer) { // start new section every couple of seconds
info.lastHeight = curPeer->lastReceivedMerkle();
info.lastCheckedTime = now;
}
}
}
/*
* lets assign a downloader.
* A 'wallet' (aka PrivacySegment) needs at least one pass by a peer to download all
* merkle blocks. We do this by picking a peer and calling startMerkleDownload() on it.
*
* This code also takes into consideration the fact that we should not trust
* a single random node on the Internet. So when we finished asking one peer
* we go and assign a second peer to be our backup. With probably the same
* result, but maybe with more transactions.
*/
if (wallet->downloader == nullptr && !wallet->assignedPeers.empty()) {
logDebug() << "looking for a new downloader";
const int from = privSegment->backupSyncHeight();
for (auto &p : wallet->assignedPeers) {
// logDebug() << " + " << p->connectionId();
bool ok = true;
for (auto pdi : info.previousDownloads) {
if (pdi.peerId == p->connectionId()) {
if (from < pdi.toBlock) {// this one already downloaded for us
// logDebug() << "Skipping peer because it downloaded before";
ok = false;
break;
}
if (p->peerHeight() <= from) {
// logDebug() << "Skipping peer because its behind";
ok = false;
break;
}
}
if (!ok)
break;
}
if (!ok)
continue;
wallet->downloader = p;
break;
}
if (wallet->downloader) {
logDebug() << "Wallet merkle-download started on peer" << wallet->downloader->connectionId()
<< privSegment->backupSyncHeight()
<< privSegment->lastBlockSynched();
didSomething = true;
wallet->downloader->startMerkleDownload(from + 1); // +1 because we start one after the last downloaded
info.previousDownloads.push_back({wallet->downloader->connectionId(), from + 1, 0});
info.lastHeight = from;
info.lastCheckedTime = now;
}
}
}
}
if (didSomething) {
m_quietCount = 0;
} else if (m_quietCount++ > 2) {
logInfo() << "SyncSPVAction done";
m_dlm->done(this);
return;
}
again();
}