Files
thehub/libs/p2p/Peer.cpp
tomFlowee 56561b4d98 Add new signal to broadcastTxData
This now also registers when a peer sends the INV.
2025-05-21 22:52:31 +02:00

526 lines
20 KiB
C++

/*
* This file is part of the Flowee project
* Copyright (C) 2020-2025 Tom Zander <tom@flowee.org>
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "Peer.h"
#include "ConnectionManager.h"
#include "PrivacySegment.h"
#include "InventoryItem.h"
#include "BroadcastTxData.h"
#include <streaming/P2PParser.h>
#include <streaming/P2PBuilder.h>
#include <streaming/BufferPools.h>
#include <BitcoinVersion.h>
#include <boost/asio/error.hpp>
Peer::Peer(ConnectionManager *parent, const PeerAddress &address)
: m_peerAddress(address),
m_peerStatus(Connecting),
m_connectionManager(parent)
{
assert(m_peerAddress.isValid());
m_peerAddress.setInUse(true);
m_timeOffset = time(nullptr);
}
Peer::~Peer()
{
assert(m_peerAddress.isValid());
m_peerAddress.setInUse(false);
}
void Peer::connect(NetworkConnection && server)
{
m_con = std::move(server);
m_con.setOnConnected(std::bind(&Peer::connected, shared_from_this(), std::placeholders::_1));
m_con.setOnDisconnected(std::bind(&Peer::disconnected, shared_from_this(), std::placeholders::_1));
m_con.setOnIncomingMessage(std::bind(&Peer::processMessage, shared_from_this(), std::placeholders::_1));
m_con.setMessageHeaderLegacy(true);
m_con.setMessageQueueSizes(50, 3);
m_con.connect();
}
void Peer::shutdown()
{
m_peerStatus = ShuttingDown;
m_con.shutdown(); // forgets callbacks (shared ptrs) to us.
auto segment = m_segment.lock();
if (segment)
segment->removePeer(shared_from_this());
}
void Peer::connected(const EndPoint &endPoint)
{
m_peerStatus = Connected;
m_connectTime = time(nullptr);
logDebug() << "connected. Peer:" << connectionId();
// send the version message.
auto pool = Streaming::pool(400);
Streaming::P2PBuilder builder(pool);
builder.writeInt(PROTOCOL_VERSION);
builder.writeLong(m_connectionManager->servicesBitfield());
builder.writeLong((uint64_t) time(nullptr));
// Version msg: target address
builder.writeLong((uint64_t) 2); // services again
char buf[16];
memset(buf, 0, 16);
buf[10] = buf[11] = 0xff; // mark address as an IPv4 one
builder.writeByteArray(buf, 16, Streaming::RawBytes);
builder.writeWord(endPoint.announcePort);
// Version msg: my address
builder.writeLong(3); // services again
builder.writeByteArray(buf, 16, Streaming::RawBytes);
builder.writeWord(7); // port
// Version msg: my status
builder.writeLong(m_connectionManager->appNonce());
builder.writeString(m_connectionManager->userAgent(), Streaming::WithLength);
builder.writeInt(m_connectionManager->blockHeight());
builder.writeBool(/* relay-txs */ false);
// version is always the first thing they expect on connect
Message message = builder.message(Api::P2P::Version);
logDebug().nospace() << "peer: " << connectionId() << ", sending message (" << message.body().size() << " bytes)";
m_con.send(message);
}
void Peer::disconnected(const EndPoint &)
{
logDebug() << "Disconnected. Peer:" << connectionId();
if (m_peerStatus == ShuttingDown)
return;
m_connectionManager->disconnect(shared_from_this()); // will cause us to be deleted.
}
void Peer::processMessage(const Message &message)
{
if (m_peerStatus == ShuttingDown)
return;
try {
logDebug() << "Peer:" << connectionId() << "type:"
<< message.header().constData() << "with body of" << message.body().size() << "bytes";
if (message.messageId() == Api::P2P::Version) {
Streaming::P2PParser parser(message);
m_protocolVersion = parser.readInt();
m_services = parser.readLong();
const auto now = time(nullptr);
// offset of node, adjusted with the round-trip time
m_timeOffset = (now - parser.readLong()) - (now - m_connectTime);
// address
parser.skip(8 + 16 + 2); // IP (and services and port) of them
parser.skip(8 + 16 + 2); // IP of me.
parser.skip(8); // nonce
m_userAgent = parser.readString();
m_startHeight = m_peerHeight = parser.readInt();
m_relaysTransactions = parser.readBool();
logCritical() << "Peer:" << connectionId() << "is connected to" << m_userAgent
<< "Address:" << m_peerAddress;
m_con.send(Message(Api::LegacyP2P, Api::P2P::VersionAck));
m_con.send(Message(Api::LegacyP2P, Api::P2P::PreferHeaders));
m_peerAddress.successfullyConnected(m_services); // update the peersDB with things like services
m_connectionManager->connectionEstablished(shared_from_this());
}
else if (message.messageId() == Api::P2P::Ping) {
m_con.send(Message(message.body(), Api::LegacyP2P, Api::P2P::Pong));
}
else if (message.messageId() == Api::P2P::PreferHeaders) {
m_preferHeaders = true;
}
else if (message.messageId() == Api::P2P::Headers) {
if (message.body().size() > 1)
m_receivedHeaders = true;
m_connectionManager->addBlockHeaders(message, connectionId());
}
else if (message.messageId() == Api::P2P::RejectData) {
Streaming::P2PParser parser(message);
const std::string messageType = parser.readString();
const int errorCode = parser.readByte() ;
const std::string errorMessage = parser.readString();
logWarning() << "Reject received for" << messageType
<< errorCode << errorMessage;
// check m_transactions to call callback.
if (messageType == "tx") {
// we can only forward it if the additional info is a txid
uint256 txid;
try {
txid = parser.readUint256();
} catch (...) {
logDebug() << "No txid present in the reject message";
return;
}
auto i = m_transactions.begin();
while (i != m_transactions.end()) {
auto txOwner = i->lock();
if (txOwner) {
if (txOwner->hash() == txid) {
txOwner->txRejected(connectionId(),
static_cast<BroadcastTxData::RejectReason>(errorCode),
errorMessage);
m_transactions.erase(i); // it can only be rejected once...
return;
}
++i;
} else {
i = m_transactions.erase(i);
}
}
}
}
else if (message.messageId() == Api::P2P::Addresses) {
m_connectionManager->addAddresses(message, connectionId());
}
else if (message.messageId() == Api::P2P::Inventory) {
m_connectionManager->addInvMessage(message, connectionId());
}
else if (message.messageId() == Api::P2P::Data_Transaction) {
auto segment = m_segment.lock();
Tx tx(message.body());
if (segment)
processTransaction(tx);
else
m_connectionManager->addTransaction(tx, connectionId());
}
else if (message.messageId() == Api::P2P::Data_DSProof) {
logCritical() << "Received a DoubleSpendProof from peer:" << connectionId() << "Not implemented yet.";
}
else if (message.messageId() == Api::P2P::Data_MerkleBlock) {
auto segment = m_segment.lock();
if (!segment) { // Received merkleblock without asking for one
logWarning() << "Peer sent merkleblock without us asking for one. PeerId:" << connectionId();
m_connectionManager->punish(shared_from_this(), PUNISHMENT_MAX);
return;
}
if (!segment->enabled()) // they want no more.
return;
Streaming::P2PParser parser(message);
auto header = BlockHeader::fromMessage(parser);
int blockHeight = m_connectionManager->blockHeightFor(header.createHash());
if (blockHeight == -1) { // not on our chain (anymore)
logWarning() << "Peer sent merkleblock not on our chain (anymore). PeerId:" << connectionId();
m_connectionManager->punish(shared_from_this(), PUNISHMENT_MAX);
return;
}
CPartialMerkleTree tree = CPartialMerkleTree::construct(parser);
if (tree.ExtractMatches(m_transactionHashes) != header.hashMerkleRoot) {
m_transactionHashes.clear();
m_merkleBlockHeight = -1;
throw Streaming::ParsingException("Bad merkle tree received");
}
if (!m_blockTransactions.empty())
throw Streaming::ParsingException("Did not receive all promised Txs for MerkleBlock");
m_merkleHeader = header;
m_merkleBlockHeight = blockHeight;
m_lastReceivedMerkle = blockHeight;
/*
* When we go back with 'another round of merkle downloads' we end up
* calling blockSynched again with a older number than we did before, which confuses
* the securitySegment in a bad way.
* Apart from us rolling back 10 blocks, the peer may also send us data we
* didn't ask for and try to confuse our internal state.
*
* So, lets make sure we only update our progress in one direction for this peer.
*/
if (blockHeight > m_highestMerkleReceived) {
m_highestMerkleReceived = blockHeight;
assert(segment);
segment->blockSynched(blockHeight);
}
logDebug() << "Merkle received by" << connectionId() << "height:" << blockHeight;
if (m_lastReceivedMerkle == m_merkleDownloadTo) {
assert(segment);
const auto filterChangedHeight = segment->filterChangedHeight();
logDebug() << "Doing another round of merkle downloads";
logDebug().nospace() << " +- height: " << m_merkleDownloadFrom << "-" << m_merkleDownloadTo
<< ", filterChanged: " << filterChangedHeight
<< ", filterUpload: " << m_bloomUploaded;
if (filterChangedHeight != m_bloomUploaded) {
// the wallet updated the bloom filter AFTER we requested merkle blocks,
// we need to re-request those blocks with the updated filter.
sendFilter(); // send updated filter
const int count = 1 + m_merkleDownloadTo - m_merkleDownloadFrom;
logDebug() << " filter changed, re-downloading them (n =" << count << "blocks)";
Streaming::P2PBuilder builder(Streaming::pool(40 * count));
builder.writeCompactSize(count);
for (int i = m_merkleDownloadFrom; i <= m_merkleDownloadTo; ++i) {
builder.writeInt(InventoryItem::MerkleBlock);
builder.writeByteArray(m_connectionManager->blockHashFor(i), Streaming::RawBytes);
}
m_con.send(builder.message(Api::P2P::GetData));
}
startMerkleDownload(m_merkleDownloadTo + 1, m_merkleDownloadBoundary);
}
}
else if (message.messageId() == Api::P2P::GetData) {
Streaming::P2PParser parser(message);
const size_t count = parser.readCompactInt();
logDebug() << "Received" << count << "Inv requests using GetData";
for (size_t i = 0; i < count; ++i) {
const uint32_t type = parser.readInt();
auto inv = InventoryItem(parser.readUint256(), type);
if (type != InventoryItem::TransactionType) { // ignore stupid question
m_connectionManager->punish(shared_from_this(), 70);
continue;
}
for (auto i = m_transactions.begin(); i != m_transactions.end();) {
auto tx = i->lock();
if (tx.get()) {
if (tx->hash() == inv.hash()) {
Streaming::P2PBuilder builder(Streaming::pool(tx->transaction().size()));
builder.writeByteArray(tx->transaction().data(), Streaming::RawBytes);
m_con.send(builder.message(Api::P2P::Data_Transaction));
tx->sentVia(shared_from_this());
break;
}
++i;
} else {
i = m_transactions.erase(i);
}
}
}
}
else if (message.messageId() == Api::P2P::AVAHello
|| message.messageId() == Api::P2P::AlertMessage
|| message.messageId() == Api::P2P::ProtoConf) {
// not a Bitcoin Cash peer
logCritical() << "Not a Bitcoin Cash peer, known message received";
m_connectionManager->punish(shared_from_this(), PUNISHMENT_MAX);
}
} catch (const Streaming::ParsingException &e) {
logCritical() << "Parsing failure" << e << "peer=" << m_con.connectionId();
if (!m_connectionManager->punish(shared_from_this(), 200))
m_connectionManager->disconnect(shared_from_this()); // will cause us to be deleted.
}
}
void Peer::sendFilter(bool mempoolMessage)
{
if (m_peerStatus == ShuttingDown)
return;
auto segment = m_segment.lock();
assert(segment.get());
if (!segment->enabled())
return;
logDebug() << m_con.connectionId() << "/" << segment->segmentId() << "Sending filter! Now at height" << segment->lastBlockSynched();
auto buf = segment->writeFilter(Streaming::pool(0));
m_con.send(Message(buf, Api::LegacyP2P, Api::P2P::FilterLoad));
m_bloomUploaded = segment->filterChangedHeight();
if (mempoolMessage && !m_mempoolSent) {
m_con.send(Message(Api::LegacyP2P, Api::P2P::Mempool));
// any peer only ever should send the 'mempool' command once as the
// point is to get transactions broadcast before we connected to them.
m_mempoolSent = true;
}
}
void Peer::setRequestedHeader(bool requestedHeader)
{
m_requestedHeader = requestedHeader;
}
bool Peer::requestedHeader() const
{
return m_requestedHeader;
}
uint32_t Peer::connectTime() const
{
return m_connectTime;
}
int Peer::lastReceivedMerkle() const
{
return m_lastReceivedMerkle;
}
bool Peer::merkleDownloadInProgress() const
{
return m_merkleDownloadFrom != -1 // has started
&& 1 + m_merkleDownloadTo - m_merkleDownloadFrom > 0; // has not ended yet
}
void Peer::startMerkleDownload(int from, int maxHeight)
{
if (m_peerStatus == ShuttingDown)
return;
auto segment = m_segment.lock();
assert(segment);
if (m_bloomUploaded != segment->filterChangedHeight()) // filter changed since we uploaded it.
sendFilter();
m_merkleDownloadFrom = from;
m_merkleDownloadBoundary = maxHeight;
// we limit our MerkleBlock list to 10 per iteration.
m_merkleDownloadTo = std::min(m_merkleDownloadFrom + 9,
std::min(m_peerHeight, m_connectionManager->blockHeight()));
if (m_merkleDownloadBoundary >= 1 && m_merkleDownloadBoundary < m_merkleDownloadTo)
m_merkleDownloadTo = m_merkleDownloadBoundary;
const int count = 1 + m_merkleDownloadTo - m_merkleDownloadFrom;
assert(count >= 0);
if (count <= 0) // done!
return;
assert(merkleDownloadInProgress());
Streaming::P2PBuilder builder(Streaming::pool(40 * count));
builder.writeCompactSize(count);
for (int i = m_merkleDownloadFrom; i <= m_merkleDownloadTo; ++i) {
// write INV data-type
builder.writeInt(InventoryItem::MerkleBlock);
builder.writeByteArray(m_connectionManager->blockHashFor(i), Streaming::RawBytes);
}
m_con.send(builder.message(Api::P2P::GetData));
}
bool Peer::receivedHeaders() const
{
return m_receivedHeaders;
}
void Peer::setPrivacySegment(const std::shared_ptr<PrivacySegment> &ps)
{
assert(ps);
auto old = m_segment.lock();
if (ps != old) {
assert(old == nullptr);
m_segment = ps;
ps->addPeer(shared_from_this());
const bool atTip = ps->lastBlockSynched() == m_peerHeight;
if (m_bloomUploaded != ps->filterChangedHeight())
sendFilter(atTip);
}
}
void Peer::sendTx(const std::shared_ptr<BroadcastTxData> &txOwner)
{
// move to our thread to avoid concurrency issues with the deque
m_con.postOnStrand(std::bind(&Peer::registerTxToSend, this, txOwner));
// send INV
Streaming::P2PBuilder builder(Streaming::pool(40));
builder.writeCompactSize(1); // inv-count
builder.writeInt(InventoryItem::TransactionType);
builder.writeByteArray(txOwner->transaction().createHash(), Streaming::RawBytes);
m_con.send(builder.message(Api::P2P::Inventory));
// notify txOwner
txOwner->offeredVia(shared_from_this());
}
void Peer::processTransaction(const Tx &tx)
{
if (m_peerStatus == ShuttingDown)
return;
auto segment = m_segment.lock();
assert(segment);
if (!segment->enabled()) // their responsibility to re-fetch info later.
return;
if (m_merkleBlockHeight > 0) {
assert(segment);
const uint256 txHash = tx.createHash();
for (auto iter = m_transactionHashes.begin(); iter != m_transactionHashes.end(); ++iter) {
if (txHash == *iter) {
m_transactionHashes.erase(iter);
const int bh = m_merkleBlockHeight;
m_blockTransactions.push_back(tx);
if (m_transactionHashes.empty()) {
// done with this block
m_merkleBlockHeight = -1;
assert(segment);
segment->newTransactions(m_merkleHeader, bh, m_blockTransactions);
m_blockTransactions.clear();
}
return;
}
}
}
// must be a mempool transaction then.
segment->newTransaction(tx);
}
bool Peer::preferHeaders() const
{
return m_preferHeaders;
}
bool Peer::relaysTransactions() const
{
return m_relaysTransactions;
}
int Peer::startHeight() const
{
return m_startHeight;
}
std::string Peer::userAgent() const
{
return m_userAgent;
}
int Peer::protocolVersion() const
{
return m_protocolVersion;
}
int Peer::timeOffset() const
{
return m_timeOffset;
}
uint64_t Peer::services() const
{
return m_services;
}
void Peer::filterUpdated(bool mempoolMessage)
{
if (!merkleDownloadInProgress()) { // the merkle download will take care of filter-uploads.
auto peer = shared_from_this();
m_con.postOnStrand([peer, mempoolMessage]() {
peer->sendFilter(mempoolMessage);
});
}
}
void Peer::registerTxToSend(std::shared_ptr<BroadcastTxData> txOwner)
{
m_transactions.push_back(txOwner);
}
int Peer::highestMerkleReceived() const
{
return m_highestMerkleReceived;
}
int Peer::peerHeight() const
{
return m_peerHeight;
}
void Peer::updatePeerHeight(int peerHeight)
{
m_peerHeight = std::max(peerHeight, m_peerHeight);
}