3a9863dbe3
The bloom filter includes all the unspent outputs, so we can just use the latest one if we want to get any transactions the first peer omitted.
301 lines
13 KiB
C++
301 lines
13 KiB
C++
/*
|
|
* This file is part of the Flowee project
|
|
* Copyright (C) 2020-2021 Tom Zander <tom@flowee.org>
|
|
*
|
|
* This program is free software: you can redistribute it and/or modify
|
|
* it under the terms of the GNU General Public License as published by
|
|
* the Free Software Foundation, either version 3 of the License, or
|
|
* (at your option) any later version.
|
|
*
|
|
* This program is distributed in the hope that it will be useful,
|
|
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
* GNU General Public License for more details.
|
|
*
|
|
* You should have received a copy of the GNU General Public License
|
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
*/
|
|
#include "SyncSPVAction.h"
|
|
#include "PrivacySegment.h"
|
|
#include "DownloadManager.h"
|
|
#include "Peer.h"
|
|
|
|
#include <set>
|
|
|
|
SyncSPVAction::SyncSPVAction(DownloadManager *parent)
|
|
: Action(parent),
|
|
MinPeersPerWallet(parent->chain() == P2PNet::MainChain ? 3 : 1)
|
|
{
|
|
}
|
|
|
|
namespace {
|
|
struct WalletInfo {
|
|
PrivacySegment *segment;
|
|
std::set<std::shared_ptr<Peer> > unassignedPeers;
|
|
std::set<std::shared_ptr<Peer> > assignedPeers;
|
|
std::shared_ptr<Peer> downloader;
|
|
|
|
};
|
|
|
|
}
|
|
|
|
|
|
|
|
void SyncSPVAction::execute(const boost::system::error_code &error)
|
|
{
|
|
if (error)
|
|
return;
|
|
|
|
bool didSomething = false;
|
|
const int currentBlockHeight = m_dlm->blockHeight();
|
|
// fill the 'wallets' struct with the current state of things.
|
|
std::vector<WalletInfo> wallets;
|
|
std::set<std::shared_ptr<Peer> > unassignedPeers;
|
|
for (auto *segment : m_dlm->connectionManager().segments()) {
|
|
WalletInfo info;
|
|
info.segment = segment;
|
|
|
|
for (const auto &peer : m_dlm->connectionManager().connectedPeers()) {
|
|
if (peer->privacySegment() == segment) {
|
|
info.assignedPeers.insert(peer);
|
|
if (peer->merkleDownloadInProgress())
|
|
info.downloader = peer;
|
|
}
|
|
else if (peer->privacySegment() == nullptr
|
|
&& peer->peerAddress().segment() == segment->segmentId()) {
|
|
info.unassignedPeers.insert(peer);
|
|
}
|
|
else if (peer->privacySegment() == nullptr && peer->peerAddress().segment() == 0) {
|
|
unassignedPeers.insert(peer);
|
|
}
|
|
}
|
|
wallets.push_back(info);
|
|
}
|
|
// sort by prio
|
|
std::sort(wallets.begin(), wallets.end(), [](const WalletInfo &a, const WalletInfo &b) {
|
|
return a.segment->priority() < b.segment->priority();
|
|
});
|
|
|
|
// Half of the work of this action is finding appropriate peers for a privacy-segment (aka wallet).
|
|
// We open connections to peers
|
|
// We wait for them to have 'received' recent headers, which indicates the peer is on our chain.
|
|
// We then assign a wallet to them.
|
|
// The other half is managing those peers actually downloading the merkle blocks to get our SPV based data.
|
|
|
|
// For stability sake, we take those steps backwards and because we repeat every second or so a peer will
|
|
// go through the whole process in steps.
|
|
|
|
const auto now = boost::posix_time::microsec_clock::universal_time();
|
|
const uint32_t nowInSec = time(nullptr);
|
|
const uint32_t headersOkTime = 3600 * 40; // 40 hours
|
|
//-- find peers that are unassigned and which are verified to be on our chain.
|
|
for (auto wallet = wallets.begin(); wallet != wallets.end(); ++wallet) {
|
|
for (auto peer : wallet->unassignedPeers) {
|
|
if (peer->receivedHeaders()
|
|
|| nowInSec - peer->peerAddress().lastReceivedGoodHeaders() < headersOkTime) {
|
|
|
|
// assign peer and send old transactions
|
|
peer->setPrivacySegment(wallet->segment);
|
|
didSomething = true;
|
|
if (peer->relaysTransactions()) {
|
|
for (const auto &weakTx : m_dlm->connectionManager().transactionsToBroadcast()) {
|
|
auto tx = weakTx.lock();
|
|
if (tx && peer->privacySegment() == wallet->segment)
|
|
peer->sendTx(tx);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// check if our unassigned peers can be assigned
|
|
for (auto peer : unassignedPeers) {
|
|
if (peer->receivedHeaders() || nowInSec - peer->peerAddress().lastReceivedGoodHeaders() < headersOkTime) {
|
|
// can be assigned!
|
|
for (auto wallet = wallets.begin(); wallet != wallets.end(); ++wallet) { // are sorted by prio
|
|
if (wallet->segment->priority() >= PrivacySegment::OnlyManual)
|
|
break;
|
|
if (wallet->assignedPeers.size() + wallet->unassignedPeers.size() < MinPeersPerWallet) {
|
|
peer->setPrivacySegment(wallet->segment);
|
|
if (peer->relaysTransactions()) {
|
|
for (const auto &weakTx : m_dlm->connectionManager().transactionsToBroadcast()) {
|
|
auto tx = weakTx.lock();
|
|
if (tx && peer->privacySegment() == wallet->segment)
|
|
peer->sendTx(tx);
|
|
}
|
|
}
|
|
wallet->assignedPeers.insert(peer);
|
|
peer->peerAddress().setSegment(wallet->segment->segmentId()); // remember this assignment
|
|
didSomething = true;
|
|
break;
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
for (auto wallet = wallets.begin(); wallet != wallets.end(); ++wallet) {
|
|
if (wallet->segment->priority() >= PrivacySegment::OnlyManual)
|
|
break;
|
|
int peerCount = wallet->unassignedPeers.size() + wallet->assignedPeers.size();
|
|
if (peerCount < MinPeersPerWallet) {
|
|
// start some new connections
|
|
if (wallet->segment->firstBlock() == -1 || wallet->segment->firstBlock() > currentBlockHeight)
|
|
continue;
|
|
|
|
// start new connections
|
|
while (peerCount < MinPeersPerWallet) {
|
|
auto address = m_dlm->connectionManager().peerAddressDb()
|
|
.findBest(/*network and bloom*/ 1 | 4, wallet->segment->segmentId());
|
|
if (!address.isValid())
|
|
break;
|
|
|
|
logInfo() << "creating a new connection for PrivacySegment" << wallet->segment->segmentId();
|
|
didSomething = true;
|
|
m_dlm->connectionManager().connect(address);
|
|
++peerCount;
|
|
}
|
|
}
|
|
}
|
|
|
|
// ----
|
|
// Now we focus on what those connected peers can do for our node. Downloading merkle blocks
|
|
|
|
for (auto wallet = wallets.begin(); wallet != wallets.end(); ++wallet) {
|
|
auto infoIter = m_segmentInfos.find(wallet->segment);
|
|
if (infoIter == m_segmentInfos.end()) {
|
|
m_segmentInfos.insert(std::make_pair(wallet->segment, Info()));
|
|
infoIter = m_segmentInfos.find(wallet->segment);
|
|
}
|
|
assert(infoIter != m_segmentInfos.end());
|
|
Info &info = infoIter->second;
|
|
|
|
if (!info.previousDownloads.empty()) {
|
|
PeerDownloadInfo &pdi = info.previousDownloads.back();
|
|
if (pdi.toBlock == 0 // started download
|
|
&& wallet->downloader == nullptr ) { // finished download
|
|
|
|
// So it started a download and nobody is downloading anymore. Lets
|
|
// update the PeerDownloadInfo with the last blockheight we downloaded.
|
|
if (pdi.fromBlock == wallet->segment->backupSyncHeight() + 1)
|
|
pdi.toBlock = wallet->segment->lastBlockSynched();
|
|
else
|
|
pdi.toBlock = wallet->segment->backupSyncHeight();
|
|
|
|
if (pdi.fromBlock > pdi.toBlock) // download aborted
|
|
pdi.fromBlock = 0;
|
|
logDebug() << "registring a download completed for"
|
|
<< pdi.peerId << "from" << pdi.fromBlock << "to" << pdi.toBlock;
|
|
assert(pdi.toBlock >= pdi.fromBlock);
|
|
}
|
|
}
|
|
if (wallet->assignedPeers.empty())
|
|
continue;
|
|
|
|
auto privSegment = wallet->segment;
|
|
logDebug() << "WalletID:" << privSegment->segmentId()
|
|
<< "origStart:" << privSegment->firstBlock()
|
|
<< "lastBlockSynched:" << privSegment->lastBlockSynched()
|
|
<< "backupSyncHeight:" << privSegment->backupSyncHeight();
|
|
|
|
if (privSegment->firstBlock() > 1
|
|
&& currentBlockHeight > privSegment->firstBlock()
|
|
&& (privSegment->lastBlockSynched() < currentBlockHeight
|
|
|| privSegment->backupSyncHeight() < currentBlockHeight)) {
|
|
|
|
// is behind. Is someone downloading?
|
|
if (wallet->downloader) {
|
|
didSomething = true; // as long as there is a need to download, we continue.
|
|
auto curPeer = wallet->downloader;
|
|
// remember the downloader so we avoid asking the same peer to download the backup.
|
|
|
|
// lets see if the peer is making progress.
|
|
const int64_t timePassed = (now - info.lastCheckedTime).total_milliseconds();;
|
|
const int32_t blocksDone = curPeer->lastReceivedMerkle() - info.lastHeight;
|
|
logDebug() << "Downloading using peer" << curPeer->connectionId()
|
|
<< "prevHeight:" << info.lastHeight
|
|
<< "curHeight:" << curPeer->lastReceivedMerkle();
|
|
|
|
if (timePassed > 4200) {
|
|
if (blocksDone < timePassed / 1000) {
|
|
// peer is stalling. I expect at least 1 block a second.
|
|
// we give them 2 segments try, or only one if they never started a single merkle dl
|
|
if (info.slowPunishment++ > 2 || curPeer->lastReceivedMerkle() == 0) {
|
|
logWarning() << "SyncSPV disconnects peer"
|
|
<< curPeer->connectionId()
|
|
<< "that is stalling download of merkle-blocks";
|
|
m_dlm->connectionManager().punish(curPeer, PUNISHMENT_MAX);
|
|
wallet->downloader.reset();
|
|
curPeer = nullptr;
|
|
}
|
|
} else if (blocksDone > 20) {
|
|
info.slowPunishment = 0;
|
|
}
|
|
if (curPeer) { // start new section every couple of seconds
|
|
info.lastHeight = curPeer->lastReceivedMerkle();
|
|
info.lastCheckedTime = now;
|
|
}
|
|
}
|
|
}
|
|
|
|
/*
|
|
* lets assign a downloader.
|
|
* A 'wallet' (aka PrivacySegment) needs at least one pass by a peer to download all
|
|
* merkle blocks. We do this by picking a peer and calling startMerkleDownload() on it.
|
|
*
|
|
* This code also takes into consideration the fact that we should not trust
|
|
* a single random node on the Internet. So when we finished asking one peer
|
|
* we go and assign a second peer to be our backup. With probably the same
|
|
* result, but maybe with more transactions.
|
|
*/
|
|
if (wallet->downloader == nullptr && !wallet->assignedPeers.empty()) {
|
|
logDebug() << "looking for a new downloader";
|
|
const int from = privSegment->backupSyncHeight();
|
|
for (auto &p : wallet->assignedPeers) {
|
|
// logDebug() << " + " << p->connectionId();
|
|
bool ok = true;
|
|
for (auto pdi : info.previousDownloads) {
|
|
if (pdi.peerId == p->connectionId()) {
|
|
if (from < pdi.toBlock) {// this one already downloaded for us
|
|
// logDebug() << "Skipping peer because it downloaded before";
|
|
ok = false;
|
|
break;
|
|
}
|
|
if (p->peerHeight() <= from) {
|
|
// logDebug() << "Skipping peer because its behind";
|
|
ok = false;
|
|
break;
|
|
}
|
|
}
|
|
if (!ok)
|
|
break;
|
|
}
|
|
if (!ok)
|
|
continue;
|
|
|
|
wallet->downloader = p;
|
|
break;
|
|
}
|
|
if (wallet->downloader) {
|
|
logDebug() << "Wallet merkle-download started on peer" << wallet->downloader->connectionId()
|
|
<< privSegment->backupSyncHeight()
|
|
<< privSegment->lastBlockSynched();
|
|
didSomething = true;
|
|
wallet->downloader->startMerkleDownload(from + 1); // +1 because we start one after the last downloaded
|
|
info.previousDownloads.push_back({wallet->downloader->connectionId(), from + 1, 0});
|
|
info.lastHeight = from;
|
|
info.lastCheckedTime = now;
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
if (didSomething) {
|
|
m_quietCount = 0;
|
|
} else if (m_quietCount++ > 2) {
|
|
logInfo() << "SyncSPVAction done";
|
|
m_dlm->done(this);
|
|
return;
|
|
}
|
|
again();
|
|
}
|