b6fc84696a
This is an abstract class that the application using this library needs to subclass. Ownership and lifetime don't change, it still lies with the app using the library and they still need to add and remove it from the connectionManager, but this makes it much more stable for multi- threading environments and avoids issues on misuse.
410 lines
20 KiB
C++
410 lines
20 KiB
C++
/*
 * This file is part of the Flowee project
 * Copyright (C) 2020-2023 Tom Zander <tom@flowee.org>
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */
|
|
#include "SyncSPVAction.h"
#include "BroadcastTxData.h"
#include "PrivacySegment.h"
#include "DownloadManager.h"
#include "Peer.h"

#include <algorithm>
#include <cassert>
#include <cstdint>
#include <ctime>
#include <memory>
#include <random>
#include <set>
#include <utility>
#include <vector>
|
|
|
|
/**
 * Creates the SPV synchronisation action for the download manager \a parent.
 *
 * The minimum number of peers wanted per wallet is fixed at construction:
 * 3 when \a parent reports the main chain, 1 for any other chain.
 * \a parent is dereferenced here and therefore must not be null.
 */
SyncSPVAction::SyncSPVAction(DownloadManager *parent)
    : Action(parent),
      MinPeersPerWallet(parent->chain() != P2PNet::MainChain ? 1 : 3)
{
}
|
|
|
|
namespace {
|
|
struct WalletInfo {
|
|
std::shared_ptr<PrivacySegment> segment;
|
|
/*
|
|
* peers that we connect to are remembered between runs and in that state
|
|
* we also store the segment-id.
|
|
* So an unassigned peer can actually be associated to one wallet and such
|
|
* peers are stored in this struct as opposed to a 'global' one (scope=execute() method).
|
|
*/
|
|
std::set<std::shared_ptr<Peer> > unassignedPeers;
|
|
std::set<std::shared_ptr<Peer> > assignedPeers;
|
|
std::shared_ptr<Peer> downloader;
|
|
|
|
};
|
|
|
|
}
|
|
|
|
|
|
|
|
/**
 * Runs one round of the SPV synchronisation logic.
 *
 * A round does the following, in order:
 *  1. Take a snapshot of all enabled privacy segments (wallets) and the
 *     connected peers that belong to, or could belong to, each of them.
 *  2. Assign peers that are verified to be on our chain to a wallet and send
 *     them the wallet's pending transactions.
 *  3. Open new connections for wallets that have fewer than
 *     MinPeersPerWallet peers, or ask existing peers for more addresses when
 *     finding peers took very many attempts.
 *  4. Manage merkle-block downloads per wallet: finalize finished downloads,
 *     drop stalling downloaders and hand out new download ranges.
 *  5. Update the quiet counter and call again().
 *
 * \param error when set, the round is skipped entirely; again() is not called.
 */
