Files
thehub/libs/p2p/SyncSPVAction.cpp
T

301 lines
13 KiB
C++
Raw Permalink Normal View History

2020-04-17 19:33:06 +02:00
/*
* This file is part of the Flowee project
2021-07-30 14:04:58 +02:00
* Copyright (C) 2020-2021 Tom Zander <tom@flowee.org>
2020-04-17 19:33:06 +02:00
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "SyncSPVAction.h"
#include "PrivacySegment.h"
#include "DownloadManager.h"
#include "Peer.h"
#include <set>
SyncSPVAction::SyncSPVAction(DownloadManager *parent)
: Action(parent),
MinPeersPerWallet(parent->chain() == P2PNet::MainChain ? 3 : 1)
2020-04-17 19:33:06 +02:00
{
}
2021-07-30 14:04:58 +02:00
namespace {
2020-04-17 19:33:06 +02:00
struct WalletInfo {
2021-07-30 14:04:58 +02:00
PrivacySegment *segment;
std::set<std::shared_ptr<Peer> > unassignedPeers;
std::set<std::shared_ptr<Peer> > assignedPeers;
std::shared_ptr<Peer> downloader;
2020-04-17 19:33:06 +02:00
};
2021-07-30 14:04:58 +02:00
}
2020-04-17 19:33:06 +02:00
void SyncSPVAction::execute(const boost::system::error_code &error)
{
if (error)
return;
2021-07-30 14:04:58 +02:00
bool didSomething = false;
2020-04-17 19:33:06 +02:00
const int currentBlockHeight = m_dlm->blockHeight();
2021-07-30 14:04:58 +02:00
// fill the 'wallets' struct with the current state of things.
std::vector<WalletInfo> wallets;
std::set<std::shared_ptr<Peer> > unassignedPeers;
for (auto *segment : m_dlm->connectionManager().segments()) {
WalletInfo info;
info.segment = segment;
2020-04-17 19:33:06 +02:00
2021-07-30 14:04:58 +02:00
for (const auto &peer : m_dlm->connectionManager().connectedPeers()) {
if (peer->privacySegment() == segment) {
info.assignedPeers.insert(peer);
2020-04-17 19:33:06 +02:00
if (peer->merkleDownloadInProgress())
2021-07-30 14:04:58 +02:00
info.downloader = peer;
2020-04-17 19:33:06 +02:00
}
2021-07-30 14:04:58 +02:00
else if (peer->privacySegment() == nullptr
&& peer->peerAddress().segment() == segment->segmentId()) {
info.unassignedPeers.insert(peer);
}
else if (peer->privacySegment() == nullptr && peer->peerAddress().segment() == 0) {
unassignedPeers.insert(peer);
}
}
wallets.push_back(info);
}
// sort by prio
std::sort(wallets.begin(), wallets.end(), [](const WalletInfo &a, const WalletInfo &b) {
return a.segment->priority() < b.segment->priority();
});
// Half of the work of this action is finding appropriate peers for a privacy-segment (aka wallet).
// We open connections to peers
// We wait for them to have 'received' recent headers, which indicates the peer is on our chain.
// We then assign a wallet to them.
// The other half is managing those peers actually downloading the merkle blocks to get our SPV based data.
// For stability sake, we take those steps backwards and because we repeat every second or so a peer will
// go through the whole process in steps.
const auto now = boost::posix_time::microsec_clock::universal_time();
const uint32_t nowInSec = time(nullptr);
const uint32_t headersOkTime = 3600 * 40; // 40 hours
//-- find peers that are unassigned and which are verified to be on our chain.
for (auto wallet = wallets.begin(); wallet != wallets.end(); ++wallet) {
for (auto peer : wallet->unassignedPeers) {
if (peer->receivedHeaders()
|| nowInSec - peer->peerAddress().lastReceivedGoodHeaders() < headersOkTime) {
// assign peer and send old transactions
peer->setPrivacySegment(wallet->segment);
didSomething = true;
if (peer->relaysTransactions()) {
for (const auto &weakTx : m_dlm->connectionManager().transactionsToBroadcast()) {
auto tx = weakTx.lock();
if (tx && peer->privacySegment() == wallet->segment)
peer->sendTx(tx);
}
}
2020-04-17 19:33:06 +02:00
}
}
}
2021-07-30 14:04:58 +02:00
// check if our unassigned peers can be assigned
for (auto peer : unassignedPeers) {
if (peer->receivedHeaders() || nowInSec - peer->peerAddress().lastReceivedGoodHeaders() < headersOkTime) {
// can be assigned!
for (auto wallet = wallets.begin(); wallet != wallets.end(); ++wallet) { // are sorted by prio
if (wallet->segment->priority() >= PrivacySegment::OnlyManual)
break;
if (wallet->assignedPeers.size() + wallet->unassignedPeers.size() < MinPeersPerWallet) {
2021-07-30 14:04:58 +02:00
peer->setPrivacySegment(wallet->segment);
if (peer->relaysTransactions()) {
for (const auto &weakTx : m_dlm->connectionManager().transactionsToBroadcast()) {
auto tx = weakTx.lock();
if (tx && peer->privacySegment() == wallet->segment)
peer->sendTx(tx);
}
}
wallet->assignedPeers.insert(peer);
peer->peerAddress().setSegment(wallet->segment->segmentId()); // remember this assignment
didSomething = true;
break;
}
}
}
}
2020-04-17 19:33:06 +02:00
2021-07-30 14:04:58 +02:00
for (auto wallet = wallets.begin(); wallet != wallets.end(); ++wallet) {
if (wallet->segment->priority() >= PrivacySegment::OnlyManual)
2020-10-19 14:04:57 +02:00
break;
2021-07-30 14:04:58 +02:00
int peerCount = wallet->unassignedPeers.size() + wallet->assignedPeers.size();
if (peerCount < MinPeersPerWallet) {
2021-07-30 14:04:58 +02:00
// start some new connections
if (wallet->segment->firstBlock() == -1 || wallet->segment->firstBlock() > currentBlockHeight)
continue;
// start new connections
while (peerCount < MinPeersPerWallet) {
2021-07-30 14:04:58 +02:00
auto address = m_dlm->connectionManager().peerAddressDb()
.findBest(/*network and bloom*/ 1 | 4, wallet->segment->segmentId());
if (!address.isValid())
break;
logInfo() << "creating a new connection for PrivacySegment" << wallet->segment->segmentId();
didSomething = true;
m_dlm->connectionManager().connect(address);
++peerCount;
}
}
}
// ----
// Now we focus on what those connected peers can do for our node. Downloading merkle blocks
for (auto wallet = wallets.begin(); wallet != wallets.end(); ++wallet) {
auto infoIter = m_segmentInfos.find(wallet->segment);
2020-04-17 19:33:06 +02:00
if (infoIter == m_segmentInfos.end()) {
2021-07-30 14:04:58 +02:00
m_segmentInfos.insert(std::make_pair(wallet->segment, Info()));
infoIter = m_segmentInfos.find(wallet->segment);
2020-04-17 19:33:06 +02:00
}
assert(infoIter != m_segmentInfos.end());
Info &info = infoIter->second;
2020-11-13 20:11:06 +01:00
if (!info.previousDownloads.empty()) {
PeerDownloadInfo &pdi = info.previousDownloads.back();
if (pdi.toBlock == 0 // started download
2021-07-30 14:04:58 +02:00
&& wallet->downloader == nullptr ) { // finished download
2020-11-13 20:11:06 +01:00
// So it started a download and nobody is downloading anymore. Lets
// update the PeerDownloadInfo with the last blockheight we downloaded.
2021-07-30 14:04:58 +02:00
if (pdi.fromBlock == wallet->segment->backupSyncHeight() + 1)
pdi.toBlock = wallet->segment->lastBlockSynched();
2020-11-13 20:11:06 +01:00
else
2021-07-30 14:04:58 +02:00
pdi.toBlock = wallet->segment->backupSyncHeight();
2020-11-13 20:11:06 +01:00
if (pdi.fromBlock > pdi.toBlock) // download aborted
pdi.fromBlock = 0;
logDebug() << "registring a download completed for"
<< pdi.peerId << "from" << pdi.fromBlock << "to" << pdi.toBlock;
assert(pdi.toBlock >= pdi.fromBlock);
}
}
2021-07-30 14:04:58 +02:00
if (wallet->assignedPeers.empty())
2020-11-13 20:11:06 +01:00
continue;
2020-04-17 19:33:06 +02:00
2021-07-30 14:04:58 +02:00
auto privSegment = wallet->segment;
2020-11-09 18:38:38 +01:00
logDebug() << "WalletID:" << privSegment->segmentId()
<< "origStart:" << privSegment->firstBlock()
<< "lastBlockSynched:" << privSegment->lastBlockSynched()
<< "backupSyncHeight:" << privSegment->backupSyncHeight();
2021-10-25 21:11:54 +02:00
if (privSegment->firstBlock() > 1
&& currentBlockHeight > privSegment->firstBlock()
2020-04-17 19:33:06 +02:00
&& (privSegment->lastBlockSynched() < currentBlockHeight
|| privSegment->backupSyncHeight() < currentBlockHeight)) {
// is behind. Is someone downloading?
2021-07-30 14:04:58 +02:00
if (wallet->downloader) {
didSomething = true; // as long as there is a need to download, we continue.
auto curPeer = wallet->downloader;
2020-11-09 18:38:38 +01:00
// remember the downloader so we avoid asking the same peer to download the backup.
2020-04-17 19:33:06 +02:00
// lets see if the peer is making progress.
2020-11-09 18:38:38 +01:00
const int64_t timePassed = (now - info.lastCheckedTime).total_milliseconds();;
const int32_t blocksDone = curPeer->lastReceivedMerkle() - info.lastHeight;
logDebug() << "Downloading using peer" << curPeer->connectionId()
<< "prevHeight:" << info.lastHeight
<< "curHeight:" << curPeer->lastReceivedMerkle();
if (timePassed > 4200) {
if (blocksDone < timePassed / 1000) {
// peer is stalling. I expect at least 1 block a second.
// we give them 2 segments try, or only one if they never started a single merkle dl
if (info.slowPunishment++ > 2 || curPeer->lastReceivedMerkle() == 0) {
logWarning() << "SyncSPV disconnects peer"
2021-07-30 14:04:58 +02:00
<< curPeer->connectionId()
2020-11-09 18:38:38 +01:00
<< "that is stalling download of merkle-blocks";
2021-07-30 14:04:58 +02:00
m_dlm->connectionManager().punish(curPeer, PUNISHMENT_MAX);
wallet->downloader.reset();
2020-11-09 18:38:38 +01:00
curPeer = nullptr;
}
} else if (blocksDone > 20) {
info.slowPunishment = 0;
}
if (curPeer) { // start new section every couple of seconds
info.lastHeight = curPeer->lastReceivedMerkle();
info.lastCheckedTime = now;
2020-04-17 19:33:06 +02:00
}
}
}
/*
* lets assign a downloader.
* A 'wallet' (aka PrivacySegment) needs at least one pass by a peer to download all
* merkle blocks. We do this by picking a peer and calling startMerkleDownload() on it.
*
* This code also takes into consideration the fact that we should not trust
* a single random node on the Internet. So when we finished asking one peer
* we go and assign a second peer to be our backup. With probably the same
* result, but maybe with more transactions.
*/
2021-07-30 14:04:58 +02:00
if (wallet->downloader == nullptr && !wallet->assignedPeers.empty()) {
logDebug() << "looking for a new downloader";
2021-11-26 18:04:38 +01:00
const int from = privSegment->backupSyncHeight();
2021-07-30 14:04:58 +02:00
for (auto &p : wallet->assignedPeers) {
2020-11-13 20:11:06 +01:00
// logDebug() << " + " << p->connectionId();
bool ok = true;
for (auto pdi : info.previousDownloads) {
if (pdi.peerId == p->connectionId()) {
if (from < pdi.toBlock) {// this one already downloaded for us
// logDebug() << "Skipping peer because it downloaded before";
ok = false;
break;
}
2021-11-26 18:04:38 +01:00
if (p->peerHeight() <= from) {
// logDebug() << "Skipping peer because its behind";
ok = false;
break;
}
2020-11-13 20:11:06 +01:00
}
if (!ok)
break;
2020-04-17 19:33:06 +02:00
}
2020-11-13 20:11:06 +01:00
if (!ok)
continue;
2021-11-26 18:04:38 +01:00
wallet->downloader = p;
2020-11-13 20:11:06 +01:00
break;
2020-04-17 19:33:06 +02:00
}
2021-11-26 18:04:38 +01:00
if (wallet->downloader) {
logDebug() << "Wallet merkle-download started on peer" << wallet->downloader->connectionId()
2020-11-06 12:56:39 +01:00
<< privSegment->backupSyncHeight()
<< privSegment->lastBlockSynched();
2021-07-30 14:04:58 +02:00
didSomething = true;
2021-11-26 18:04:38 +01:00
wallet->downloader->startMerkleDownload(from + 1); // +1 because we start one after the last downloaded
info.previousDownloads.push_back({wallet->downloader->connectionId(), from + 1, 0});
2020-11-13 20:11:06 +01:00
info.lastHeight = from;
2020-11-09 18:38:38 +01:00
info.lastCheckedTime = now;
2020-04-17 19:33:06 +02:00
}
}
}
}
if (didSomething) {
m_quietCount = 0;
} else if (m_quietCount++ > 2) {
logInfo() << "SyncSPVAction done";
m_dlm->done(this);
return;
}
again();
}