/*
 * This file is part of the Flowee project
 * Copyright (C) 2020-2025 Tom Zander
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
 */
#include "SyncSPVAction.h"
#include "BroadcastTxData.h"
#include "PrivacySegment.h"
#include "DownloadManager.h"
#include "Peer.h"

#include <algorithm>
#include <iterator>
#include <memory>
#include <random>
#include <set>
#include <vector>

#include <time.h>

SyncSPVAction::SyncSPVAction(DownloadManager *parent)
    : Action(parent),
    // mainchain wants 3 peers per wallet, any other chain settles for a single one.
    MinPeersPerWallet(parent->chain() == P2PNet::MainChain ? 3 : 1)
{
}

namespace {
// Snapshot of one enabled privacy-segment (aka wallet) and the connected peers
// relevant to it, rebuilt on every execute() run.
struct WalletInfo {
    std::shared_ptr<PrivacySegment> segment;
    /*
     * peers that we connect to are remembered between runs and in that state
     * we also store the segment-id.
     * So an unassigned peer can actually be associated to one wallet and such
     * peers are stored in this struct as opposed to a 'global' one (scope=execute() method).
     */
    std::set<std::shared_ptr<Peer> > unassignedPeers;
    // peers that already have been stated to be explicitly attached to the segmentid
    std::set<std::shared_ptr<Peer> > assignedPeers;
    // the subset of assignedPeers that currently has a merkle download in progress
    std::vector<std::shared_ptr<Peer> > downloaders;
};
}

/*
 * One run of the SPV sync state machine.
 *
 * A run
 *  1. collects the enabled wallets and their connected peers,
 *  2. assigns verified peers to wallets and opens new connections where a
 *     wallet has fewer than MinPeersPerWallet peers,
 *  3. monitors running merkle-block downloads (punishing stalling peers) and
 *     starts new downloads where a wallet is behind the chain.
 *
 * Unless \a error is set, the method always ends by calling again(), which
 * re-schedules the action.
 */
