5f6db1de18
Setting the "LowPower" mode on the main DownloadManager class will cause it to halt all network activity and jobs will die. You can revive it by changing it to NormalPower and optionally calling the start() method to restart all jobs.
461 lines
16 KiB
C++
461 lines
16 KiB
C++
/*
|
|
* This file is part of the Flowee project
|
|
* Copyright (C) 2020-2025 Tom Zander <tom@flowee.org>
|
|
*
|
|
* This program is free software: you can redistribute it and/or modify
|
|
* it under the terms of the GNU General Public License as published by
|
|
* the Free Software Foundation, either version 3 of the License, or
|
|
* (at your option) any later version.
|
|
*
|
|
* This program is distributed in the hope that it will be useful,
|
|
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
* GNU General Public License for more details.
|
|
*
|
|
* You should have received a copy of the GNU General Public License
|
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
*/
|
|
#include "DownloadManager.h"
|
|
#include "DataListenerInterface.h"
|
|
#include "FillAddressDBAction.h"
|
|
#include "HeaderSyncInterface.h"
|
|
#include "InventoryItem.h"
|
|
#include "P2PNetInterface.h"
|
|
#include "Peer.h"
|
|
#include "SyncChainAction.h"
|
|
#include "SyncSPVAction.h"
|
|
#include "CleanPeersAction.h"
|
|
|
|
#include <primitives/Tx.h>
|
|
#include <streaming/P2PParser.h>
|
|
#include <streaming/P2PBuilder.h>
|
|
#include <streaming/BufferPools.h>
|
|
|
|
DownloadManager::DownloadManager(boost::asio::io_context &context, const boost::filesystem::path &basedir, P2PNet::Chain chain)
|
|
: m_strand(context),
|
|
m_chain(chain),
|
|
m_connectionManager(context, basedir, this),
|
|
m_blockchain(this, basedir, chain),
|
|
m_shuttingDown(false)
|
|
{
|
|
m_connectionManager.setBlockHeight(m_blockchain.height());
|
|
m_isBehind = !isChainUpToDate();
|
|
|
|
// create basedir, and fail-fast if we don't have writing rights to do that.
|
|
try {
|
|
boost::filesystem::create_directories(basedir);
|
|
} catch (const boost::filesystem::filesystem_error&) {
|
|
if (!boost::filesystem::exists(basedir) || !boost::filesystem::is_directory(basedir)) {
|
|
logFatal() << "Failed to create datadir" << basedir.string();
|
|
throw;
|
|
}
|
|
// errors like "already exists" are safe to ignore.
|
|
}
|
|
}
|
|
|
|
const ConnectionManager &DownloadManager::connectionManager() const
|
|
{
|
|
return m_connectionManager;
|
|
}
|
|
|
|
ConnectionManager &DownloadManager::connectionManager()
|
|
{
|
|
return m_connectionManager;
|
|
}
|
|
|
|
void DownloadManager::headersDownloadFinished(int newBlockHeight, int peerId)
|
|
{
|
|
if (m_shuttingDown)
|
|
return;
|
|
// assert(m_strand.running_in_this_thread()); // disabled to make usable in unit test
|
|
// The blockchain lets us know the result of a successful headers-download.
|
|
if (m_peerDownloadingHeaders == peerId)
|
|
m_peerDownloadingHeaders = -1;
|
|
|
|
auto peer = m_connectionManager.peer(peerId);
|
|
if (peer.get()) {
|
|
peer->peerAddress().gotGoodHeaders();
|
|
peer->updatePeerHeight(newBlockHeight);
|
|
}
|
|
|
|
m_connectionManager.setBlockHeight(newBlockHeight);
|
|
getMoreHeaders();
|
|
for (auto iface : m_headerSyncListeners) {
|
|
iface->setHeaderSyncHeight(newBlockHeight);
|
|
}
|
|
if (!m_isBehind) // don't flood the user during initial sync.
|
|
m_notifications.notifyNewBlock(newBlockHeight);
|
|
if (m_isBehind && isChainUpToDate()) {
|
|
m_isBehind = false;
|
|
for (auto iface : m_headerSyncListeners) {
|
|
iface->headerSyncComplete();
|
|
}
|
|
}
|
|
|
|
addAction<SyncSPVAction>();
|
|
}
|
|
|
|
void DownloadManager::getMoreHeaders()
|
|
{
|
|
if (m_peerDownloadingHeaders == -1) { // check if we need to download more of them.
|
|
// TODO use the fastest peer.
|
|
for (auto &p : m_connectionManager.connectedPeers()) {
|
|
if (p->startHeight() > blockHeight()) {
|
|
m_peerDownloadingHeaders = p->connectionId();
|
|
m_connectionManager.requestHeaders(p);
|
|
return;
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
void DownloadManager::parseInvMessage(Message message, int sourcePeerId)
|
|
{
|
|
if (m_shuttingDown)
|
|
return;
|
|
// this is called as a result of an INV received by a peer.
|
|
// We check this and insert into the m_downloadQueue a target to download.
|
|
try {
|
|
Streaming::P2PParser parser(message);
|
|
const size_t count = parser.readCompactInt();
|
|
logDebug() << "Received" << count << "Inv messages from" << sourcePeerId;
|
|
std::unique_lock<std::mutex> lock(m_downloadsLock);
|
|
for (size_t i = 0; i < count; ++i) {
|
|
uint32_t type = parser.readInt();
|
|
auto inv = InventoryItem(parser.readUint256(), type);
|
|
|
|
// if block type, check if we already know about it
|
|
if (type == InventoryItem::BlockType) {
|
|
auto height = m_blockchain.blockHeightFor(inv.hash());
|
|
logDebug() << " \\- inv is of 'block' type." << inv.hash();
|
|
if (height > 0) {
|
|
// a block-inv we already have seen and approved.
|
|
auto peer = m_connectionManager.peer(sourcePeerId);
|
|
if (peer)
|
|
peer->updatePeerHeight(height);
|
|
// no need for further action.
|
|
continue;
|
|
}
|
|
}
|
|
|
|
// we update the downloads queue
|
|
if (type == InventoryItem::TransactionType || type == InventoryItem::BlockType
|
|
|| type == InventoryItem::DoubleSpendType) {
|
|
auto findIter = m_downloadTargetIds.find(inv.hash());
|
|
if (findIter == m_downloadTargetIds.end()) {
|
|
#ifndef BCH_NO_DEBUG_OUTPUT
|
|
auto debug = logDebug() << " \\- inv is";
|
|
if (type ==InventoryItem::TransactionType)
|
|
debug << "Tx" << inv.hash();
|
|
else if (type ==InventoryItem::DoubleSpendType)
|
|
debug << "DSProof";
|
|
else
|
|
debug << "block";
|
|
#endif
|
|
// new download target.
|
|
DownloadTarget dlt(inv);
|
|
dlt.sourcePeers.push_back(sourcePeerId);
|
|
m_downloadQueue.insert(std::make_pair(m_nextDownloadTarget, dlt));
|
|
m_downloadTargetIds.insert(std::make_pair(inv.hash(), m_nextDownloadTarget++));
|
|
} else {
|
|
logDebug() << " \\- inv is duplicate";
|
|
// add source to existing one
|
|
auto targetIter = m_downloadQueue.find(findIter->second);
|
|
assert(targetIter != m_downloadQueue.end());
|
|
targetIter->second.sourcePeers.push_back(sourcePeerId);
|
|
}
|
|
}
|
|
}
|
|
} catch (const std::exception &e) {
|
|
logInfo() << "Inv messsage parsing failed" << e << "peer:" << sourcePeerId;
|
|
m_connectionManager.punish(sourcePeerId, 250);
|
|
}
|
|
logDebug() << " Queue size now" << m_downloadQueue.size();
|
|
|
|
// call runQueue in a next event.
|
|
m_strand.post(std::bind(&DownloadManager::runQueue, this), std::allocator<void>());
|
|
}
|
|
|
|
void DownloadManager::parseTransaction(Tx tx, int sourcePeerId)
|
|
{
|
|
// we get called by the peer about a transaction just received.
|
|
// Now we find the downloads data that requested it and update
|
|
// m_downloads and m_downloadTargetIds and m_downloadQueue.
|
|
auto hash = tx.createHash();
|
|
std::unique_lock<std::mutex> lock(m_downloadsLock);
|
|
const size_t downloadSlots = m_downloads.size();
|
|
bool found = false;
|
|
for (size_t i = 0; i < downloadSlots; ++i) {
|
|
if (m_downloads[i].primary == sourcePeerId
|
|
|| m_downloads[i].secondary == sourcePeerId) {
|
|
auto dlIter = m_downloadQueue.find(m_downloads[i].targetId);
|
|
assert(dlIter != m_downloadQueue.end());
|
|
if (dlIter->second.inv.type() == InventoryItem::TransactionType
|
|
&& dlIter->second.inv.hash() == hash) {
|
|
// mark download complete.
|
|
auto tIter = m_downloadTargetIds.find(dlIter->second.inv.hash());
|
|
if (m_downloadTargetIds.end() != tIter)
|
|
m_downloadTargetIds.erase(tIter);
|
|
m_downloads[i] = ActiveDownload();
|
|
m_downloadQueue.erase(dlIter);
|
|
|
|
found = true;
|
|
break;
|
|
}
|
|
}
|
|
}
|
|
if (!found) {
|
|
logWarning() << "Peer" << sourcePeerId << "sent unsolicited tx. This breaks protocol";
|
|
m_connectionManager.punish(sourcePeerId, 34);
|
|
}
|
|
try {
|
|
for (auto iface : m_dataListeners) {
|
|
iface->newTransaction(tx);
|
|
}
|
|
} catch (const std::exception &e) {
|
|
// assume that anything wrong happening in the interface is our fault for not checking the
|
|
// validity of the transaction.
|
|
// Then we just blame the source peer for providing us with bad data.
|
|
m_connectionManager.punish(sourcePeerId, 501);
|
|
}
|
|
}
|
|
|
|
void DownloadManager::peerDisconnected(int connectionId)
|
|
{
|
|
if (connectionId == m_peerDownloadingHeaders)
|
|
m_peerDownloadingHeaders = -1;
|
|
}
|
|
|
|
void DownloadManager::reportDataFailure(int connectionId)
|
|
{
|
|
m_connectionManager.punish(connectionId, 1001);
|
|
}
|
|
|
|
void DownloadManager::done(Action *action)
|
|
{
|
|
std::unique_lock<std::mutex> lock(m_lock);
|
|
for (auto iter = m_runningActions.begin(); iter != m_runningActions.end(); ++iter) {
|
|
if (*iter == action) {
|
|
m_runningActions.erase(iter);
|
|
delete action;
|
|
return;
|
|
}
|
|
}
|
|
}
|
|
|
|
void DownloadManager::addDataListener(DataListenerInterface *listener)
|
|
{
|
|
std::unique_lock<std::mutex> lock(m_lock);
|
|
for (auto iface : m_dataListeners) {
|
|
if (iface == listener)
|
|
return;
|
|
}
|
|
m_dataListeners.push_back(listener);
|
|
}
|
|
|
|
void DownloadManager::removeDataListener(DataListenerInterface *listener)
|
|
{
|
|
std::unique_lock<std::mutex> lock(m_lock);
|
|
for (auto iter = m_dataListeners.begin(); iter != m_dataListeners.end(); ++iter) {
|
|
if (*iter == listener) {
|
|
m_dataListeners.erase(iter);
|
|
return;
|
|
}
|
|
}
|
|
}
|
|
|
|
void DownloadManager::addP2PNetListener(P2PNetInterface *listener)
|
|
{
|
|
std::unique_lock<std::mutex> lock(m_lock);
|
|
for (auto iface : m_netListeners) {
|
|
if (iface == listener)
|
|
return;
|
|
}
|
|
m_netListeners.push_back(listener);
|
|
}
|
|
|
|
void DownloadManager::removeP2PNetListener(P2PNetInterface *listener)
|
|
{
|
|
std::unique_lock<std::mutex> lock(m_lock);
|
|
for (auto iter = m_netListeners.begin(); iter != m_netListeners.end(); ++iter) {
|
|
if (*iter == listener) {
|
|
m_netListeners.erase(iter);
|
|
return;
|
|
}
|
|
}
|
|
}
|
|
|
|
std::vector<P2PNetInterface *> DownloadManager::p2pNetListeners() const
|
|
{
|
|
std::unique_lock<std::mutex> lock(m_lock);
|
|
return m_netListeners;
|
|
}
|
|
|
|
void DownloadManager::addHeaderListener(HeaderSyncInterface *listener)
|
|
{
|
|
std::unique_lock<std::mutex> lock(m_lock);
|
|
for (auto iface : m_headerSyncListeners) {
|
|
if (iface == listener)
|
|
return;
|
|
}
|
|
m_headerSyncListeners.push_back(listener);
|
|
}
|
|
|
|
void DownloadManager::removeHeaderListener(HeaderSyncInterface *listener)
|
|
{
|
|
std::unique_lock<std::mutex> lock(m_lock);
|
|
for (auto iter = m_headerSyncListeners.begin(); iter != m_headerSyncListeners.end(); ++iter) {
|
|
if (*iter == listener) {
|
|
m_headerSyncListeners.erase(iter);
|
|
return;
|
|
}
|
|
}
|
|
}
|
|
|
|
void DownloadManager::shutdown()
|
|
{
|
|
std::unique_lock<std::mutex> lock(m_lock);
|
|
if (m_shuttingDown)
|
|
return;
|
|
m_shuttingDown = true;
|
|
|
|
for (auto a : m_runningActions) {
|
|
a->cancel();
|
|
}
|
|
m_connectionManager.shutdown();
|
|
m_blockchain.save();
|
|
m_strand.post(std::bind(&DownloadManager::finishShutdown, this), std::allocator<void>());
|
|
m_waitVariable.wait(lock);
|
|
}
|
|
|
|
void DownloadManager::saveData()
|
|
{
|
|
m_connectionManager.saveData();
|
|
m_blockchain.save();
|
|
}
|
|
|
|
void DownloadManager::finishShutdown()
|
|
{
|
|
assert(m_shuttingDown);
|
|
std::unique_lock<std::mutex> lock(m_lock);
|
|
m_waitVariable.notify_all();
|
|
}
|
|
|
|
P2PNet::Chain DownloadManager::chain() const
|
|
{
|
|
return m_chain;
|
|
}
|
|
|
|
void DownloadManager::runQueue()
|
|
{
|
|
if (m_shuttingDown)
|
|
return;
|
|
std::unique_lock<std::mutex> lock(m_downloadsLock);
|
|
auto iter = m_downloadQueue.begin();
|
|
const size_t downloadSlots = m_downloads.size();
|
|
for (size_t i = 0; i < downloadSlots; ++i) {
|
|
if (m_downloads[i].targetId == 0) { // slot unoccupied.
|
|
|
|
// iterate through downloadqueue to find a new job
|
|
while (true) {
|
|
if (iter == m_downloadQueue.end())
|
|
return; // nothing left to download
|
|
|
|
const DownloadTarget &dt = iter->second;
|
|
bool alreadyRunning = false;
|
|
for (size_t x = 0; !alreadyRunning && x < downloadSlots; ++x) {
|
|
// first check if nobody is downloading this one yet.
|
|
alreadyRunning = m_downloads[x].targetId == iter->first;
|
|
}
|
|
if (alreadyRunning) {
|
|
++iter;
|
|
continue;
|
|
}
|
|
if (dt.inv.type() == InventoryItem::BlockType) { // remove duplicates
|
|
auto height = m_blockchain.blockHeightFor(dt.inv.hash());
|
|
if (height > 0) {
|
|
// hash already known.
|
|
// No need to download it.
|
|
auto tIter = m_downloadTargetIds.find(dt.inv.hash());
|
|
if (m_downloadTargetIds.end() != tIter)
|
|
m_downloadTargetIds.erase(tIter);
|
|
|
|
// let the peer know the height that is related to the INV
|
|
for (auto id : iter->second.sourcePeers) {
|
|
auto peer = m_connectionManager.peer(id);
|
|
if (peer)
|
|
peer->updatePeerHeight(height);
|
|
}
|
|
iter = m_downloadQueue.erase(iter);
|
|
continue;
|
|
}
|
|
}
|
|
|
|
assert(!dt.sourcePeers.empty());
|
|
int preferredDownload = -1;
|
|
for (auto peerId : dt.sourcePeers) {
|
|
// find a peer we assign the download to.
|
|
if (preferredDownload == -1) {
|
|
preferredDownload = peerId;
|
|
} else {
|
|
bool inUse = false;
|
|
for (size_t x = 0; !inUse && x < downloadSlots; ++x) {
|
|
if (m_downloads[x].primary == peerId || m_downloads[x].secondary == peerId) {
|
|
inUse = true;
|
|
break;
|
|
}
|
|
}
|
|
if (!inUse) {
|
|
break;
|
|
preferredDownload = peerId;
|
|
}
|
|
}
|
|
}
|
|
auto peer = m_connectionManager.peer(preferredDownload);
|
|
assert(peer);
|
|
if (peer) {
|
|
logInfo() << "Requesting DL for inv from peer:" << preferredDownload;
|
|
logDebug() << " + inv:" << dt.inv.hash();
|
|
m_downloads[i].targetId = iter->first;
|
|
m_downloads[i].downloadStartTime = time(nullptr);
|
|
m_downloads[i].primary = preferredDownload;
|
|
|
|
if (dt.inv.type() == InventoryItem::TransactionType || dt.inv.type() == InventoryItem::DoubleSpendType) {
|
|
Streaming::P2PBuilder builder(Streaming::pool(40));
|
|
builder.writeCompactSize(1);
|
|
builder.writeInt(dt.inv.type());
|
|
builder.writeByteArray(dt.inv.hash(), Streaming::RawBytes);
|
|
peer->sendMessage(builder.message(Api::P2P::GetData));
|
|
}
|
|
else if (dt.inv.type() == InventoryItem::BlockType) {
|
|
m_connectionManager.requestHeaders(peer);
|
|
}
|
|
}
|
|
break;
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
P2PNet::PowerMode DownloadManager::powerMode() const
|
|
{
|
|
return m_powerMode;
|
|
}
|
|
|
|
void DownloadManager::setPowerMode(P2PNet::PowerMode newPowerMode)
|
|
{
|
|
m_powerMode = newPowerMode;
|
|
m_connectionManager.onPowerModeChanged();
|
|
}
|
|
|
|
void DownloadManager::start()
|
|
{
|
|
if (m_powerMode == P2PNet::NormalPower) {
|
|
addAction<SyncSPVAction>();
|
|
addAction<CleanPeersAction>();
|
|
// do these last, so they reuse connections started by the syncSPV one.
|
|
addAction<FillAddressDBAction>();
|
|
addAction<SyncChainAction>();
|
|
}
|
|
}
|