/*
 * This file is part of the Flowee project
 * Copyright (C) 2020-2023 Tom Zander
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
 */
#include "SyncSPVAction.h"
#include "PrivacySegment.h"
#include "DownloadManager.h"
#include "Peer.h"

#include <algorithm>
#include <cassert>
#include <ctime>
#include <memory>
#include <set>
#include <utility>
#include <vector>

/**
 * Create the action.
 *
 * The minimum number of peers we want per wallet is 3 when \a parent runs on
 * the main chain and 1 on any other chain.
 */
SyncSPVAction::SyncSPVAction(DownloadManager *parent)
    : Action(parent),
    MinPeersPerWallet(parent->chain() == P2PNet::MainChain ? 3 : 1)
{
}

namespace {
/// Snapshot of one enabled privacy-segment (aka wallet) and the connected peers relevant to it.
struct WalletInfo {
    PrivacySegment *segment = nullptr;
    /*
     * peers that we connect to are remembered between runs and in that state
     * we also store the segment-id.
     * So an unassigned peer can actually be associated to one wallet and such
     * peers are stored in this struct as opposed to a 'global' one (scope=execute() method).
     */
    std::set<std::shared_ptr<Peer> > unassignedPeers;
    /// peers whose privacySegment() is this wallet's segment.
    std::set<std::shared_ptr<Peer> > assignedPeers;
    /// the assigned peer that has a merkle download in progress, if any.
    std::shared_ptr<Peer> downloader;
};
}

/**
 * One run of the SPV sync state machine.
 *
 * Does nothing when \a error is set. Otherwise it
 *  - assigns connected peers to wallets,
 *  - opens new connections for wallets that have too few peers,
 *  - monitors / starts merkle-block downloads for wallets that are behind,
 * and finally calls again() (presumably to schedule the next run).
 */
