Files
thehub/libs/p2p/DownloadManager.h
T
tomFlowee fb927b9436 Continue shared_ptr convertion of Peer
Last year we started wrapping the Peer object in a shared pointer,
which is used now in the managers.
This continues the idea by making the PrivacySegment use smart
pointers too and the P2PNetInterface is changed to do the same
for downstream applications.
2024-01-04 17:06:32 +01:00

222 lines
7.1 KiB
C++

/*
* This file is part of the Flowee project
* Copyright (C) 2020-2024 Tom Zander <tom@flowee.org>
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef DOWNLOADMANAGER_H
#define DOWNLOADMANAGER_H
#include "ConnectionManager.h"
#include "Action.h"
#include "Blockchain.h"
#include "InventoryItem.h"
#include "P2PNet.h"
#include "NotificationCenter.h"
#include <uint256.h>
#include <condition_variable>
#include <deque>
#include <vector>
#include <functional>
#include <boost/asio/io_context_strand.hpp>
#include <boost/unordered_map.hpp>
class P2PNetInterface;
class HeaderSyncInterface;
class DataListenerInterface;
/**
* A download manager as its name implies should manage what is to be downloaded.
* various peers could tell us about stuff that needs to be downloaded and we should
* round-robin over those that supply this info to actually download it.
*/
class DownloadManager
{
public:
/**
* @brief DownloadManager constructor.
* @param service the IO-service. See WorkerThreads
* @param basedir the directory to load and save state data.
*/
DownloadManager(boost::asio::io_service &service, const boost::filesystem::path &basedir, P2PNet::Chain chain = P2PNet::MainChain);
// start reaching out and synchronizing.
void start();
const ConnectionManager &connectionManager() const;
ConnectionManager &connectionManager();
inline Blockchain &blockchain() {
return m_blockchain;
}
inline const Blockchain &blockchain() const {
return m_blockchain;
}
NotificationCenter &notifications() {
return m_notifications;
}
/// We sync our chain using headers only, this returns true if we found the tip
inline bool isChainUpToDate() {
return std::abs(m_blockchain.expectedBlockHeight() - m_blockchain.height()) < 3;
}
inline uint64_t servicesBitfield() const {
return m_connectionManager.servicesBitfield();
}
inline void setServicesBitfield(const uint64_t &servicesBitfield) {
m_connectionManager.setServicesBitfield(servicesBitfield);
}
/// Sync-height, cached.
inline int blockHeight() const {
return m_connectionManager.blockHeight();
}
/// called by the blockchain to let us know the blockchain changed size
void headersDownloadFinished(int newBlockHeight, int peerId);
inline boost::asio::io_context::strand &strand() {
return m_strand;
}
inline boost::asio::io_service &service() {
return m_connectionManager.service();
}
/// Returns which chain this DownloadManager is following.
P2PNet::Chain chain() const;
/**
* returns peerId that is downloading headers. Or -1 if nobody is.
*/
inline int peerDownloadingHeaders() const {
return m_peerDownloadingHeaders;
}
/// Handle INV messages a peer received.
void parseInvMessage(Message message, int sourcePeerId);
/// Handle a transaction a peer received.
void parseTransaction(Tx tx, int sourcePeerId);
/// notify that a peer is no longer connected.
void peerDisconnected(int connectionId);
// Callback to let us know the data is invalid.
// This typically leads us to ban the peer.
void reportDataFailure(int connectionId);
/// Create and run an action.
/// This will avoid duplicates by returning a running action if it already exists.
/// Actions are owned and deleted by the downloadmanager.
template<class T>
T* addAction() {
T *t;
{ // lock scope
std::unique_lock<std::mutex> lock(m_lock);
if (m_shuttingDown) return nullptr;
for (auto a : m_runningActions) {
t = dynamic_cast<T*>(a);
if (t)
return t;
}
t = new T(this);
m_runningActions.push_back(t);
}
t->start();
return t;
}
/// called by an action when it finds itself out of work.
void done(Action *action);
/// Observer pattern subscribe method.
void addDataListener(DataListenerInterface *listener);
/// Observer pattern unsubscribe method.
void removeDataListener(DataListenerInterface *listener);
/// Observer pattern subscribe method.
void addP2PNetListener(P2PNetInterface *listener);
/// Observer pattern unsubscribe method.
void removeP2PNetListener(P2PNetInterface *listener);
/**
* \internal return a copy of the listeners
* Due to the fact that this just lobs around raw pointers, this method is
* marked internal and only for usage by the ConnectionManager
*/
std::vector<P2PNetInterface*> p2pNetListeners() const;
/// Observer pattern subscribe method.
void addHeaderListener(HeaderSyncInterface *listener);
/// Observer pattern unsubscribe method.
void removeHeaderListener(HeaderSyncInterface *listener);
/// Shut down this download manager, the connection manager and others.
/// It is required to call this prior to calling the destructor in order to
/// cleanly shut down the system and stop all tasks in all threads.
/// This method is blocking and wont return until finished.
void shutdown();
/**
* Flush all changes to disk.
*/
void saveData();
void getMoreHeaders();
private:
void finishShutdown();
boost::asio::io_context::strand m_strand;
const P2PNet::Chain m_chain;
ConnectionManager m_connectionManager;
Blockchain m_blockchain;
NotificationCenter m_notifications;
std::deque<Action*> m_runningActions;
mutable std::mutex m_lock;
std::atomic<bool> m_shuttingDown;
std::condition_variable m_waitVariable;
int m_peerDownloadingHeaders = -1;
std::vector<DataListenerInterface*> m_dataListeners;
std::vector<P2PNetInterface*> m_netListeners;
std::vector<HeaderSyncInterface*> m_headerSyncListeners;
void runQueue(); ///< find new items to download
struct DownloadTarget {
DownloadTarget(const InventoryItem &item) : inv(item) {}
InventoryItem inv;
std::vector<int> sourcePeers;
};
struct ActiveDownload {
uint32_t targetId = 0;
uint32_t downloadStartTime = 0;
int primary = -1, secondary = -1; // downloaders.
};
mutable std::mutex m_downloadsLock;
std::map<uint32_t, DownloadTarget> m_downloadQueue;
typedef boost::unordered_map<uint256, uint32_t, HashShortener> DownloadTargetIds;
DownloadTargetIds m_downloadTargetIds; // uint256 -> downloadQueue-Id
uint32_t m_nextDownloadTarget = 1;
std::array<ActiveDownload, 10> m_downloads;
bool m_isBehind; // we expect new headers
};
#endif