Files
thehub/libs/p2p/ConnectionManager.cpp
T

491 lines
16 KiB
C++
Raw Permalink Normal View History

2020-04-17 19:33:06 +02:00
/*
* This file is part of the Flowee project
* Copyright (C) 2020 Tom Zander <tomz@freedommail.ch>
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "ConnectionManager.h"
#include "DownloadManager.h"
#include "P2PNetInterface.h"
#include "Peer.h"
#include "PrivacySegment.h"
2020-05-18 19:49:24 +02:00
#include <random.h>
2020-04-17 19:33:06 +02:00
#include <streaming/BufferPool.h>
#include <streaming/P2PBuilder.h>
#include <APIProtocol.h>
#include <version.h> // for PROTOCOL_VERSION
2020-05-11 12:49:10 +02:00
ConnectionManager::ConnectionManager(boost::asio::io_service &service, const boost::filesystem::path &basedir, DownloadManager *parent)
2020-04-17 19:33:06 +02:00
: m_shuttingDown(false),
m_ioService(service),
m_cronTimer(m_ioService),
m_peerAddressDb(this),
2020-05-09 19:58:44 +02:00
m_network(service),
2020-05-11 12:49:10 +02:00
m_dlManager(parent),
m_basedir(basedir)
2020-04-17 19:33:06 +02:00
{
// The nonce is used in the status message to allow detection of connect-to-self.
2020-05-18 19:49:24 +02:00
m_appNonce = GetRand(-1);
2020-04-17 19:33:06 +02:00
std::map<int, std::string> table;
table.insert(std::make_pair(Api::P2P::Version, "version"));
table.insert(std::make_pair(Api::P2P::VersionAck, "verack"));
table.insert(std::make_pair(Api::P2P::Ping, "ping"));
table.insert(std::make_pair(Api::P2P::Pong, "pong"));
table.insert(std::make_pair(Api::P2P::PreferHeaders, "sendheaders"));
table.insert(std::make_pair(Api::P2P::GetHeaders, "getheaders"));
table.insert(std::make_pair(Api::P2P::Headers, "headers"));
table.insert(std::make_pair(Api::P2P::RejectData, "reject"));
table.insert(std::make_pair(Api::P2P::Inventory, "inv"));
table.insert(std::make_pair(Api::P2P::GetAddr, "getaddr"));
table.insert(std::make_pair(Api::P2P::Addresses, "addr"));
table.insert(std::make_pair(Api::P2P::Inventory, "inv"));
table.insert(std::make_pair(Api::P2P::Data_Transaction, "tx"));
table.insert(std::make_pair(Api::P2P::Data_MerkleBlock, "merkleblock"));
table.insert(std::make_pair(Api::P2P::FilterLoad, "filterload"));
table.insert(std::make_pair(Api::P2P::FilterClear, "filterclear"));
table.insert(std::make_pair(Api::P2P::GetData, "getdata"));
m_network.setMessageIdLookup(table);
m_cronTimer.expires_from_now(boost::posix_time::seconds(20));
m_cronTimer.async_wait(parent->strand().wrap(std::bind(&ConnectionManager::cron, this, std::placeholders::_1)));
2020-05-09 19:58:44 +02:00
m_userAgent = "Flowee-P2PNet-based app";
2020-05-11 12:49:10 +02:00
m_peerAddressDb.loadDatabase(m_basedir);
2020-04-17 19:33:06 +02:00
}
void ConnectionManager::addInvMessage(const Message &message, int sourcePeerId)
{
if (m_shuttingDown)
return;
m_dlManager->strand().post(std::bind(&DownloadManager::parseInvMessage, m_dlManager, message, sourcePeerId));
}
void ConnectionManager::addTransaction(const Tx &message, int sourcePeerId)
{
if (m_shuttingDown)
return;
m_dlManager->strand().post(std::bind(&DownloadManager::parseTransaction, m_dlManager, message, sourcePeerId));
}
thread_local Streaming::BufferPool m_buffer;
Streaming::BufferPool &ConnectionManager::pool(int reserveSize)
{
m_buffer.reserve(reserveSize);
return m_buffer;
}
void ConnectionManager::connect(PeerAddress &address)
{
if (m_shuttingDown)
return;
auto con = m_network.connection(address.peerAddress());
std::unique_lock<std::mutex> lock(m_lock);
// first check if we are already have a Peer for this endpoint
if (m_peers.find(con.connectionId()) == m_peers.end()) {
address.punishPeer(100); // when the connection succeeds, we remove the 100 again.
2020-05-03 21:01:57 +02:00
con.setOnError(std::bind(&ConnectionManager::handleError, this, std::placeholders::_1, std::placeholders::_2));
2020-05-10 00:46:41 +02:00
con.setMessageQueueSizes(m_queueSize, 1);
2020-05-05 22:53:25 +02:00
auto p = std::make_shared<Peer>(this, address);
p->connect(std::move(con));
2020-04-17 19:33:06 +02:00
m_peers.insert(std::make_pair(p->connectionId(), p));
}
}
2020-05-06 10:42:58 +02:00
void ConnectionManager::disconnect(const std::shared_ptr<Peer> &peer)
2020-04-26 16:20:45 +02:00
{
2020-05-03 21:01:57 +02:00
assert(peer);
if (m_shuttingDown)
return;
for (auto iface : m_dlManager->p2pNetListeners()) {
iface->lostPeer(peer->connectionId());
}
2020-04-26 16:20:45 +02:00
std::unique_lock<std::mutex> lock(m_lock);
2020-05-03 21:01:57 +02:00
assert(m_peers.find(peer->connectionId()) != m_peers.end());
2020-04-26 16:20:45 +02:00
removePeer(peer);
}
2020-04-17 19:33:06 +02:00
uint64_t ConnectionManager::servicesBitfield() const
{
return m_servicesBitfield;
}
void ConnectionManager::setServicesBitfield(const uint64_t &servicesBitfield)
{
m_servicesBitfield = servicesBitfield;
}
int ConnectionManager::blockHeight() const
{
return m_blockHeight;
}
void ConnectionManager::setBlockHeight(int blockHeight)
{
2020-05-11 23:15:30 +02:00
assert(blockHeight >= 0);
2020-04-17 19:33:06 +02:00
m_blockHeight = blockHeight;
}
int ConnectionManager::blockHeightFor(const uint256 &blockId)
{
return m_dlManager->blockchain().blockHeightFor(blockId);
}
uint256 ConnectionManager::blockHashFor(int height)
{
return m_dlManager->blockchain().block(height).createHash();
}
uint64_t ConnectionManager::appNonce() const
{
return m_appNonce;
}
2020-05-06 10:42:58 +02:00
void ConnectionManager::connectionEstablished(const std::shared_ptr<Peer> &peer)
2020-04-17 19:33:06 +02:00
{
if (m_shuttingDown)
return;
assert(peer);
assert(peer->peerAddress().isValid());
peer->peerAddress().punishPeer(-100); // this mirrors the 100 when we started connecting.
peer->peerAddress().setServices(peer->services());
std::unique_lock<std::mutex> lock(m_lock);
// don't use if the client doesn't support any usable services.
if (!peer->supplies_bloom() || !peer->supplies_network()) {
2020-05-11 19:54:49 +02:00
logWarning() << "Rejecting. Need BLOOM and NETWORK. Peer:" << peer->connectionId() << peer->userAgent()
2020-05-10 11:51:17 +02:00
<< peer->peerAddress();
2020-04-17 19:33:06 +02:00
removePeer(peer);
return;
}
for (auto iface : m_dlManager->p2pNetListeners()) {
iface->newPeer(peer->connectionId(), peer->userAgent(), peer->startHeight(),
peer->peerAddress());
}
2020-05-05 22:53:25 +02:00
auto peerIter = m_peers.find(peer->connectionId());
assert(peerIter != m_peers.end());
2020-04-17 19:33:06 +02:00
m_connectedPeers.insert(peer->connectionId());
2020-05-11 18:49:16 +02:00
if (time(nullptr) - peer->peerAddress().lastReceivedGoodHeaders() > 3600 * 36) {
2020-04-17 19:33:06 +02:00
// check if this peer is using the same chain as us.
2020-05-05 22:53:25 +02:00
requestHeaders(peerIter->second);
2020-04-17 19:33:06 +02:00
}
const auto previousSegment = peer->peerAddress().segment();
if (previousSegment == 0) {
std::map<PrivacySegment*, int> segmentUsage;
for (auto ps : m_segments) {
segmentUsage.insert(std::make_pair(ps, 0));
}
// now populate it with segment usage.
const auto thisId = peer->connectionId();
for (auto peerId : m_connectedPeers) {
if (peerId != thisId) {
auto i = m_peers.find(peerId);
assert(i != m_peers.end());
auto s = i->second->privacySegment();
if (s) {
auto segmentIter = segmentUsage.find(s);
segmentIter->second++;
}
}
}
PrivacySegment *best = nullptr;
int usageCount = 1000;
for (auto s : segmentUsage) {
if (s.second < usageCount) {
usageCount = s.second;
best = s.first;
}
}
if (best) {
peer->setPrivacySegment(best);
peer->peerAddress().setSegment(best->segmentId());
}
} else {
for (auto ps : m_segments) {
if (ps->segmentId() == previousSegment) {
peer->setPrivacySegment(ps);
break;
}
}
}
}
void ConnectionManager::addBlockHeaders(const Message &message, int sourcePeerId)
{
if (m_shuttingDown)
return;
// TODO if downloadmanager triggered this (m_peerDownloadingHeaders)
// then update metadata on the speed of this peer.
m_dlManager->strand().post(std::bind(&Blockchain::processBlockHeaders,
&m_dlManager->blockchain(), message, sourcePeerId));
}
void ConnectionManager::addAddresses(const Message &message, int sourcePeerId)
{
if (m_shuttingDown)
return;
m_dlManager->strand().post(std::bind(&PeerAddressDB::processAddressMessage,
&m_peerAddressDb, message, sourcePeerId));
}
2020-05-07 15:38:54 +02:00
bool ConnectionManager::punish(const std::shared_ptr<Peer> &peer, int amount)
2020-04-17 19:33:06 +02:00
{
assert(peer);
if (m_shuttingDown)
2020-05-07 15:38:54 +02:00
return false;
2020-04-17 19:33:06 +02:00
auto address = peer->peerAddress();
short total = PUNISHMENT_MAX;
short previous = total;
if (address.isValid()) {
previous = address.punishment();
total = address.punishPeer(amount);
for (auto iface : m_dlManager->p2pNetListeners()) {
iface->punishMentChanged(peer->connectionId());
}
}
if (total >= PUNISHMENT_MAX) { // too much punishment leads to a ban
2020-05-10 11:51:17 +02:00
logWarning() << "Ban peer:" << peer->connectionId() << previous << "=>" << total
<< "Address:" << peer->peerAddress();
2020-04-17 19:33:06 +02:00
for (auto iface : m_dlManager->p2pNetListeners()) {
iface->lostPeer(peer->connectionId());
}
std::unique_lock<std::mutex> lock(m_lock);
2020-05-06 10:42:58 +02:00
removePeer(peer);
2020-05-07 15:38:54 +02:00
return true;
2020-04-17 19:33:06 +02:00
}
2020-05-07 15:38:54 +02:00
return false;
2020-04-17 19:33:06 +02:00
}
2020-05-07 15:38:54 +02:00
bool ConnectionManager::punish(int connectionId, int amount)
2020-04-17 19:33:06 +02:00
{
2020-05-05 22:53:25 +02:00
std::shared_ptr<Peer> p;
2020-04-17 19:33:06 +02:00
{
std::unique_lock<std::mutex> lock(m_lock);
auto peerIter = m_peers.find(connectionId);
if (peerIter == m_peers.end())
2020-05-07 15:38:54 +02:00
return false;
2020-04-17 19:33:06 +02:00
p = peerIter->second;
}
2020-05-07 15:38:54 +02:00
return punish(p, amount);
2020-04-17 19:33:06 +02:00
}
2020-05-06 10:42:58 +02:00
void ConnectionManager::requestHeaders(const std::shared_ptr<Peer> &peer)
2020-04-17 19:33:06 +02:00
{
if (m_shuttingDown)
return;
Streaming::P2PBuilder builder(pool(4 + 32 * 10));
builder.writeInt(PROTOCOL_VERSION);
auto message = m_dlManager->blockchain().createGetHeadersRequest(builder);
2020-05-11 18:49:16 +02:00
peer->setRequestedHeader(true);
2020-04-17 19:33:06 +02:00
peer->sendMessage(message);
}
2020-05-05 22:53:25 +02:00
std::deque<std::shared_ptr<Peer>> ConnectionManager::connectedPeers() const
2020-04-17 19:33:06 +02:00
{
std::unique_lock<std::mutex> lock(m_lock);
2020-05-05 22:53:25 +02:00
std::deque<std::shared_ptr<Peer>> answer;
2020-04-17 19:33:06 +02:00
if (m_shuttingDown)
return answer;
for (auto i : m_connectedPeers) {
auto p = m_peers.find(i);
assert(p != m_peers.end());
answer.push_back(p->second);
}
return answer;
}
2020-05-05 22:53:25 +02:00
std::shared_ptr<Peer> ConnectionManager::peer(int connectionId) const
2020-04-17 19:33:06 +02:00
{
std::unique_lock<std::mutex> lock(m_lock);
auto i = m_peers.find(connectionId);
if (i == m_peers.end())
return nullptr;
return i->second;
}
void ConnectionManager::addPrivacySegment(PrivacySegment *ps)
{
assert(ps);
#ifndef NDEBUG
// don't add it twice, please.
for (auto s : m_segments) {
assert (s != ps);
}
#endif
m_segments.push_back(ps);
}
void ConnectionManager::removePrivacySegment(PrivacySegment *ps)
{
assert(ps);
for (auto s = m_segments.begin(); s != m_segments.end(); ++s) {
if (ps == *s) {
m_segments.erase(s);
break;
}
}
return;
}
void ConnectionManager::setUserAgent(const std::string &userAgent)
{
m_userAgent = userAgent;
}
2020-04-26 16:20:45 +02:00
int ConnectionManager::peerCount() const
{
assert(m_peers.size() <= INT_MAX);
return int(m_peers.size());
}
2020-04-17 19:33:06 +02:00
void ConnectionManager::cron(const boost::system::error_code &error)
{
if (error)
return;
if (m_shuttingDown)
return;
m_cronTimer.expires_from_now(boost::posix_time::seconds(20));
m_cronTimer.async_wait(m_dlManager->strand().wrap(std::bind(&ConnectionManager::cron, this, std::placeholders::_1)));
logDebug() << "Cron";
int now = static_cast<uint32_t>(time(nullptr));
// check for connections that don't seem to connect.
std::unique_lock<std::mutex> lock(m_lock);
auto iter = m_peers.begin();
while (iter != m_peers.end()) {
2020-05-05 22:53:25 +02:00
Peer *peer = iter->second.get();
2020-05-11 17:42:44 +02:00
bool kick = peer->status() != Peer::Connected || peer->protocolVersion() == 0;
if (peer->connectTime() == 0) // not connected yet
kick &= now - peer->timeOffset() > 10; // no more than 10 sec to try to connect
else
kick &= now - peer->connectTime() > 20; // no more than 20 seconds for version handshake.
if (kick) {
2020-05-10 11:51:17 +02:00
auto peerAddress = peer->peerAddress();
2020-05-11 19:54:49 +02:00
logInfo() << "peer:" << iter->first << "kicking. Address:" << peerAddress;
2020-04-17 19:33:06 +02:00
iter = m_peers.erase(iter);
2020-05-10 11:51:17 +02:00
peer->shutdown();
2020-04-17 19:33:06 +02:00
}
else {
2020-05-11 19:54:49 +02:00
auto log = logInfo() << "peer:" << iter->first;
if (peer->status() == Peer::Connecting)
log << "Address:" << peer->peerAddress() << "[connecting]";
else
log << peer->userAgent();
2020-05-10 11:51:17 +02:00
if (peer->privacySegment())
log << "Wallet:" << peer->privacySegment()->segmentId();
2020-05-11 19:54:49 +02:00
if (peer->connectTime() > 0)
log.nospace() << "(" << now - peer->connectTime() << "s)";
2020-04-17 19:33:06 +02:00
++iter;
}
}
}
void ConnectionManager::handleError(int remoteId, const boost::system::error_code &error)
2020-04-26 16:20:45 +02:00
{
m_dlManager->strand().post(std::bind(&ConnectionManager::handleError_impl, this, remoteId, error));
}
2020-05-10 11:51:17 +02:00
void ConnectionManager::handleError_impl(int peerId, const boost::system::error_code &error)
2020-04-17 19:33:06 +02:00
{
if (m_shuttingDown)
return;
bool remove = false;
2020-05-07 15:38:54 +02:00
int punishment = 180; // unknown error
if (error == boost::asio::error::host_unreachable || error == boost::asio::error::network_unreachable
|| error.value() == EADDRNOTAVAIL) {
2020-04-17 19:33:06 +02:00
remove = true;
2020-05-07 15:38:54 +02:00
punishment = 900; // likely ipv6 while we don't have that.
2020-04-17 19:33:06 +02:00
}
2020-05-07 15:38:54 +02:00
else if (error == boost::asio::error::host_not_found) {
2020-04-17 19:33:06 +02:00
remove = true;
2020-05-07 15:38:54 +02:00
punishment = 450; // faulty DNS name.
2020-04-17 19:33:06 +02:00
}
2020-05-09 19:58:44 +02:00
else if (error == boost::asio::error::connection_refused
|| error == boost::asio::error::connection_aborted
|| error == boost::asio::error::connection_reset) {
remove = true;
2020-05-20 20:31:19 +02:00
punishment = 1; // to down-prioritize it on random connects
2020-05-09 19:58:44 +02:00
}
2020-05-10 11:51:17 +02:00
auto remotePeer = peer(peerId);
2020-05-07 17:01:03 +02:00
if (!remotePeer)
return;
2020-05-11 19:54:49 +02:00
logWarning().nospace() << "Peer: " << peerId << " got error. (" << error.value() << "=" << error.message()
<< ") Punishment: " << punishment;
2020-05-07 15:38:54 +02:00
bool removed = punish(remotePeer, punishment);
2020-04-17 19:33:06 +02:00
2020-05-07 15:38:54 +02:00
if (remove && !removed) {
2020-05-10 11:51:17 +02:00
logDebug() << "removing" << peerId;
2020-05-07 15:38:54 +02:00
for (auto iface : m_dlManager->p2pNetListeners()) {
iface->lostPeer(remotePeer->connectionId());
}
2020-04-17 19:33:06 +02:00
std::unique_lock<std::mutex> lock(m_lock);
2020-05-07 15:38:54 +02:00
removePeer(remotePeer);
2020-04-17 19:33:06 +02:00
}
}
2020-05-06 10:42:58 +02:00
void ConnectionManager::removePeer(const std::shared_ptr<Peer> &p)
2020-04-17 19:33:06 +02:00
{
const int id = p->connectionId();
2020-05-06 10:42:58 +02:00
p->shutdown();
2020-05-03 21:01:57 +02:00
2020-04-17 19:33:06 +02:00
auto i = m_connectedPeers.find(id);
if (i != m_connectedPeers.end()) {
m_connectedPeers.erase(i);
m_dlManager->peerDisconnected(id);
}
auto iter = m_peers.find(id);
assert (iter != m_peers.end());
m_peers.erase(iter);
}
std::deque<PrivacySegment *> ConnectionManager::segments() const
{
return m_segments;
}
void ConnectionManager::shutdown()
{
std::unique_lock<std::mutex> lock(m_lock);
if (m_shuttingDown)
return;
m_shuttingDown = true;
m_cronTimer.cancel();
auto copy(m_peers);
for (auto peer : copy) {
2020-05-06 10:42:58 +02:00
removePeer(peer.second);
2020-04-17 19:33:06 +02:00
}
assert(m_peers.empty());
2020-05-11 12:49:10 +02:00
m_peerAddressDb.saveDatabase(m_basedir);
2020-04-17 19:33:06 +02:00
}
2020-05-09 19:58:44 +02:00
void ConnectionManager::setMessageQueueSize(int size)
{
assert(size >= 1);
assert(size <= 0xeFFF);
m_queueSize = static_cast<short>(size);
}