void SyncSPVAction::execute(const boost::system::error_code &error)
{
    if (error)
        return;

    /*
     * the bloom filters and the 'mempool' call.
     *
     * Every peer that we connect to should get the bloom filter set and it should
     * also get the 'mempool' call sent once which will make that peer respond with
     * all matches of the bloom filters which are in the mempool.
     *
     * The tricky part is that we should have the latest bloom filter set on each
     * of those peers before the mempool is sent, since they work together.
     * Also if we are behind, the mempool should not be sent. (It can cause problems
     * in the wallet if we receive unconfirmed transactions before mined transactions)
     *
     * So, when a peer starts downloading merkle blocks, the bloom filter of all the
     * other peers becomes invalid the moment a match is found by an actual wallet.
     * This is Ok on one peer because the merkleblock automatically updates the
     * filter on match on the server side, but obviously not on the other peers
     * we have for that wallet.
     *
     * The approach we follow is that as soon as a sync-run is done on a single peer
     * (we do a main and also a backup sync), we tell the wallet to re-build its
     * bloom filter for _all_ peers and if the sync that peer did leads us to be at
     * the chain-tip, we also send each peer the mempool command.
     */

    bool didSomething = false;
    const int currentBlockHeight = m_dlm->blockHeight();

    // Sends every transaction queued for broadcast that belongs to the segment
    // to a peer that was just assigned to that segment.
    const auto broadcastWalletTransactions = [this](const std::shared_ptr<Peer> &peer,
            const std::shared_ptr<PrivacySegment> &segment) {
        if (!peer->relaysTransactions())
            return;
        const auto segmentId = segment->segmentId();
        for (const auto &weakTx : m_dlm->connectionManager().transactionsToBroadcast()) {
            auto tx = weakTx.lock();
            if (tx && segmentId == tx->privSegment()) {
                logInfo(2025) << "broadcasting transactions from wallet to new peer" << tx->hash()
                              << "peer:" << peer->connectionId();
                peer->sendTx(tx);
            }
        }
    };

    // fill the 'wallets' struct with the current state of things.
    std::vector<WalletInfo> wallets;
    // peers without a segment and without a remembered segment-id, free for any wallet.
    std::set<std::shared_ptr<Peer> > unassignedPeers;
    for (const auto &segment : m_dlm->connectionManager().segments()) {
        if (!segment->enabled())
            continue;
        WalletInfo info;
        info.segment = segment;
        for (const auto &peer : m_dlm->connectionManager().connectedPeers()) {
            auto peerSegment = peer->privacySegment().lock();
            if (peerSegment == segment) {
                info.assignedPeers.insert(peer);
                if (peer->merkleDownloadInProgress())
                    info.downloaders.push_back(peer);
            }
            else if (peerSegment.get() == nullptr
                     && peer->peerAddress().segment() == segment->segmentId()) {
                info.unassignedPeers.insert(peer);
            }
            else if (peerSegment.get() == nullptr && peer->peerAddress().segment() == 0) {
                unassignedPeers.insert(peer);
            }
        }
        wallets.push_back(info);
    }
    // sort by prio
    std::sort(wallets.begin(), wallets.end(), [](const WalletInfo &a, const WalletInfo &b) {
        return a.segment->priority() < b.segment->priority();
    });

    // Half of the work of this action is finding appropriate peers for a privacy-segment (aka wallet).
    // We open connections to peers
    // We wait for them to have 'received' recent headers, which indicates the peer is on our chain.
    // We then assign a wallet to them.
    // The other half is managing those peers actually downloading the merkle blocks to get our SPV based data.

    const auto now = boost::posix_time::microsec_clock::universal_time();
    const uint32_t nowInSec = time(nullptr);
    const uint32_t headersOkTime = 3600 * 24 * 7 * 3; // 3 weeks

    //-- find peers that are unassigned and which are verified to be on our chain.
    for (auto wallet = wallets.begin(); wallet != wallets.end(); ++wallet) {
        if (wallet->segment->priority() >= PrivacySegment::OnlyManual)
            continue;
        for (const auto &peer : wallet->unassignedPeers) {
            if (peer->receivedHeaders()
                    || nowInSec - peer->peerAddress().lastReceivedGoodHeaders() < headersOkTime) {
                const auto segmentId = wallet->segment->segmentId();
                logInfo(2025) << "Peer previously used is still Ok, assigning it to wallet" << segmentId;
                // assign peer and send old transactions
                peer->setPrivacySegment(wallet->segment);
                didSomething = true;
                broadcastWalletTransactions(peer, wallet->segment);
            }
        }
    }
    // check if our unassigned peers (without segment-id) can be assigned
    for (const auto &peer : unassignedPeers) {
        if (peer->receivedHeaders()
                || nowInSec - peer->peerAddress().lastReceivedGoodHeaders() < headersOkTime) {
            // can be assigned!
            for (auto wallet = wallets.begin(); wallet != wallets.end(); ++wallet) {
                // are sorted by prio
                if (wallet->segment->priority() >= PrivacySegment::OnlyManual)
                    break;
                if (static_cast<int>(wallet->assignedPeers.size()
                                     + wallet->unassignedPeers.size()) < MinPeersPerWallet) {
                    const auto segmentId = wallet->segment->segmentId();
                    logInfo(2025) << "Found free floating peer, assigning it to wallet" << segmentId
                                  << "(count:" << wallet->assignedPeers.size() + 1 << ")";
                    peer->setPrivacySegment(wallet->segment);
                    broadcastWalletTransactions(peer, wallet->segment);
                    wallet->assignedPeers.insert(peer);
                    peer->peerAddress().setSegment(wallet->segment->segmentId()); // remember this assignment
                    didSomething = true;
                    break;
                }
            }
        }
    }

    // Connect to new peers if needed.
    for (auto wallet = wallets.begin(); wallet != wallets.end(); ++wallet) {
        if (wallet->segment->priority() >= PrivacySegment::OnlyManual)
            break;
        int peerCount = wallet->unassignedPeers.size() + wallet->assignedPeers.size();
        if (peerCount < MinPeersPerWallet) { // start some new connections
            if (wallet->segment->firstBlock() == -1 || wallet->segment->firstBlock() > currentBlockHeight)
                continue;

            // start new connections
            int newConnectionsToOpen = MinPeersPerWallet - peerCount;
            auto infoIter = m_walletInfos.find(wallet->segment);
            if (infoIter != m_walletInfos.end() && infoIter->second.connectionsTriedForWallet > 18) {
                assert(newConnectionsToOpen > 0);
                // if we've already opened a lot of connections, we have to recognize that the address-DB
                // has loads of unusable addresses. So we create more new connections a second in order to
                // find the crypto-coin in the haystack faster.
                newConnectionsToOpen = 10;
            }
            while (newConnectionsToOpen-- > 0) {
                auto address = m_dlm->connectionManager().peerAddressDb()
                        .findBest(/*network and bloom*/ 1 | 4, wallet->segment->segmentId());
                if (!address.isValid())
                    break;

                // remember how many IPs we tried.
                auto triedIter = m_walletInfos.find(wallet->segment);
                if (triedIter == m_walletInfos.end())
                    m_walletInfos.insert(std::make_pair(wallet->segment, WalletDownloadInfo()));
                else
                    triedIter->second.connectionsTriedForWallet += 1;

                logInfo(2025) << "Creating a new connection for PrivacySegment" << wallet->segment->segmentId()
                              << "(count:" << peerCount + 1 << ")";
                didSomething = true;
                m_dlm->connectionManager().connect(address);
                ++peerCount;
            }
        }
        else {
            auto triedIter = m_walletInfos.find(wallet->segment);
            if (triedIter != m_walletInfos.end() && triedIter->second.connectionsTriedForWallet > 250) {
                triedIter->second.connectionsTriedForWallet = 0;
                // We finished, got enough peers for wallet.
                // but it took really a lot of IPs to try. Let's check if we can extend our DB
                for (const auto &peer : wallet->assignedPeers) {
                    auto address = peer->peerAddress();
                    if (!address.askedAddresses()) {
                        address.setAskedAddresses(true);
                        logInfo() << "SyncSVP, Sending GetAddr msg to" << peer->connectionId();
                        peer->sendMessage(Message(Api::LegacyP2P, Api::P2P::GetAddr));
                    }
                }
            }
        }
    }

    // ----
    // Now we focus on what those connected peers can do for our node. Downloading merkle blocks

    // first, garbage collect peers that are no longer connected.
    for (auto peerIter = m_peerInfos.begin(); peerIter != m_peerInfos.end();) {
        if (peerIter->first->status() == Peer::ShuttingDown)
            peerIter = m_peerInfos.erase(peerIter);
        else
            ++peerIter;
    }

    for (auto wallet = wallets.begin(); wallet != wallets.end(); ++wallet) {
        if (wallet->segment->priority() >= PrivacySegment::OnlyManual)
            break;

        // first, check up on peers to see if they stopped downloading and if so
        // register how far they got.
        int highestBlockReceived = -1;
        for (const auto &peer : wallet->assignedPeers) {
            auto infoIter = m_peerInfos.find(peer);
            if (infoIter == m_peerInfos.end()) {
                m_peerInfos.insert(std::make_pair(peer, PeerInfo()));
                continue;
            }
            PeerInfo &info = infoIter->second;
            if (info.previousDownloads.empty())
                continue;
            PeerDownloadInfo &pdi = info.previousDownloads.back();
            if (pdi.toBlock == 0 // started download
                    && !peer->merkleDownloadInProgress()) { // finished download
                // So it started a download and nobody is downloading anymore. Lets
                // update the PeerDownloadInfo with the last blockheight we downloaded.
                pdi.toBlock = peer->highestMerkleReceived();
                if (pdi.fromBlock > pdi.toBlock) { // download aborted
                    info.previousDownloads.resize(info.previousDownloads.size() - 1); // remove it
                }
                else {
                    // Track the highest height any peer of this wallet reached. This has
                    // to be max(): the start value is -1, so a min() could never become
                    // positive and the filter-rebuild below would never trigger.
                    highestBlockReceived = std::max(pdi.toBlock, highestBlockReceived);
                }
            }
        }
        if (highestBlockReceived > 0 && wallet->downloaders.empty()) { // no longer downloading
            if (MinPeersPerWallet == 1) {
                // special case testnet. Not enough peers, so we cheat
                // mark 'backup' to be the same as the main, avoiding using a second peer,
                // since we might not have any.
                wallet->segment->blockSynched(wallet->segment->lastBlockSynched());
            }
            else {
                // after a range of blocks finished downloading, we assume that it found something and thus the
                // filter on the other peers need to have the new utxos added to be able to act on them.
                logInfo(2025) << "A peer stopped downloading, broadcasting filter to its wallet-peers"
                              << wallet->segment->segmentId();
                wallet->segment->rebuildFilter(highestBlockReceived == currentBlockHeight
                        ? PrivacySegment::FilterAtTip : PrivacySegment::NotAtTip);
            }
        }

        if (wallet->assignedPeers.empty()) // nobody to assign the download to, no work to do
            continue;

        const auto privSegment = wallet->segment;
        // wallet is in need of downloading more merkle blocks ?
        if (privSegment->firstBlock() > 1
                && currentBlockHeight > privSegment->firstBlock()
                && (privSegment->lastBlockSynched() < currentBlockHeight
                    || privSegment->backupSyncHeight() < currentBlockHeight)) {

            logInfo(2025) << "WalletID:" << privSegment->segmentId()
                          << "origStart:" << privSegment->firstBlock()
                          << "lastBlockSynched:" << privSegment->lastBlockSynched()
                          << "backupSyncHeight:" << privSegment->backupSyncHeight();

            // is behind. Is some peer downloading?
            for (auto iter = wallet->downloaders.begin(); iter != wallet->downloaders.end();) {
                const auto &peer = *iter;
                didSomething = true; // as long as there is a need to download, we continue.

                // lets see if the peer is making progress.
                auto infoIter = m_peerInfos.find(peer);
                assert(infoIter != m_peerInfos.end());
                PeerInfo &info = infoIter->second;
                const int64_t timePassed = (now - info.lastCheckedTime).total_milliseconds();
                const int32_t blocksDone = peer->lastReceivedMerkle() - info.lastHeight;
                logInfo(2025) << " +- downloading using peer" << peer->connectionId()
                              << "prevHeight:" << info.lastHeight
                              << "curHeight:" << peer->lastReceivedMerkle();
                if (timePassed > 4200) {
                    if (blocksDone < timePassed / 1000) {
                        // peer is stalling. I expect at least 1 block a second.
                        // we give them 2 segments try, or only one if they never started a single merkle dl
                        if (info.slowPunishment++ > 2 || peer->lastReceivedMerkle() == 0) {
                            logWarning(2025) << "SyncSPV disconnects peer" << peer->connectionId()
                                             << "that is stalling download of merkle-blocks";
                            m_dlm->connectionManager().punish(peer, 210);
                            m_dlm->connectionManager().disconnect(peer);
                            iter = wallet->downloaders.erase(iter);
                            continue;
                        }
                    }
                    else if (blocksDone > 20) {
                        info.slowPunishment = 0;
                    }
                    // start new section every couple of seconds
                    info.lastHeight = peer->lastReceivedMerkle();
                    info.lastCheckedTime = now;
                }
                ++iter;
            }

            // Do we need new downloaders?
            if (wallet->assignedPeers.empty())
                continue; // well, next time...
            if (wallet->downloaders.size() == 2)
                continue; // nope. Carry on.

            auto walletInfoIter = m_walletInfos.find(wallet->segment);
            assert(m_walletInfos.end() != walletInfoIter);
            auto &walletInfo = walletInfoIter->second;

            int neededPeers = 2;
            if (wallet->downloaders.size() == 1) {
                neededPeers = 1;
                if (privSegment->lastBlockSynched() == walletInfo.boundaryHeight)
                    // one peer already finished the run.
                    continue;
            }

            /*
             * lets assign a downloader (or two).
             * A 'wallet' (aka PrivacySegment) needs at least one pass by a peer to download all
             * merkle blocks. We do this by picking a peer and calling startMerkleDownload() on it.
             *
             * This code also takes into consideration the fact that we should not trust
             * a single random node on the Internet. So when we finished asking one peer
             * we go and assign a second peer to be our backup. With probably the same
             * result, but maybe with more transactions.
             */
            logDebug(2025) << " +- looking for downloader(s):" << neededPeers;
            logDebug(2025) << " += chain:" << m_dlm->blockHeight()
                           << "wallet:" << privSegment->lastBlockSynched()
                           << "wallet-backup:" << privSegment->backupSyncHeight();
            logDebug(2025) << " previous boundary hight was at:" << walletInfo.boundaryHeight;

            while (neededPeers > 0) {
                assert(neededPeers <= 2);
                if (neededPeers == 2) {
                    walletInfo.boundaryHeight = privSegment->lastBlockSynched();
                    // stop downloading after 7000 blocks to interleave peers
                    walletInfo.boundaryHeight += 7000;
                    if (MinPeersPerWallet == 1)
                        // unless we're on testnet, then we avoid switching peers since there are so little of them.
                        walletInfo.boundaryHeight += 1000000;
                }
                else if (MinPeersPerWallet == 1) {
                    // testnet hack. Only one peer available.
                    break;
                }
                // the first downloader continues the main sync, the second one the backup sync.
                const int from = neededPeers == 2 ? privSegment->lastBlockSynched()
                                                  : privSegment->backupSyncHeight();

                // don't pick the same one every time, shuffle them.
                std::vector<std::shared_ptr<Peer> > assignedPeers;
                std::copy(wallet->assignedPeers.begin(), wallet->assignedPeers.end(),
                          std::back_inserter(assignedPeers));
                std::mt19937 g(m_randomDevice());
                std::shuffle(assignedPeers.begin(), assignedPeers.end(), g);

                std::shared_ptr<Peer> downloader;
                for (auto &p : assignedPeers) {
                    // logDebug(2025) << "Can we download with peer:" << p->connectionId();
                    bool ok = true;
                    auto peerIter = m_peerInfos.find(p);
                    if (peerIter == m_peerInfos.end()) {
                        m_peerInfos.insert(std::make_pair(p, PeerInfo()));
                        peerIter = m_peerInfos.find(p);
                    }
                    auto &info = peerIter->second;
                    // only the first registered download is inspected, see the unconditional break.
                    for (const auto &pdi : info.previousDownloads) {
                        logDebug(2025) << " => last registred download:" << pdi.fromBlock << "-" << pdi.toBlock;
                        if (pdi.toBlock == 0 || from < pdi.toBlock) { // this one already downloaded for us
                            // logDebug(2025) << " Skipping peer because it downloaded before";
                            // logDebug(2025) << " from:" << from << "peer downloaded until:" << pdi.toBlock;
                            ok = false;
                        }
                        break;
                    }
                    if (ok && p->peerHeight() <= from) {
                        // logDebug(2025) << "Skipping peer because its behind";
                        ok = false;
                    }
                    if (ok) {
                        downloader = p;
                        info.previousDownloads.clear();
                        // toBlock == 0 marks the download as 'started, not finished'.
                        info.previousDownloads.push_back({from + 1, 0});
                        break;
                    }
                }
                if (!downloader)
                    break;

                logDebug(2025) << " +- wallet merkle-download started on peer" << downloader->connectionId()
                               << "lastBlockSynced:" << privSegment->lastBlockSynched()
                               << "backup:" << privSegment->backupSyncHeight()
                               << "from:" << from
                               << "upper boundary:" << walletInfo.boundaryHeight;
                didSomething = true;
                --neededPeers;
                wallet->downloaders.push_back(downloader);
                auto peerIter = m_peerInfos.find(downloader);
                assert(peerIter != m_peerInfos.end());
                auto &info = peerIter->second;
                // +1 because we start one after the last downloaded
                downloader->startMerkleDownload(from + 1, walletInfo.boundaryHeight);
                info.lastHeight = from;
                info.lastCheckedTime = now;
            }
        }
    }

    bool suppliedAllPeers = true;
    for (auto wallet = wallets.begin(); wallet != wallets.end(); ++wallet) {
        if (wallet->segment->priority() >= PrivacySegment::OnlyManual)
            continue;
        if (static_cast<int>(wallet->assignedPeers.size()) < MinPeersPerWallet) {
            suppliedAllPeers = false;
            break;
        }
    }

    if (!suppliedAllPeers || didSomething) {
        m_quietCount = 0;
    }
    else if (m_quietCount < 5) {
        // we print that we are done.
        // notice that we do NOT exit.
        // This action will continue to monitor if each wallet has enough peers.
        if (++m_quietCount == 5)
            logInfo(2025) << "==== SyncSPVAction ==== DONE";
    }
    again();
}