Files
thehub/libs/p2p/SyncSPVAction.cpp
T
tomFlowee 70e4f2292e Refactor the SPV merkleblock/mempool sending
Tuesdays idea of adding some code into the SyncSPVAction didn't feel
right.
A second look made clear that bloom filter updates make much more sense
to go hand in hand with sending a mempool message. Especially since they
depend on each other on the server side.

To-rehash:
the wallet may decide at any time that a new bloom filter is needed. It
then uses the superclass (code in p2plib) PrivacySegment, to build that
filter. As part of that we get a lock object which, when going out of
scope, makes the peers that are subscribed to the privacySegment send
out the filter.

This separation of concerns means that the subclass wallet in the app
doens't know about peers or messages, only its superclass PrivacySegment
does.

What we did in this change is make the PrivacySegment class decide to
combine a bloom update with a mempool call. Typicall only once per
connection.

This means I can remove hacks in the SyncSPVAction which forced the
sending of the mempool message separately.
2023-02-24 19:41:42 +01:00

323 lines
14 KiB
C++

/*
* This file is part of the Flowee project
* Copyright (C) 2020-2023 Tom Zander <tom@flowee.org>
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "SyncSPVAction.h"
#include "PrivacySegment.h"
#include "DownloadManager.h"
#include "Peer.h"
#include <set>
SyncSPVAction::SyncSPVAction(DownloadManager *parent)
: Action(parent),
MinPeersPerWallet(parent->chain() == P2PNet::MainChain ? 3 : 1)
{
}
namespace {
struct WalletInfo {
PrivacySegment *segment;
/*
* peers that we connect to are remembered between runs and in that state
* we also store the segment-id.
* So an unassigned peer can actually be associated to one wallet and such
* peers are stored in this struct as opposed to a 'global' one (scope=execute() method).
*/
std::set<std::shared_ptr<Peer> > unassignedPeers;
std::set<std::shared_ptr<Peer> > assignedPeers;
std::shared_ptr<Peer> downloader;
};
}
void SyncSPVAction::execute(const boost::system::error_code &error)
{
if (error)
return;
bool didSomething = false;
const int currentBlockHeight = m_dlm->blockHeight();
// fill the 'wallets' struct with the current state of things.
std::vector<WalletInfo> wallets;
std::set<std::shared_ptr<Peer> > unassignedPeers;
for (auto *segment : m_dlm->connectionManager().segments()) {
if (!segment->enabled())
continue;
WalletInfo info;
info.segment = segment;
for (const auto &peer : m_dlm->connectionManager().connectedPeers()) {
if (peer->privacySegment() == segment) {
info.assignedPeers.insert(peer);
if (peer->merkleDownloadInProgress())
info.downloader = peer;
}
else if (peer->privacySegment() == nullptr
&& peer->peerAddress().segment() == segment->segmentId()) {
info.unassignedPeers.insert(peer);
}
else if (peer->privacySegment() == nullptr && peer->peerAddress().segment() == 0) {
unassignedPeers.insert(peer);
}
}
wallets.push_back(info);
}
// sort by prio
std::sort(wallets.begin(), wallets.end(), [](const WalletInfo &a, const WalletInfo &b) {
return a.segment->priority() < b.segment->priority();
});
// Half of the work of this action is finding appropriate peers for a privacy-segment (aka wallet).
// We open connections to peers
// We wait for them to have 'received' recent headers, which indicates the peer is on our chain.
// We then assign a wallet to them.
// The other half is managing those peers actually downloading the merkle blocks to get our SPV based data.
// For stability sake, we take those steps backwards and because we repeat every second or so a peer will
// go through the whole process in steps.
const auto now = boost::posix_time::microsec_clock::universal_time();
const uint32_t nowInSec = time(nullptr);
const uint32_t headersOkTime = 3600 * 40; // 40 hours
//-- find peers that are unassigned and which are verified to be on our chain.
for (auto wallet = wallets.begin(); wallet != wallets.end(); ++wallet) {
for (auto peer : wallet->unassignedPeers) {
if (peer->receivedHeaders()
|| nowInSec - peer->peerAddress().lastReceivedGoodHeaders() < headersOkTime) {
logInfo() << "Peer previously used is still Ok, assigning it to wallet" << wallet->segment->segmentId();
// assign peer and send old transactions
peer->setPrivacySegment(wallet->segment);
didSomething = true;
if (peer->relaysTransactions()) {
for (const auto &weakTx : m_dlm->connectionManager().transactionsToBroadcast()) {
logInfo() << "broadcasting transactions from wallet to new peer";
auto tx = weakTx.lock();
if (tx && peer->privacySegment() == wallet->segment)
peer->sendTx(tx);
}
}
}
}
}
// check if our unassigned peers (without segment-id) can be assigned
for (auto peer : unassignedPeers) {
if (peer->receivedHeaders() || nowInSec - peer->peerAddress().lastReceivedGoodHeaders() < headersOkTime) {
// can be assigned!
for (auto wallet = wallets.begin(); wallet != wallets.end(); ++wallet) { // are sorted by prio
if (wallet->segment->priority() >= PrivacySegment::OnlyManual)
break;
if (static_cast<int>(wallet->assignedPeers.size() + wallet->unassignedPeers.size()) < MinPeersPerWallet) {
logInfo() << "Found free floating peer, assigning it to wallet" << wallet->segment->segmentId() << "(count:" << wallet->assignedPeers.size() + 1 << ")";
peer->setPrivacySegment(wallet->segment);
if (peer->relaysTransactions()) {
for (const auto &weakTx : m_dlm->connectionManager().transactionsToBroadcast()) {
auto tx = weakTx.lock();
if (tx && peer->privacySegment() == wallet->segment)
peer->sendTx(tx);
}
}
wallet->assignedPeers.insert(peer);
peer->peerAddress().setSegment(wallet->segment->segmentId()); // remember this assignment
didSomething = true;
break;
}
}
}
}
for (auto wallet = wallets.begin(); wallet != wallets.end(); ++wallet) {
if (wallet->segment->priority() >= PrivacySegment::OnlyManual)
break;
int peerCount = wallet->unassignedPeers.size() + wallet->assignedPeers.size();
if (peerCount < MinPeersPerWallet) {
// start some new connections
if (wallet->segment->firstBlock() == -1 || wallet->segment->firstBlock() > currentBlockHeight)
continue;
// start new connections
while (peerCount < MinPeersPerWallet) {
auto address = m_dlm->connectionManager().peerAddressDb()
.findBest(/*network and bloom*/ 1 | 4, wallet->segment->segmentId());
if (!address.isValid())
break;
logInfo() << "creating a new connection for PrivacySegment" << wallet->segment->segmentId() << "(count:" << peerCount + 1 << ")";
didSomething = true;
m_dlm->connectionManager().connect(address);
++peerCount;
}
}
}
// ----
// Now we focus on what those connected peers can do for our node. Downloading merkle blocks
for (auto wallet = wallets.begin(); wallet != wallets.end(); ++wallet) {
if (wallet->segment->priority() >= PrivacySegment::OnlyManual)
break;
auto infoIter = m_segmentInfos.find(wallet->segment);
if (infoIter == m_segmentInfos.end()) {
m_segmentInfos.insert(std::make_pair(wallet->segment, Info()));
infoIter = m_segmentInfos.find(wallet->segment);
}
assert(infoIter != m_segmentInfos.end());
Info &info = infoIter->second;
if (!info.previousDownloads.empty()) {
PeerDownloadInfo &pdi = info.previousDownloads.back();
if (pdi.toBlock == 0 // started download
&& wallet->downloader == nullptr ) { // finished download
// So it started a download and nobody is downloading anymore. Lets
// update the PeerDownloadInfo with the last blockheight we downloaded.
if (pdi.fromBlock == wallet->segment->backupSyncHeight() + 1)
pdi.toBlock = wallet->segment->lastBlockSynched();
else
pdi.toBlock = wallet->segment->backupSyncHeight();
if (pdi.fromBlock > pdi.toBlock) // download aborted
pdi.fromBlock = 0;
logDebug() << "registring a download completed for"
<< pdi.peerId << "from" << pdi.fromBlock << "to" << pdi.toBlock;
assert(pdi.toBlock >= pdi.fromBlock);
}
}
if (wallet->assignedPeers.empty())
continue;
auto privSegment = wallet->segment;
logDebug() << "WalletID:" << privSegment->segmentId()
<< "origStart:" << privSegment->firstBlock()
<< "lastBlockSynched:" << privSegment->lastBlockSynched()
<< "backupSyncHeight:" << privSegment->backupSyncHeight();
if (privSegment->firstBlock() > 1
&& currentBlockHeight > privSegment->firstBlock()
&& (privSegment->lastBlockSynched() < currentBlockHeight
|| privSegment->backupSyncHeight() < currentBlockHeight)) {
// is behind. Is someone downloading?
if (wallet->downloader) {
didSomething = true; // as long as there is a need to download, we continue.
auto curPeer = wallet->downloader;
// remember the downloader so we avoid asking the same peer to download the backup.
// lets see if the peer is making progress.
const int64_t timePassed = (now - info.lastCheckedTime).total_milliseconds();;
const int32_t blocksDone = curPeer->lastReceivedMerkle() - info.lastHeight;
logDebug() << "Downloading using peer" << curPeer->connectionId()
<< "prevHeight:" << info.lastHeight
<< "curHeight:" << curPeer->lastReceivedMerkle();
if (timePassed > 4200) {
if (blocksDone < timePassed / 1000) {
// peer is stalling. I expect at least 1 block a second.
// we give them 2 segments try, or only one if they never started a single merkle dl
if (info.slowPunishment++ > 2 || curPeer->lastReceivedMerkle() == 0) {
logWarning() << "SyncSPV disconnects peer"
<< curPeer->connectionId()
<< "that is stalling download of merkle-blocks";
m_dlm->connectionManager().punish(curPeer, PUNISHMENT_MAX);
wallet->downloader.reset();
curPeer = nullptr;
}
} else if (blocksDone > 20) {
info.slowPunishment = 0;
}
if (curPeer) { // start new section every couple of seconds
info.lastHeight = curPeer->lastReceivedMerkle();
info.lastCheckedTime = now;
}
}
}
/*
* lets assign a downloader.
* A 'wallet' (aka PrivacySegment) needs at least one pass by a peer to download all
* merkle blocks. We do this by picking a peer and calling startMerkleDownload() on it.
*
* This code also takes into consideration the fact that we should not trust
* a single random node on the Internet. So when we finished asking one peer
* we go and assign a second peer to be our backup. With probably the same
* result, but maybe with more transactions.
*/
if (wallet->downloader == nullptr && !wallet->assignedPeers.empty()) {
logDebug() << "looking for a new downloader";
const int from = privSegment->backupSyncHeight();
for (auto &p : wallet->assignedPeers) {
// logDebug() << " + " << p->connectionId();
bool ok = true;
for (auto pdi : info.previousDownloads) {
if (pdi.peerId == p->connectionId()) {
if (from < pdi.toBlock) {// this one already downloaded for us
// logDebug() << "Skipping peer because it downloaded before";
ok = false;
break;
}
if (p->peerHeight() <= from) {
// logDebug() << "Skipping peer because its behind";
ok = false;
break;
}
}
if (!ok)
break;
}
if (!ok)
continue;
wallet->downloader = p;
break;
}
if (wallet->downloader) {
logDebug() << "Wallet merkle-download started on peer" << wallet->downloader->connectionId()
<< privSegment->backupSyncHeight()
<< privSegment->lastBlockSynched();
didSomething = true;
wallet->downloader->startMerkleDownload(from + 1); // +1 because we start one after the last downloaded
info.previousDownloads.push_back({wallet->downloader->connectionId(), from + 1, 0});
info.lastHeight = from;
info.lastCheckedTime = now;
}
}
}
}
bool suppliedAllPeers = true;
for (auto wallet = wallets.begin(); wallet != wallets.end(); ++wallet) {
if (wallet->segment->priority() >= PrivacySegment::OnlyManual)
continue;
if (static_cast<int>(wallet->assignedPeers.size()) < MinPeersPerWallet) {
suppliedAllPeers = false;
break;
}
}
if (!suppliedAllPeers || didSomething) {
m_quietCount = 0;
} else if (m_quietCount++ > 2) {
logInfo() << "SyncSPVAction done";
m_dlm->done(this);
return;
}
again();
}