Files
thehub/libs/p2p/DownloadManager.h
T
TomZ 652fc1e161 Introduce structure to broadcast transactions.
Sending out transactions on the p2p net is quite a lot of work:
you need to find multiple peers to send the transaction to. First
you send an INV, then you respond to a getData to actually send
the transaction, and finally you wait for 'reject' messages that may
indicate that there is something wrong with the transaction.

This introduces the BroadcastTxData class that wraps a transaction
and gets callbacks for sending and for rejects, abstracting away
all the complexity for the user.
2020-06-08 21:35:10 +02:00

186 lines
5.9 KiB
C++

/*
* This file is part of the Flowee project
* Copyright (C) 2020 Tom Zander <tomz@freedommail.ch>
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef DOWNLOADMANAGER_H
#define DOWNLOADMANAGER_H
#include "ConnectionManager.h"
#include "Action.h"
#include "Blockchain.h"
#include "InventoryItem.h"
#include "DataListenerInterface.h"
#include <uint256.h>
#include <array>
#include <atomic>
#include <condition_variable>
#include <cstdint>
#include <deque>
#include <map>
#include <mutex>
#include <vector>
#include <boost/asio/io_context_strand.hpp>
#include <boost/unordered_map.hpp>
// Forward declarations, so this header does not need the full definitions.
// P2PNetInterface is only used through pointers in the class below;
// TransactionBroadcastListener is not referenced by the declarations visible
// in this header.
class P2PNetInterface;
class TransactionBroadcastListener;
/**
* A download manager as its name implies should manage what is to be downloaded.
* various peers could tell us about stuff that needs to be downloaded and we should
* round-robin over those that supply this info to actually download it.
*/
class DownloadManager
{
public:
/**
* @brief DownloadManager constructor.
* @param service the IO-service. See WorkerThreads
* @param basedir the directory to load and save state data.
*/
DownloadManager(boost::asio::io_service &service, const boost::filesystem::path &basedir);
// start reaching out and synchronizing.
void start();
const ConnectionManager &connectionManager() const;
ConnectionManager &connectionManager();
inline Blockchain &blockchain() {
return m_blockchain;
}
inline const Blockchain &blockchain() const {
return m_blockchain;
}
inline uint64_t servicesBitfield() const {
return m_connectionManager.servicesBitfield();
}
inline void setServicesBitfield(const uint64_t &servicesBitfield) {
m_connectionManager.setServicesBitfield(servicesBitfield);
}
inline int blockHeight() const {
return m_connectionManager.blockHeight();
}
/// called by the blockchain to let us know the blockchain changed size
void headersDownloadFinished(int newBlockHeight, int peerId);
inline boost::asio::io_context::strand &strand() {
return m_strand;
}
inline boost::asio::io_service &service() {
return m_connectionManager.service();
}
/**
* returns peerId that is downloading headers. Or -1 if nobody is.
*/
inline int peerDownloadingHeaders() const {
return m_peerDownloadingHeaders;
}
/// Handle INV messages a peer received.
void parseInvMessage(Message message, int sourcePeerId);
/// Handle a transaction a peer received.
void parseTransaction(Tx tx, int sourcePeerId);
/// notify that a peer is no longer connected.
void peerDisconnected(int connectionId);
// Callback to let us know the data is invalid.
// This typically leads us to ban the peer.
void reportDataFailure(int connectionId);
/// Create and run an action.
/// This will avoid duplicates by returning a running action if it already exists.
/// Actions are owned and deleted by the downloadmanager.
template<class T>
T* addAction() {
T *t;
{ // lock scope
std::unique_lock<std::mutex> lock(m_lock);
if (m_shuttingDown) return nullptr;
for (auto a : m_runningActions) {
t = dynamic_cast<T*>(a);
if (t)
return t;
}
t =new T(this);
m_runningActions.push_back(t);
}
t->start();
return t;
}
/// called by an action when it finds itself out of work.
void done(Action *action);
/// Observer pattern subscribe method.
void addDataListener(DataListenerInterface *listener);
/// Observer pattern unsubscribe method.
void removeDataListener(DataListenerInterface *listener);
/// Observer pattern subscribe method.
void addP2PNetListener(P2PNetInterface *listener);
/// Observer pattern unsubscribe method.
void removeP2PNetListener(P2PNetInterface *listener);
const std::deque<P2PNetInterface*> &p2pNetListeners();
/// Shut down this download manager, the connection manager and others.
/// It is required to call this prior to calling the destructor in order to
/// cleanly shut down the system and stop all tasks in all threads.
/// This method is blocking and wont return until finished.
void shutdown();
private:
void finishShutdown();
boost::asio::io_context::strand m_strand;
ConnectionManager m_connectionManager;
Blockchain m_blockchain;
std::deque<P2PNetInterface*> m_listeners;
std::deque<Action*> m_runningActions;
mutable std::mutex m_lock;
std::atomic<bool> m_shuttingDown;
std::condition_variable m_waitVariable;
int m_peerDownloadingHeaders = -1;
std::vector<DataListenerInterface*> m_dataListeners;
void runQueue(); ///< find new items to download
struct DownloadTarget {
DownloadTarget(const InventoryItem &item) : inv(item) {}
InventoryItem inv;
std::vector<int> sourcePeers;
};
struct ActiveDownload {
uint32_t targetId = 0;
uint32_t downloadStartTime = 0;
int primary = -1, secondary = -1; // downloaders.
};
mutable std::mutex m_downloadsLock;
std::map<uint32_t, DownloadTarget> m_downloadQueue;
typedef boost::unordered_map<uint256, uint32_t, HashShortener> DownloadTargetIds;
DownloadTargetIds m_downloadTargetIds; // uint256 -> downloadQueue-Id
uint32_t m_nextDownloadTarget = 0;
std::array<ActiveDownload, 10> m_downloads;
};
#endif