void SyncSPVAction::execute(const boost::system::error_code &error)
{
    if (error)
        return;

    /*
     * the bloom filters and the 'mempool' call.
     *
     * Every peer that we connect to should get the bloom filter set and it should
     * also get the 'mempool' call sent once which will make that peer respond with
     * all matches of the bloom filters which are in the mempool.
     *
     * The tricky part is that we should have the latest bloom filter set on each
     * of those peers before the mempool is sent, since they work together.
     * Also if we are behind, the mempool should not be sent. (It can cause problems
     * in the wallet if we receive unconfirmed transactions before mined transactions)
     *
     * So, when a peer starts downloading merkle blocks, the bloom filter of all the
     * other peers becomes invalid the moment a match is found by an actual wallet.
     * This is Ok on one peer because the merkleblock automatically updates the
     * filter on match on the server side, but obviously not on the other peers
     * we have for that wallet.
     *
     * The approach we follow is that as soon as a sync-run is done on a single peer
     * (we do a main and also a backup sync), we tell the wallet to re-build its
     * bloom filter for _all_ peers and if the sync that peer did leads us to be at
     * the chain-tip, we also send each peer the mempool command.
     */

    bool didSomething = false;
    const int currentBlockHeight = m_dlm->blockHeight();

    // fill the 'wallets' struct with the current state of things.
    std::vector<WalletInfo> wallets;
    // connected peers that have neither a privacy segment nor a remembered segment-id.
    std::set<std::shared_ptr<Peer> > unassignedPeers;
    for (auto *segment : m_dlm->connectionManager().segments()) {
        if (!segment->enabled())
            continue;
        WalletInfo info;
        info.segment = segment;
        for (const auto &peer : m_dlm->connectionManager().connectedPeers()) {
            if (peer->privacySegment() == segment) {
                info.assignedPeers.insert(peer);
                if (peer->merkleDownloadInProgress())
                    info.downloader = peer;
            }
            else if (peer->privacySegment() == nullptr
                     && peer->peerAddress().segment() == segment->segmentId()) {
                info.unassignedPeers.insert(peer);
            }
            else if (peer->privacySegment() == nullptr
                     && peer->peerAddress().segment() == 0) {
                unassignedPeers.insert(peer);
            }
        }
        wallets.push_back(info);
    }
    // sort by prio
    std::sort(wallets.begin(), wallets.end(), [](const WalletInfo &a, const WalletInfo &b) {
        return a.segment->priority() < b.segment->priority();
    });

    // Half of the work of this action is finding appropriate peers for a privacy-segment (aka wallet).
    // We open connections to peers
    // We wait for them to have 'received' recent headers, which indicates the peer is on our chain.
    // We then assign a wallet to them.
    // The other half is managing those peers actually downloading the merkle blocks to get our SPV based data.

    const auto now = boost::posix_time::microsec_clock::universal_time();
    const uint32_t nowInSec = time(nullptr);
    const uint32_t headersOkTime = 3600 * 24 * 7 * 3; // 3 weeks

    // A peer counts as verified when it sent us headers in this session, or when
    // we recorded good headers from its address less than headersOkTime ago.
    const auto isOnOurChain = [nowInSec, headersOkTime](const std::shared_ptr<Peer> &peer) {
        return peer->receivedHeaders()
                || nowInSec - peer->peerAddress().lastReceivedGoodHeaders() < headersOkTime;
    };

    //-- find peers that are unassigned and which are verified to be on our chain.
    for (auto &wallet : wallets) {
        for (const auto &peer : wallet.unassignedPeers) {
            if (!isOnOurChain(peer))
                continue;
            logInfo(2025) << "Peer previously used is still Ok, assigning it to wallet"
                          << wallet.segment->segmentId();
            // assign peer and send old transactions
            // Note that the peer is not moved to wallet.assignedPeers here; the next
            // run finds it there because its privacySegment() then matches.
            peer->setPrivacySegment(wallet.segment);
            didSomething = true;
            if (peer->relaysTransactions()) {
                for (const auto &weakTx : m_dlm->connectionManager().transactionsToBroadcast()) {
                    logInfo(2025) << "broadcasting transactions from wallet to new peer";
                    auto tx = weakTx.lock();
                    if (tx && peer->privacySegment() == wallet.segment)
                        peer->sendTx(tx);
                }
            }
        }
    }

    // check if our unassigned peers (without segment-id) can be assigned
    for (const auto &peer : unassignedPeers) {
        if (!isOnOurChain(peer))
            continue;
        // can be assigned!
        for (auto &wallet : wallets) { // are sorted by prio
            if (wallet.segment->priority() >= PrivacySegment::OnlyManual)
                break;
            if (static_cast<int>(wallet.assignedPeers.size()
                                 + wallet.unassignedPeers.size()) < MinPeersPerWallet) {
                logInfo(2025) << "Found free floating peer, assigning it to wallet"
                              << wallet.segment->segmentId()
                              << "(count:" << wallet.assignedPeers.size() + 1 << ")";
                peer->setPrivacySegment(wallet.segment);
                if (peer->relaysTransactions()) {
                    for (const auto &weakTx : m_dlm->connectionManager().transactionsToBroadcast()) {
                        auto tx = weakTx.lock();
                        if (tx && peer->privacySegment() == wallet.segment)
                            peer->sendTx(tx);
                    }
                }
                wallet.assignedPeers.insert(peer);
                peer->peerAddress().setSegment(wallet.segment->segmentId()); // remember this assignment
                didSomething = true;
                break;
            }
        }
    }

    for (auto &wallet : wallets) {
        if (wallet.segment->priority() >= PrivacySegment::OnlyManual)
            break;

        int peerCount = static_cast<int>(wallet.unassignedPeers.size() + wallet.assignedPeers.size());
        if (peerCount < MinPeersPerWallet) {
            // start some new connections
            if (wallet.segment->firstBlock() == -1
                    || wallet.segment->firstBlock() > currentBlockHeight)
                continue;

            // start new connections
            while (peerCount < MinPeersPerWallet) {
                auto address = m_dlm->connectionManager().peerAddressDb()
                        .findBest(/*network and bloom*/ 1 | 4, wallet.segment->segmentId());
                if (!address.isValid())
                    break;

                logInfo(2025) << "Creating a new connection for PrivacySegment"
                              << wallet.segment->segmentId()
                              << "(count:" << peerCount + 1 << ")";
                didSomething = true;
                m_dlm->connectionManager().connect(address);
                ++peerCount;
            }
        }
    }

    // ----
    // Now we focus on what those connected peers can do for our node.
    // Downloading merkle blocks
    for (auto &wallet : wallets) {
        if (wallet.segment->priority() >= PrivacySegment::OnlyManual)
            break;

        auto infoIter = m_segmentInfos.find(wallet.segment);
        if (infoIter == m_segmentInfos.end()) {
            m_segmentInfos.insert(std::make_pair(wallet.segment, Info()));
            infoIter = m_segmentInfos.find(wallet.segment);
        }
        assert(infoIter != m_segmentInfos.end());
        Info &info = infoIter->second;
        if (!info.previousDownloads.empty()) {
            PeerDownloadInfo &pdi = info.previousDownloads.back();
            if (pdi.toBlock == 0 // started download
                    && wallet.downloader == nullptr) { // finished download
                // So it started a download and nobody is downloading anymore. Lets
                // update the PeerDownloadInfo with the last blockheight we downloaded.
                if (pdi.fromBlock == wallet.segment->backupSyncHeight() + 1)
                    pdi.toBlock = wallet.segment->lastBlockSynched();
                else
                    pdi.toBlock = wallet.segment->backupSyncHeight();
                if (pdi.fromBlock > pdi.toBlock) // download aborted
                    pdi.fromBlock = 0;
                assert(pdi.toBlock >= pdi.fromBlock);

                // after a range of blocks finished downloading, we assume that it found something and thus the
                // filter on the other peers need to have the new utxos added to be able to act on them.
                logInfo(2025) << "A peer stopped downloading, broadcasting filter to its wallet-peers"
                              << wallet.segment->segmentId();
                wallet.segment->rebuildFilter(pdi.toBlock == currentBlockHeight
                                              ? PrivacySegment::FilterAtTIp : PrivacySegment::NotAtTip);
            }
        }

        if (wallet.assignedPeers.empty()) // nobody to assign the download to, no work to do
            continue;

        auto privSegment = wallet.segment;
        // wallet is in need of downloading more merkle blocks ?
        if (privSegment->firstBlock() > 1 && currentBlockHeight > privSegment->firstBlock()
                && (privSegment->lastBlockSynched() < currentBlockHeight
                    || privSegment->backupSyncHeight() < currentBlockHeight)) {

            logInfo(2025) << "WalletID:" << privSegment->segmentId()
                          << "origStart:" << privSegment->firstBlock()
                          << "lastBlockSynched:" << privSegment->lastBlockSynched()
                          << "backupSyncHeight:" << privSegment->backupSyncHeight();

            // is behind. Is some peer downloading?
            if (wallet.downloader) {
                didSomething = true; // as long as there is a need to download, we continue.
                auto curPeer = wallet.downloader;
                // remember the downloader so we avoid asking the same peer to download the backup.

                // lets see if the peer is making progress.
                const int64_t timePassed = (now - info.lastCheckedTime).total_milliseconds();
                const int32_t blocksDone = curPeer->lastReceivedMerkle() - info.lastHeight;
                logInfo(2025) << " +- downloading using peer" << curPeer->connectionId()
                              << "prevHeight:" << info.lastHeight
                              << "curHeight:" << curPeer->lastReceivedMerkle();
                if (timePassed > 4200) {
                    if (blocksDone < timePassed / 1000) {
                        // peer is stalling. I expect at least 1 block a second.
                        // we give them 2 segments try, or only one if they never started a single merkle dl
                        if (info.slowPunishment++ > 2 || curPeer->lastReceivedMerkle() == 0) {
                            logWarning(2025) << "SyncSPV disconnects peer"
                                             << curPeer->connectionId()
                                             << "that is stalling download of merkle-blocks";
                            m_dlm->connectionManager().punish(curPeer, PUNISHMENT_MAX);
                            wallet.downloader.reset();
                            curPeer = nullptr;
                        }
                    }
                    else if (blocksDone > 20) {
                        info.slowPunishment = 0;
                    }
                    if (curPeer) {
                        // start new section every couple of seconds
                        info.lastHeight = curPeer->lastReceivedMerkle();
                        info.lastCheckedTime = now;
                    }
                }
            }

            /*
             * lets assign a downloader.
             * A 'wallet' (aka PrivacySegment) needs at least one pass by a peer to download all
             * merkle blocks. We do this by picking a peer and calling startMerkleDownload() on it.
             *
             * This code also takes into consideration the fact that we should not trust
             * a single random node on the Internet. So when we finished asking one peer
             * we go and assign a second peer to be our backup. With probably the same
             * result, but maybe with more transactions.
             */
            if (wallet.downloader == nullptr && !wallet.assignedPeers.empty()) {
                logDebug(2025) << " +- looking for a new downloader";
                logDebug(2025) << " += chain:" << m_dlm->blockHeight()
                               << "wallet:" << privSegment->lastBlockSynched()
                               << "wallet-backup:" << privSegment->backupSyncHeight();
                // the main sync is done when the wallet is at the chain height,
                // in which case we continue with the backup sync.
                const int from = m_dlm->blockHeight() == privSegment->lastBlockSynched()
                        ? privSegment->backupSyncHeight()
                        : privSegment->lastBlockSynched();

                for (const auto &p : wallet.assignedPeers) {
                    // logDebug(2025) << " + " << p->connectionId();
                    bool ok = true;
                    for (const auto &pdi : info.previousDownloads) {
                        if (pdi.peerId != p->connectionId())
                            continue;
                        if (from < pdi.toBlock) { // this one already downloaded for us
                            // logDebug(2025) << "Skipping peer because it downloaded before";
                            ok = false;
                            break;
                        }
                        if (p->peerHeight() <= from) {
                            // logDebug(2025) << "Skipping peer because its behind";
                            ok = false;
                            break;
                        }
                    }
                    if (!ok)
                        continue;
                    wallet.downloader = p;
                    break;
                }

                if (wallet.downloader) {
                    logDebug(2025) << " +- wallet merkle-download started on peer"
                                   << wallet.downloader->connectionId()
                                   << privSegment->lastBlockSynched()
                                   << privSegment->backupSyncHeight();
                    didSomething = true;
                    wallet.downloader->startMerkleDownload(from + 1); // +1 because we start one after the last downloaded
                    // toBlock == 0 marks this download as started but not yet finished.
                    info.previousDownloads.push_back({wallet.downloader->connectionId(), from + 1, 0});
                    info.lastHeight = from;
                    info.lastCheckedTime = now;
                }
            }
        }
    }

    bool suppliedAllPeers = true;
    for (const auto &wallet : wallets) {
        if (wallet.segment->priority() >= PrivacySegment::OnlyManual)
            continue;
        if (static_cast<int>(wallet.assignedPeers.size()) < MinPeersPerWallet) {
            suppliedAllPeers = false;
            break;
        }
    }

    if (!suppliedAllPeers || didSomething) {
        m_quietCount = 0;
    }
    else if (m_quietCount < 5) {
        // we print that we are done.
        // notice that we do NOT exit.
        // This action will continue to monitor if each wallet has enough peers.
        if (++m_quietCount == 5)
            logInfo(2025) << "==== SyncSPVAction ==== DONE";
    }
    again();
}