void SyncSPVAction::execute(const boost::system::error_code &error)
{
    if (error)
        return;

    /*
     * the bloom filters and the 'mempool' call.
     *
     * Every peer that we connect to should get the bloom filter set and it should
     * also get the 'mempool' call sent once which will make that peer respond with
     * all matches of the bloom filters which are in the mempool.
     *
     * The tricky part is that we should have the latest bloom filter set on each
     * of those peers before the mempool is sent, since they work together.
     * Also if we are behind, the mempool should not be sent. (It can cause problems
     * in the wallet if we receive unconfirmed transactions before mined transactions)
     *
     * So, when a peer starts downloading merkle blocks, the bloom filter of all the
     * other peers becomes invalid the moment a match is found by an actual wallet.
     * This is Ok on one peer because the merkleblock automatically updates the
     * filter on match on the server side, but obviously not on the other peers
     * we have for that wallet.
     *
     * The approach we follow is that as soon as a sync-run is done on a single peer
     * (we do a main and also a backup sync), we tell the wallet to re-build its
     * bloom filter for _all_ peers and if the sync that peer did leads us to be at
     * the chain-tip, we also send each peer the mempool command.
     */

    bool didSomething = false;
    const int currentBlockHeight = m_dlm->blockHeight();

    // Binds a peer to a wallet and, if the peer relays transactions, sends it
    // every still-alive pending transaction that belongs to that wallet.
    const auto assignPeerToWallet = [this](const std::shared_ptr<Peer> &peer,
            const std::shared_ptr<PrivacySegment> &segment) {
        const auto segmentId = segment->segmentId();
        peer->setPrivacySegment(segment);
        if (!peer->relaysTransactions())
            return;
        for (const auto &weakTx : m_dlm->connectionManager().transactionsToBroadcast()) {
            auto tx = weakTx.lock();
            if (tx && segmentId == tx->privSegment()) {
                logInfo(2025) << "broadcasting transactions from wallet to new peer" << tx.get()->hash() << "peer:" << peer->connectionId();
                peer->sendTx(tx);
            }
        }
    };

    // fill the 'wallets' struct with the current state of things.
    std::vector<WalletInfo> wallets;
    std::set<std::shared_ptr<Peer> > unassignedPeers;
    // one look at the connected peers is enough for all segments.
    const auto &connectedPeers = m_dlm->connectionManager().connectedPeers();
    for (const auto &segment : m_dlm->connectionManager().segments()) {
        if (!segment->enabled())
            continue;
        WalletInfo info;
        info.segment = segment;

        for (const auto &peer : connectedPeers) {
            const auto peerSegment = peer->privacySegment().lock();
            if (peerSegment == segment) {
                info.assignedPeers.insert(peer);
                if (peer->merkleDownloadInProgress())
                    info.downloader = peer;
            }
            else if (!peerSegment
                     && peer->peerAddress().segment() == segment->segmentId()) {
                // no live segment yet, but remembered as belonging to this wallet.
                info.unassignedPeers.insert(peer);
            }
            else if (!peerSegment && peer->peerAddress().segment() == 0) {
                // free floating peer, not tied to any wallet.
                unassignedPeers.insert(peer);
            }
        }
        wallets.push_back(std::move(info));
    }
    // sort by prio
    std::sort(wallets.begin(), wallets.end(), [](const WalletInfo &a, const WalletInfo &b) {
        return a.segment->priority() < b.segment->priority();
    });

    // Half of the work of this action is finding appropriate peers for a privacy-segment (aka wallet).
    // We open connections to peers
    // We wait for them to have 'received' recent headers, which indicates the peer is on our chain.
    // We then assign a wallet to them.
    // The other half is managing those peers actually downloading the merkle blocks to get our SPV based data.

    const auto now = boost::posix_time::microsec_clock::universal_time();
    const uint32_t nowInSec = static_cast<uint32_t>(time(nullptr));
    const uint32_t headersOkTime = 3600 * 24 * 7 * 3; // 3 weeks
    //-- find peers that are unassigned and which are verified to be on our chain.
    for (auto &wallet : wallets) {
        if (wallet.segment->priority() >= PrivacySegment::OnlyManual)
            continue;
        for (const auto &peer : wallet.unassignedPeers) {
            if (peer->receivedHeaders()
                    || nowInSec - peer->peerAddress().lastReceivedGoodHeaders() < headersOkTime) {

                logInfo(2025) << "Peer previously used is still Ok, assigning it to wallet" << wallet.segment->segmentId();
                // assign peer and send old transactions
                assignPeerToWallet(peer, wallet.segment);
                didSomething = true;
            }
        }
    }

    // check if our unassigned peers (without segment-id) can be assigned
    for (const auto &peer : unassignedPeers) {
        if (!peer->receivedHeaders()
                && nowInSec - peer->peerAddress().lastReceivedGoodHeaders() >= headersOkTime)
            continue; // not (yet) verified to be on our chain

        // can be assigned!
        for (auto &wallet : wallets) { // are sorted by prio
            if (wallet.segment->priority() >= PrivacySegment::OnlyManual)
                break;
            if (static_cast<int>(wallet.assignedPeers.size() + wallet.unassignedPeers.size()) < MinPeersPerWallet) {
                logInfo(2025) << "Found free floating peer, assigning it to wallet" << wallet.segment->segmentId() << "(count:" << wallet.assignedPeers.size() + 1 << ")";
                assignPeerToWallet(peer, wallet.segment);
                wallet.assignedPeers.insert(peer);
                peer->peerAddress().setSegment(wallet.segment->segmentId()); // remember this assignment
                didSomething = true;
                break;
            }
        }
    }

    for (auto &wallet : wallets) {
        if (wallet.segment->priority() >= PrivacySegment::OnlyManual)
            break;
        int peerCount = wallet.unassignedPeers.size() + wallet.assignedPeers.size();
        // Entries are only created further down, so this may be end() for a new wallet.
        const auto infoIter = m_segmentInfos.find(wallet.segment);
        const bool haveInfo = infoIter != m_segmentInfos.end();
        if (peerCount < MinPeersPerWallet) {
            // start some new connections
            if (wallet.segment->firstBlock() == -1 || wallet.segment->firstBlock() > currentBlockHeight)
                continue;

            int newConnectionsToOpen = MinPeersPerWallet - peerCount;
            if (haveInfo && infoIter->second.connectionsTriedForWallet > 18) {
                assert(newConnectionsToOpen > 0);
                // if we've already opened a lot of connections, we have to recognize that the address-DB
                // has loads of unusable addresses. So we open more new connections per round in order to
                // find the crypto-coin in the haystack faster.
                newConnectionsToOpen = 10;
            }

            while (newConnectionsToOpen-- > 0) {
                auto address = m_dlm->connectionManager().peerAddressDb()
                        .findBest(/*network and bloom*/ 1 | 4, wallet.segment->segmentId());
                if (!address.isValid())
                    break;
                // remember how many IPs we tried.
                if (haveInfo)
                    infoIter->second.connectionsTriedForWallet += 1;

                logInfo(2025) << "Creating a new connection for PrivacySegment" << wallet.segment->segmentId() << "(count:" << peerCount + 1 << ")";
                didSomething = true;
                m_dlm->connectionManager().connect(address);
                ++peerCount;
            }
        }
        else if (haveInfo && infoIter->second.connectionsTriedForWallet > 250) {
            infoIter->second.connectionsTriedForWallet = 0;
            // We finished, got enough peers for wallet.
            // but it took really a lot of IPs to try. Let's check if we can extend our DB
            for (const auto &peer : wallet.assignedPeers) {
                auto address = peer->peerAddress();
                if (!address.askedAddresses()) {
                    address.setAskedAddresses(true);
                    logInfo() << "SyncSVP, Sending GetAddr msg to" << peer->connectionId();
                    peer->sendMessage(Message(Api::LegacyP2P, Api::P2P::GetAddr));
                }
            }
        }
    }

    // ----
    // Now we focus on what those connected peers can do for our node. Downloading merkle blocks

    for (auto &wallet : wallets) {
        if (wallet.segment->priority() >= PrivacySegment::OnlyManual)
            break;
        auto infoIter = m_segmentInfos.find(wallet.segment);
        if (infoIter == m_segmentInfos.end()) {
            m_segmentInfos.insert(std::make_pair(wallet.segment, Info()));
            infoIter = m_segmentInfos.find(wallet.segment);
        }
        assert(infoIter != m_segmentInfos.end());
        Info &info = infoIter->second;

        if (!info.previousDownloads.empty()) {
            PeerDownloadInfo &pdi = info.previousDownloads.back();
            if (pdi.toBlock == 0 // started download
                    && wallet.downloader == nullptr) { // finished download

                // So it started a download and nobody is downloading anymore. Lets
                // update the PeerDownloadInfo with the last blockheight we downloaded.
                if (pdi.fromBlock == wallet.segment->backupSyncHeight() + 1)
                    pdi.toBlock = wallet.segment->lastBlockSynched();
                else
                    pdi.toBlock = wallet.segment->backupSyncHeight();

                if (pdi.fromBlock > pdi.toBlock) // download aborted
                    pdi.fromBlock = 0;
                assert(pdi.toBlock >= pdi.fromBlock);
                // after a range of blocks finished downloading, we assume that it found something and thus the
                // filter on the other peers need to have the new utxos added to be able to act on them.
                logInfo(2025) << "A peer stopped downloading, broadcasting filter to its wallet-peers" << wallet.segment->segmentId();
                wallet.segment->rebuildFilter(pdi.toBlock == currentBlockHeight
                        ? PrivacySegment::FilterAtTip : PrivacySegment::NotAtTip);
            }
        }
        if (wallet.assignedPeers.empty()) // nobody to assign the download to, no work to do
            continue;

        const auto &privSegment = wallet.segment;

        // wallet is in need of downloading more merkle blocks ?
        const bool walletIsBehind = privSegment->firstBlock() > 1
                && currentBlockHeight > privSegment->firstBlock()
                && (privSegment->lastBlockSynched() < currentBlockHeight
                    || privSegment->backupSyncHeight() < currentBlockHeight);
        if (!walletIsBehind) {
            // wallet is at tip (or has nothing to sync yet).
            if (!info.previousDownloads.empty())
                info.previousDownloads.clear(); // allow all peers to be used for next downloads.
            continue;
        }

        logInfo(2025) << "WalletID:" << privSegment->segmentId()
                      << "origStart:" << privSegment->firstBlock()
                      << "lastBlockSynched:" << privSegment->lastBlockSynched()
                      << "backupSyncHeight:" << privSegment->backupSyncHeight();

        // is behind. Is some peer downloading?
        if (wallet.downloader) {
            didSomething = true; // as long as there is a need to download, we continue.
            auto curPeer = wallet.downloader;

            // lets see if the peer is making progress.
            const int64_t timePassed = (now - info.lastCheckedTime).total_milliseconds();
            const int32_t blocksDone = curPeer->lastReceivedMerkle() - info.lastHeight;
            logInfo(2025) << "  +- downloading using peer" << curPeer->connectionId()
                          << "prevHeight:" << info.lastHeight
                          << "curHeight:" << curPeer->lastReceivedMerkle();

            if (timePassed > 4200) {
                if (blocksDone < timePassed / 1000) {
                    // peer is stalling. I expect at least 1 block a second.
                    // we give them 2 segments try, or only one if they never started a single merkle dl
                    if (info.slowPunishment++ > 2 || curPeer->lastReceivedMerkle() == 0) {
                        logWarning(2025) << "SyncSPV disconnects peer"
                                         << curPeer->connectionId()
                                         << "that is stalling download of merkle-blocks";
                        m_dlm->connectionManager().punish(curPeer, 210);
                        m_dlm->connectionManager().disconnect(curPeer);
                        // Our snapshot still lists the peer we just disconnected. Drop it so
                        // the search for a new downloader below can't pick it again and so
                        // the peer-count check at the end of this round sees the loss.
                        wallet.assignedPeers.erase(curPeer);
                        wallet.downloader.reset();
                        curPeer = nullptr;
                    }
                } else if (blocksDone > 20) {
                    info.slowPunishment = 0;
                }
                if (curPeer) { // start new section every couple of seconds
                    info.lastHeight = curPeer->lastReceivedMerkle();
                    info.lastCheckedTime = now;
                }
            }
        }

        /*
         * lets assign a downloader.
         * A 'wallet' (aka PrivacySegment) needs at least one pass by a peer to download all
         * merkle blocks. We do this by picking a peer and calling startMerkleDownload() on it.
         *
         * This code also takes into consideration the fact that we should not trust
         * a single random node on the Internet. So when we finished asking one peer
         * we go and assign a second peer to be our backup. With probably the same
         * result, but maybe with more transactions.
         */
        if (wallet.downloader || wallet.assignedPeers.empty())
            continue;

        logDebug(2025) << "  +- looking for a new downloader";
        logDebug(2025) << "  += chain:" << m_dlm->blockHeight()
                       << "wallet:" << privSegment->lastBlockSynched()
                       << "wallet-backup:" << privSegment->backupSyncHeight();
        const int from = privSegment->backupSyncHeight();
        // don't pick the same one every time, shuffle them.
        std::vector<std::shared_ptr<Peer>> candidates(wallet.assignedPeers.begin(), wallet.assignedPeers.end());
        std::mt19937 g(m_randomDevice());
        std::shuffle(candidates.begin(), candidates.end(), g);
        for (const auto &candidate : candidates) {
            bool ok = true;
            for (const auto &pdi : info.previousDownloads) {
                if (pdi.peerId == candidate->connectionId()) {
                    if (from < pdi.toBlock) // this one already downloaded for us
                        ok = false;
                    break;
                }
            }
            if (ok && candidate->peerHeight() <= from) // peer is behind
                ok = false;
            if (!ok)
                continue;

            wallet.downloader = candidate;
            break;
        }
        if (!wallet.downloader)
            continue;

        logDebug(2025) << "  +- wallet merkle-download started on peer" << wallet.downloader->connectionId()
                       << privSegment->lastBlockSynched()
                       << privSegment->backupSyncHeight()
                       << "from:" << from;
        didSomething = true;
        const int boundary = from + 4000; // stop downloading after 4000 blocks to interleave secondary (backup) downloads.
        wallet.downloader->startMerkleDownload(from + 1, boundary); // +1 because we start one after the last downloaded

        // remove an old record of this peer so we don't have duplicates.
        const auto downloaderId = wallet.downloader->connectionId();
        const auto oldRecord = std::find_if(info.previousDownloads.begin(), info.previousDownloads.end(),
                [downloaderId](const PeerDownloadInfo &pdi) { return pdi.peerId == downloaderId; });
        if (oldRecord != info.previousDownloads.end())
            info.previousDownloads.erase(oldRecord);
        info.previousDownloads.push_back({wallet.downloader->connectionId(), from + 1, 0});
        info.lastHeight = from;
        info.lastCheckedTime = now;
        // stall strikes are meant per downloader; a fresh one starts with a clean slate.
        info.slowPunishment = 0;
    }

    bool suppliedAllPeers = true;
    for (const auto &wallet : wallets) {
        if (wallet.segment->priority() >= PrivacySegment::OnlyManual)
            continue;
        if (static_cast<int>(wallet.assignedPeers.size()) < MinPeersPerWallet) {
            suppliedAllPeers = false;
            break;
        }
    }

    if (!suppliedAllPeers || didSomething) {
        m_quietCount = 0;
    } else if (m_quietCount < 5) {
        // we print that we are done.
        // notice that we do NOT exit.
        // This action will continue to monitor if each wallet has enough peers.
        if (++m_quietCount == 5)
            logInfo(2025) << "==== SyncSPVAction ==== DONE";
    }
    again();
}
|