/* * This file is part of the Flowee project * Copyright (C) 2020-2021 Tom Zander * * This program is free software: you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by * the Free Software Foundation, either version 3 of the License, or * (at your option) any later version. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License * along with this program. If not, see . */ #include "SyncSPVAction.h" #include "PrivacySegment.h" #include "DownloadManager.h" #include "Peer.h" #include SyncSPVAction::SyncSPVAction(DownloadManager *parent) : Action(parent), MinPeersPerWallet(parent->chain() == P2PNet::MainChain ? 3 : 1) { } namespace { struct WalletInfo { PrivacySegment *segment; std::set > unassignedPeers; std::set > assignedPeers; std::shared_ptr downloader; }; } void SyncSPVAction::execute(const boost::system::error_code &error) { if (error) return; bool didSomething = false; const int currentBlockHeight = m_dlm->blockHeight(); // fill the 'wallets' struct with the current state of things. std::vector wallets; std::set > unassignedPeers; for (auto *segment : m_dlm->connectionManager().segments()) { WalletInfo info; info.segment = segment; for (const auto &peer : m_dlm->connectionManager().connectedPeers()) { if (peer->privacySegment() == segment) { info.assignedPeers.insert(peer); if (peer->merkleDownloadInProgress()) info.downloader = peer; } else if (peer->privacySegment() == nullptr && peer->peerAddress().segment() == segment->segmentId()) { info.unassignedPeers.insert(peer); } else if (peer->privacySegment() == nullptr && peer->peerAddress().segment() == 0) { unassignedPeers.insert(peer); } } wallets.push_back(info); } // sort by prio std::sort(wallets.begin(), wallets.end(), [](const WalletInfo &a, const WalletInfo &b) { return a.segment->priority() < b.segment->priority(); }); // Half of the work of this action is finding appropriate peers for a privacy-segment (aka wallet). // We open connections to peers // We wait for them to have 'received' recent headers, which indicates the peer is on our chain. // We then assign a wallet to them. // The other half is managing those peers actually downloading the merkle blocks to get our SPV based data. // For stability sake, we take those steps backwards and because we repeat every second or so a peer will // go through the whole process in steps. const auto now = boost::posix_time::microsec_clock::universal_time(); const uint32_t nowInSec = time(nullptr); const uint32_t headersOkTime = 3600 * 40; // 40 hours //-- find peers that are unassigned and which are verified to be on our chain. for (auto wallet = wallets.begin(); wallet != wallets.end(); ++wallet) { for (auto peer : wallet->unassignedPeers) { if (peer->receivedHeaders() || nowInSec - peer->peerAddress().lastReceivedGoodHeaders() < headersOkTime) { // assign peer and send old transactions peer->setPrivacySegment(wallet->segment); didSomething = true; if (peer->relaysTransactions()) { for (const auto &weakTx : m_dlm->connectionManager().transactionsToBroadcast()) { auto tx = weakTx.lock(); if (tx && peer->privacySegment() == wallet->segment) peer->sendTx(tx); } } } } } // check if our unassigned peers can be assigned for (auto peer : unassignedPeers) { if (peer->receivedHeaders() || nowInSec - peer->peerAddress().lastReceivedGoodHeaders() < headersOkTime) { // can be assigned! for (auto wallet = wallets.begin(); wallet != wallets.end(); ++wallet) { // are sorted by prio if (wallet->segment->priority() >= PrivacySegment::OnlyManual) break; if (wallet->assignedPeers.size() + wallet->unassignedPeers.size() < MinPeersPerWallet) { peer->setPrivacySegment(wallet->segment); if (peer->relaysTransactions()) { for (const auto &weakTx : m_dlm->connectionManager().transactionsToBroadcast()) { auto tx = weakTx.lock(); if (tx && peer->privacySegment() == wallet->segment) peer->sendTx(tx); } } wallet->assignedPeers.insert(peer); peer->peerAddress().setSegment(wallet->segment->segmentId()); // remember this assignment didSomething = true; break; } } } } for (auto wallet = wallets.begin(); wallet != wallets.end(); ++wallet) { if (wallet->segment->priority() >= PrivacySegment::OnlyManual) break; int peerCount = wallet->unassignedPeers.size() + wallet->assignedPeers.size(); if (peerCount < MinPeersPerWallet) { // start some new connections if (wallet->segment->firstBlock() == -1 || wallet->segment->firstBlock() > currentBlockHeight) continue; // start new connections while (peerCount < MinPeersPerWallet) { auto address = m_dlm->connectionManager().peerAddressDb() .findBest(/*network and bloom*/ 1 | 4, wallet->segment->segmentId()); if (!address.isValid()) break; logInfo() << "creating a new connection for PrivacySegment" << wallet->segment->segmentId(); didSomething = true; m_dlm->connectionManager().connect(address); ++peerCount; } } } // ---- // Now we focus on what those connected peers can do for our node. Downloading merkle blocks for (auto wallet = wallets.begin(); wallet != wallets.end(); ++wallet) { auto infoIter = m_segmentInfos.find(wallet->segment); if (infoIter == m_segmentInfos.end()) { m_segmentInfos.insert(std::make_pair(wallet->segment, Info())); infoIter = m_segmentInfos.find(wallet->segment); } assert(infoIter != m_segmentInfos.end()); Info &info = infoIter->second; if (!info.previousDownloads.empty()) { PeerDownloadInfo &pdi = info.previousDownloads.back(); if (pdi.toBlock == 0 // started download && wallet->downloader == nullptr ) { // finished download // So it started a download and nobody is downloading anymore. Lets // update the PeerDownloadInfo with the last blockheight we downloaded. if (pdi.fromBlock == wallet->segment->backupSyncHeight() + 1) pdi.toBlock = wallet->segment->lastBlockSynched(); else pdi.toBlock = wallet->segment->backupSyncHeight(); if (pdi.fromBlock > pdi.toBlock) // download aborted pdi.fromBlock = 0; logDebug() << "registring a download completed for" << pdi.peerId << "from" << pdi.fromBlock << "to" << pdi.toBlock; assert(pdi.toBlock >= pdi.fromBlock); } } if (wallet->assignedPeers.empty()) continue; auto privSegment = wallet->segment; logDebug() << "WalletID:" << privSegment->segmentId() << "origStart:" << privSegment->firstBlock() << "lastBlockSynched:" << privSegment->lastBlockSynched() << "backupSyncHeight:" << privSegment->backupSyncHeight(); if (privSegment->firstBlock() > 1 && currentBlockHeight > privSegment->firstBlock() && (privSegment->lastBlockSynched() < currentBlockHeight || privSegment->backupSyncHeight() < currentBlockHeight)) { // is behind. Is someone downloading? if (wallet->downloader) { didSomething = true; // as long as there is a need to download, we continue. auto curPeer = wallet->downloader; // remember the downloader so we avoid asking the same peer to download the backup. // lets see if the peer is making progress. const int64_t timePassed = (now - info.lastCheckedTime).total_milliseconds();; const int32_t blocksDone = curPeer->lastReceivedMerkle() - info.lastHeight; logDebug() << "Downloading using peer" << curPeer->connectionId() << "prevHeight:" << info.lastHeight << "curHeight:" << curPeer->lastReceivedMerkle(); if (timePassed > 4200) { if (blocksDone < timePassed / 1000) { // peer is stalling. I expect at least 1 block a second. // we give them 2 segments try, or only one if they never started a single merkle dl if (info.slowPunishment++ > 2 || curPeer->lastReceivedMerkle() == 0) { logWarning() << "SyncSPV disconnects peer" << curPeer->connectionId() << "that is stalling download of merkle-blocks"; m_dlm->connectionManager().punish(curPeer, PUNISHMENT_MAX); wallet->downloader.reset(); curPeer = nullptr; } } else if (blocksDone > 20) { info.slowPunishment = 0; } if (curPeer) { // start new section every couple of seconds info.lastHeight = curPeer->lastReceivedMerkle(); info.lastCheckedTime = now; } } } /* * lets assign a downloader. * A 'wallet' (aka PrivacySegment) needs at least one pass by a peer to download all * merkle blocks. We do this by picking a peer and calling startMerkleDownload() on it. * * This code also takes into consideration the fact that we should not trust * a single random node on the Internet. So when we finished asking one peer * we go and assign a second peer to be our backup. With probably the same * result, but maybe with more transactions. */ if (wallet->downloader == nullptr && !wallet->assignedPeers.empty()) { logDebug() << "looking for a new downloader"; const int from = privSegment->backupSyncHeight(); for (auto &p : wallet->assignedPeers) { // logDebug() << " + " << p->connectionId(); bool ok = true; for (auto pdi : info.previousDownloads) { if (pdi.peerId == p->connectionId()) { if (from < pdi.toBlock) {// this one already downloaded for us // logDebug() << "Skipping peer because it downloaded before"; ok = false; break; } if (p->peerHeight() <= from) { // logDebug() << "Skipping peer because its behind"; ok = false; break; } } if (!ok) break; } if (!ok) continue; wallet->downloader = p; break; } if (wallet->downloader) { logDebug() << "Wallet merkle-download started on peer" << wallet->downloader->connectionId() << privSegment->backupSyncHeight() << privSegment->lastBlockSynched(); didSomething = true; wallet->downloader->startMerkleDownload(from + 1); // +1 because we start one after the last downloaded info.previousDownloads.push_back({wallet->downloader->connectionId(), from + 1, 0}); info.lastHeight = from; info.lastCheckedTime = now; } } } } if (didSomething) { m_quietCount = 0; } else if (m_quietCount++ > 2) { logInfo() << "SyncSPVAction done"; m_dlm->done(this); return; } again(); }