Files
thehub/libs/p2p/Peer.cpp
T

479 lines
18 KiB
C++
Raw Permalink Normal View History

2020-04-17 19:33:06 +02:00
/*
* This file is part of the Flowee project
* Copyright (C) 2020-2021 Tom Zander <tom@flowee.org>
2020-04-17 19:33:06 +02:00
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "Peer.h"
#include "ConnectionManager.h"
#include "PrivacySegment.h"
#include "InventoryItem.h"
#include "BroadcastTxData.h"
2020-04-17 19:33:06 +02:00
2020-05-09 19:58:44 +02:00
#include <version.h>
2020-04-17 19:33:06 +02:00
#include <streaming/P2PParser.h>
#include <streaming/P2PBuilder.h>
#include <boost/asio/error.hpp>
2020-05-05 22:53:25 +02:00
Peer::Peer(ConnectionManager *parent, const PeerAddress &address)
2020-05-03 21:01:57 +02:00
: m_peerAddress(address),
m_peerStatus(Connecting),
2020-04-17 19:33:06 +02:00
m_connectionManager(parent)
{
assert(m_peerAddress.isValid());
m_peerAddress.setInUse(true);
m_timeOffset = time(nullptr);
}
Peer::~Peer()
{
assert(m_peerAddress.isValid());
m_peerAddress.setInUse(false);
2020-05-18 14:32:34 +02:00
if (m_segment)
m_segment->removeListener(this);
2020-05-05 22:53:25 +02:00
}
void Peer::connect(NetworkConnection && server)
{
m_con = std::move(server);
m_con.setOnConnected(std::bind(&Peer::connected, shared_from_this(), std::placeholders::_1));
m_con.setOnDisconnected(std::bind(&Peer::disconnected, shared_from_this(), std::placeholders::_1));
m_con.setOnIncomingMessage(std::bind(&Peer::processMessage, shared_from_this(), std::placeholders::_1));
m_con.setMessageHeaderLegacy(true);
2021-08-05 23:03:19 +02:00
m_con.setMessageQueueSizes(50, 3);
2020-05-06 18:38:37 +02:00
m_con.connect();
2020-04-17 19:33:06 +02:00
}
2020-04-27 15:47:15 +02:00
void Peer::shutdown()
2020-04-17 19:33:06 +02:00
{
2020-05-05 22:53:25 +02:00
m_peerStatus = ShuttingDown;
2020-05-07 18:26:41 +02:00
m_con.shutdown(); // forgets callbacks (shared ptrs) to us.
2020-04-17 19:33:06 +02:00
}
void Peer::connected(const EndPoint &endPoint)
{
m_peerStatus = Connected;
2020-04-26 16:20:45 +02:00
m_connectTime = time(nullptr);
2020-04-17 19:33:06 +02:00
logDebug() << "connected. Peer:" << connectionId();
// send the version message.
auto &pool = m_connectionManager->pool(400);
Streaming::P2PBuilder builder(pool);
builder.writeInt(PROTOCOL_VERSION);
builder.writeLong(m_connectionManager->servicesBitfield());
builder.writeLong((uint64_t) time(nullptr));
// Version msg: target address
builder.writeLong((uint64_t) 2); // services again
char buf[16];
memset(buf, 0, 16);
buf[10] = buf[11] = 0xff; // mark address as an IPv4 one
builder.writeByteArray(buf, 16, Streaming::RawBytes);
builder.writeWord(endPoint.announcePort);
// Version msg: my address
builder.writeLong(3); // services again
builder.writeByteArray(buf, 16, Streaming::RawBytes);
builder.writeWord(7); // port
// Version msg: my status
builder.writeLong(m_connectionManager->appNonce());
builder.writeString(m_connectionManager->userAgent(), Streaming::WithLength);
builder.writeInt(m_connectionManager->blockHeight());
builder.writeBool(/* relay-txs */ false);
// version is always the first thing they expect on connect
Message message = builder.message(Api::P2P::Version);
2020-05-05 10:56:57 +02:00
logDebug().nospace() << "peer: " << connectionId() << ", sending message (" << message.body().size() << " bytes)";
2020-04-17 19:33:06 +02:00
m_con.send(message);
}
void Peer::disconnected(const EndPoint &)
{
logDebug() << "Disconnected. Peer:" << connectionId();
2020-05-03 21:01:57 +02:00
if (m_peerStatus == ShuttingDown)
return;
2020-05-06 10:42:58 +02:00
m_connectionManager->disconnect(shared_from_this()); // will cause us to be deleted.
2020-04-17 19:33:06 +02:00
}
void Peer::processMessage(const Message &message)
{
2020-05-03 21:01:57 +02:00
if (m_peerStatus == ShuttingDown)
return;
2020-04-17 19:33:06 +02:00
try {
2020-05-20 15:20:09 +02:00
logDebug() << "Peer:" << connectionId() << "messageId:"
2020-04-17 19:33:06 +02:00
<< message.header().constData() << "of" << message.body().size() << "bytes";
if (message.messageId() == Api::P2P::Version) {
Streaming::P2PParser parser(message);
m_protocolVersion = parser.readInt();
m_services = parser.readLong();
2020-04-26 16:20:45 +02:00
const auto now = time(nullptr);
// offset of node, adjusted with the round-trip time
m_timeOffset = (now - parser.readLong()) - (now - m_connectTime);
2020-04-17 19:33:06 +02:00
// address
parser.skip(8 + 16 + 2); // IP (and services and port) of them
parser.skip(8 + 16 + 2); // IP of me.
parser.skip(8); // nonce
m_userAgent = parser.readString();
2020-11-13 20:11:06 +01:00
m_startHeight = m_peerHeight = parser.readInt();
2020-04-17 19:33:06 +02:00
m_relaysTransactions = parser.readBool();
2020-05-10 11:51:17 +02:00
logCritical() << "Peer:" << connectionId() << "is connected to" << m_userAgent
<< "Address:" << m_peerAddress;
2020-04-17 19:33:06 +02:00
m_con.send(Message(Api::LegacyP2P, Api::P2P::VersionAck));
m_con.send(Message(Api::LegacyP2P, Api::P2P::PreferHeaders));
2020-05-06 10:42:58 +02:00
m_connectionManager->connectionEstablished(shared_from_this());
2020-04-17 19:33:06 +02:00
m_peerAddress.successfullyConnected();
}
else if (message.messageId() == Api::P2P::Ping) {
m_con.send(Message(message.body(), Api::LegacyP2P, Api::P2P::Pong));
}
else if (message.messageId() == Api::P2P::PreferHeaders) {
m_preferHeaders = true;
}
else if (message.messageId() == Api::P2P::Headers) {
m_receivedHeaders = true;
m_connectionManager->addBlockHeaders(message, connectionId());
}
else if (message.messageId() == Api::P2P::RejectData) {
Streaming::P2PParser parser(message);
const std::string messageType = parser.readString();
const int errorCode = parser.readByte() ;
const std::string errorMessage = parser.readString();
logWarning() << "Reject received for" << messageType
<< errorCode << errorMessage;
// check m_transactions to call callback.
if (messageType == "tx") {
// we can only forward it if the additional info is a txid
uint256 txid;
try {
txid = parser.readUint256();
} catch (...) {
logDebug() << "No txid present in the reject message";
return;
}
for (auto i = m_transactions.begin(); i != m_transactions.end();) {
auto txOwner = i->lock();
if (txOwner) {
if (txOwner->hash() == txid) {
txOwner->txRejected(
static_cast<BroadcastTxData::RejectReason>(errorCode),
errorMessage);
m_transactions.erase(i); // it can only be rejected once...
return;
}
2020-11-05 16:56:02 +01:00
++i;
} else {
i = m_transactions.erase(i);
}
}
}
2020-04-17 19:33:06 +02:00
}
else if (message.messageId() == Api::P2P::Addresses) {
m_connectionManager->addAddresses(message, connectionId());
}
else if (message.messageId() == Api::P2P::Inventory) {
m_connectionManager->addInvMessage(message, connectionId());
}
else if (message.messageId() == Api::P2P::Data_Transaction) {
Tx tx(message.body());
if (m_segment)
processTransaction(tx);
else
m_connectionManager->addTransaction(tx, connectionId());
}
2021-02-04 17:40:06 +01:00
else if (message.messageId() == Api::P2P::Data_DSProof) {
logCritical() << "Received a DoubleSpendProof from peer:" << connectionId() << "Not implemented yet.";
}
2020-04-17 19:33:06 +02:00
else if (message.messageId() == Api::P2P::Data_MerkleBlock) {
if (!m_segment) { // Received merkleblock without asking for one
2020-10-28 22:06:15 +01:00
logWarning() << "Peer sent merkleblock without us askign for one. PeerId:" << connectionId();
2020-04-17 19:33:06 +02:00
m_connectionManager->punish(PUNISHMENT_MAX);
return;
}
Streaming::P2PParser parser(message);
auto header = BlockHeader::fromMessage(parser);
int blockHeight = m_connectionManager->blockHeightFor(header.createHash());
if (blockHeight == -1) { // not on our chain (anymore)
2020-10-28 22:06:15 +01:00
logWarning() << "Peer sent merkleblock not on our chain (anymore). PeerId:" << connectionId();
2020-04-17 19:33:06 +02:00
m_connectionManager->punish(PUNISHMENT_MAX);
return;
}
CPartialMerkleTree tree = CPartialMerkleTree::construct(parser);
if (tree.ExtractMatches(m_transactionHashes) != header.hashMerkleRoot) {
m_transactionHashes.clear();
m_merkleBlockHeight = -1;
throw Streaming::ParsingException("Bad merkle tree received");
}
if (!m_blockTransactions.empty())
throw Streaming::ParsingException("Did not receive all promised Txs for MerkleBlock");
2020-05-14 22:50:51 +02:00
m_merkleHeader = header;
2020-04-17 19:33:06 +02:00
m_merkleBlockHeight = blockHeight;
m_lastReceivedMerkle = blockHeight;
m_segment->blockSynched(blockHeight);
logDebug() << "Merkle received by" << connectionId() << "height:" << blockHeight;
if (m_lastReceivedMerkle == m_merkleDownloadTo) {
assert(m_segment);
const auto filterChangedHeight = m_segment->filterChangedHeight();
logDebug() << "Doing another round of merkle downloads" << m_merkleDownloadFrom << filterChangedHeight << m_merkleDownloadTo << m_bloomUploadHeight;
if (filterChangedHeight > m_merkleDownloadFrom) {
// the wallet updated the bloom filter AFTER we requested merkle blocks,
// we need to re-request those blocks with the updated filter.
sendFilter_priv(); // then send updated filter
const int count = 1 + m_merkleDownloadTo - m_merkleDownloadFrom;
logDebug() << " filter changed, re-downloading them (n =" << count << "blocks)";
Streaming::P2PBuilder builder(m_connectionManager->pool(40 * count));
builder.writeCompactSize(count);
for (int i = m_merkleDownloadFrom; i <= m_merkleDownloadTo; ++i) {
builder.writeInt(InventoryItem::MerkleBlock);
builder.writeByteArray(m_connectionManager->blockHashFor(i), Streaming::RawBytes);
}
m_con.send(builder.message(Api::P2P::GetData));
}
2020-11-13 20:11:06 +01:00
startMerkleDownload(m_merkleDownloadTo + 1);
}
2020-04-17 19:33:06 +02:00
}
else if (message.messageId() == Api::P2P::GetData) {
Streaming::P2PParser parser(message);
const size_t count = parser.readCompactInt();
logDebug() << "Received" << count << "Inv requests using GetData";
for (size_t i = 0; i < count; ++i) {
const uint32_t type = parser.readInt();
auto inv = InventoryItem(parser.readUint256(), type);
if (type != InventoryItem::TransactionType) { // ignore stupid question
m_connectionManager->punish(connectionId(), 10);
continue;
}
for (auto i = m_transactions.begin(); i != m_transactions.end();) {
auto tx = i->lock();
if (tx.get()) {
if (tx->hash() == inv.hash()) {
Streaming::P2PBuilder builder(m_connectionManager->pool(tx->transaction().size()));
builder.writeByteArray(tx->transaction().data(), Streaming::RawBytes);
m_con.send(builder.message(Api::P2P::Data_Transaction));
tx->sentOne();
break;
}
++i;
} else {
i = m_transactions.erase(i);
}
}
}
}
2020-04-17 19:33:06 +02:00
} catch (const Streaming::ParsingException &e) {
logCritical() << "Parsing failure" << e << "peer=" << m_con.connectionId();
m_peerAddress.punishPeer(200);
2020-05-06 10:42:58 +02:00
m_connectionManager->disconnect(shared_from_this()); // will cause us to be deleted.
2020-04-17 19:33:06 +02:00
}
}
2020-05-18 14:32:34 +02:00
void Peer::sendFilter_priv()
2020-04-17 19:33:06 +02:00
{
2020-05-03 21:01:57 +02:00
if (m_peerStatus == ShuttingDown)
return;
logDebug() << "Sending filter (priv)! Now at height" << m_segment->lastBlockSynched();
2020-04-17 19:33:06 +02:00
assert(m_segment);
auto buf = m_segment->writeFilter(m_connectionManager->pool(0));
m_con.send(Message(buf, Api::LegacyP2P, Api::P2P::FilterLoad));
m_bloomUploadHeight = m_segment->lastBlockSynched();
}
2020-05-11 18:49:16 +02:00
void Peer::setRequestedHeader(bool requestedHeader)
{
m_requestedHeader = requestedHeader;
}
bool Peer::requestedHeader() const
{
return m_requestedHeader;
}
2020-04-26 16:20:45 +02:00
uint32_t Peer::connectTime() const
{
return m_connectTime;
}
2020-04-17 19:33:06 +02:00
void Peer::sendFilter(const CBloomFilter &bloom, int blockHeight)
{
2020-05-03 21:01:57 +02:00
if (m_peerStatus == ShuttingDown)
return;
logDebug() << "Sending filter! Now at height" << m_segment->lastBlockSynched();
2020-04-17 19:33:06 +02:00
Streaming::P2PBuilder builder(m_connectionManager->pool(bloom.GetSerializeSize(0, 0)));
bloom.store(builder);
m_con.send(Message(builder.buffer(), Api::LegacyP2P, Api::P2P::FilterLoad));
m_bloomUploadHeight = blockHeight;
}
int Peer::lastReceivedMerkle() const
{
return m_lastReceivedMerkle;
}
bool Peer::merkleDownloadInProgress() const
{
return m_merkleDownloadFrom >= m_bloomUploadHeight // started
2020-11-13 20:11:06 +01:00
&& m_merkleDownloadFrom <= m_merkleDownloadTo; // and has not stopped yet
2020-04-17 19:33:06 +02:00
}
void Peer::startMerkleDownload(int from)
{
2020-05-03 21:01:57 +02:00
if (m_peerStatus == ShuttingDown)
return;
2020-04-17 19:33:06 +02:00
assert(m_segment);
if (m_bloomUploadHeight < m_segment->filterChangedHeight() // filter changed since we uploaded it.
&& m_merkleDownloadFrom >= m_segment->filterChangedHeight()) // unless this method uploaded it
2020-05-18 14:32:34 +02:00
sendFilter_priv(); // then send updated filter
2020-04-17 19:33:06 +02:00
m_merkleDownloadFrom = from;
// we limit our MerkleBlock list to 10 per iteration.
m_merkleDownloadTo = std::min(m_merkleDownloadFrom + 9,
2020-11-13 20:11:06 +01:00
std::min(m_peerHeight, m_connectionManager->blockHeight()));
2020-04-17 19:33:06 +02:00
2020-11-13 20:11:06 +01:00
const int count = 1 + m_merkleDownloadTo - m_merkleDownloadFrom;
if (count == 0) {
// send one last message to get the transactions (via INV)
// from the remote peers mempool.
m_con.send(Message(Api::LegacyP2P, Api::P2P::Mempool));
2020-05-03 21:01:57 +02:00
return;
}
2020-04-17 19:33:06 +02:00
Streaming::P2PBuilder builder(m_connectionManager->pool(40 * count));
builder.writeCompactSize(count);
2020-11-13 20:11:06 +01:00
for (int i = m_merkleDownloadFrom; i <= m_merkleDownloadTo; ++i) {
2020-04-17 19:33:06 +02:00
// write INV data-type
2020-06-08 21:31:00 +02:00
builder.writeInt(InventoryItem::MerkleBlock);
2020-04-17 19:33:06 +02:00
builder.writeByteArray(m_connectionManager->blockHashFor(i), Streaming::RawBytes);
}
m_con.send(builder.message(Api::P2P::GetData));
}
int Peer::bloomUploadHeight() const
{
return m_bloomUploadHeight;
}
bool Peer::receivedHeaders() const
{
return m_receivedHeaders;
}
void Peer::setPrivacySegment(PrivacySegment *ps)
{
assert(ps);
if (ps != m_segment) {
assert(m_segment == nullptr);
m_segment = ps;
2020-05-18 14:32:34 +02:00
m_segment->addListener(this);
sendFilter_priv();
2020-04-17 19:33:06 +02:00
}
}
void Peer::sendTx(const std::shared_ptr<BroadcastTxData> &txOwner)
{
// move to our thread to avoid concurrency issues with the deque
m_con.postOnStrand(std::bind(&Peer::registerTxToSend, this, txOwner));
// send INV
Streaming::P2PBuilder builder(m_connectionManager->pool(40));
builder.writeCompactSize(1); // inv-count
builder.writeInt(InventoryItem::TransactionType);
builder.writeByteArray(txOwner->transaction().createHash(), Streaming::RawBytes);
m_con.send(builder.message(Api::P2P::Inventory));
}
2020-04-17 19:33:06 +02:00
void Peer::processTransaction(const Tx &tx)
{
2020-05-03 21:01:57 +02:00
if (m_peerStatus == ShuttingDown)
return;
2020-04-17 19:33:06 +02:00
if (m_merkleBlockHeight > 0) {
assert(m_segment);
const uint256 txHash = tx.createHash();
for (auto iter = m_transactionHashes.begin(); iter != m_transactionHashes.end(); ++iter) {
if (txHash == *iter) {
m_transactionHashes.erase(iter);
const int bh = m_merkleBlockHeight;
m_blockTransactions.push_back(tx);
2020-04-17 19:33:06 +02:00
if (m_transactionHashes.empty()) {
// done with this block
2020-04-17 19:33:06 +02:00
m_merkleBlockHeight = -1;
assert(m_segment);
m_segment->newTransactions(m_merkleHeader, bh, m_blockTransactions);
m_blockTransactions.clear();
2020-04-17 19:33:06 +02:00
}
return;
}
}
}
// must be a mempool transaction then.
m_segment->newTransaction(tx);
}
bool Peer::preferHeaders() const
{
return m_preferHeaders;
}
bool Peer::relaysTransactions() const
{
return m_relaysTransactions;
}
int Peer::startHeight() const
{
return m_startHeight;
}
std::string Peer::userAgent() const
{
return m_userAgent;
}
int Peer::protocolVersion() const
{
return m_protocolVersion;
}
int Peer::timeOffset() const
{
return m_timeOffset;
}
uint64_t Peer::services() const
{
return m_services;
}
2020-05-18 14:32:34 +02:00
void Peer::filterUpdated()
{
if (!merkleDownloadInProgress()) // the merkle download will take care of filter-uploads.
m_con.postOnStrand(std::bind(&Peer::sendFilter_priv, shared_from_this()));
2020-05-18 14:32:34 +02:00
}
void Peer::registerTxToSend(std::shared_ptr<BroadcastTxData> txOwner)
{
m_transactions.push_back(txOwner);
}
2020-11-13 20:11:06 +01:00
int Peer::peerHeight() const
{
return m_peerHeight;
}
void Peer::updatePeerHeight(int peerHeight)
{
m_peerHeight = std::max(peerHeight, m_peerHeight);
}