Files
thehub/libs/p2p/Peer.cpp
T
tomFlowee b6fc84696a Wrap PrivacySegment in a shared pointer.
This is an abstract class that the application using this library needs
to subclass. Ownership and lifetime don't change, it still lies with the
app using the library and they still need to add and remove it from the
connectionManager, but this makes it much more stable for
multi-threading environments and avoids issues on misuse.
2024-11-29 15:12:13 +01:00

518 lines
20 KiB
C++

/*
* This file is part of the Flowee project
* Copyright (C) 2020-2024 Tom Zander <tom@flowee.org>
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "Peer.h"
#include "ConnectionManager.h"
#include "PrivacySegment.h"
#include "InventoryItem.h"
#include "BroadcastTxData.h"
#include <streaming/P2PParser.h>
#include <streaming/P2PBuilder.h>
#include <streaming/BufferPools.h>
#include <BitcoinVersion.h>
#include <boost/asio/error.hpp>
/**
 * Create a peer for \a address that reports to the \a parent connection manager.
 *
 * The address is flagged as in-use here; the destructor clears the flag again.
 */
Peer::Peer(ConnectionManager *parent, const PeerAddress &address)
    : m_peerAddress(address),
    m_peerStatus(Connecting),
    m_connectionManager(parent)
{
    // Seeded with the construction time. It gets overwritten with the actual
    // offset when the remote's version message is processed.
    m_timeOffset = time(nullptr);

    assert(m_peerAddress.isValid());
    m_peerAddress.setInUse(true);
}
/// Clears the in-use flag that the constructor set on this peer's address.
Peer::~Peer()
{
    assert(this->m_peerAddress.isValid());
    this->m_peerAddress.setInUse(false);
}
/**
 * Adopt the \a server connection, register our handlers on it and start
 * connecting.
 *
 * Each handler stores a shared pointer to this peer, so the peer stays alive
 * for as long as the connection keeps its callbacks.
 */
void Peer::connect(NetworkConnection && server)
{
    m_con = std::move(server);

    auto self = shared_from_this();
    m_con.setOnConnected([self](const EndPoint &endPoint) {
        self->connected(endPoint);
    });
    m_con.setOnDisconnected([self](const EndPoint &endPoint) {
        self->disconnected(endPoint);
    });
    m_con.setOnIncomingMessage([self](const Message &incoming) {
        self->processMessage(incoming);
    });

    m_con.setMessageHeaderLegacy(true);
    m_con.setMessageQueueSizes(50, 3);
    m_con.connect();
}
/**
 * Stop using this peer: mark it as shutting down, shut the connection down
 * and deregister from the privacy segment, if that still exists.
 */
void Peer::shutdown()
{
    m_peerStatus = ShuttingDown;

    // Shutting down the connection makes it forget its callbacks, and with
    // those the shared pointers to us.
    m_con.shutdown();

    if (auto segment = m_segment.lock())
        segment->removePeer(shared_from_this());
}
/**
 * Connection callback: the socket to \a endPoint is up.
 *
 * Records the connect time and sends our 'version' message, which is always
 * the first thing the remote expects on connect.
 */
void Peer::connected(const EndPoint &endPoint)
{
    m_peerStatus = Connected;
    m_connectTime = time(nullptr);
    logDebug() << "connected. Peer:" << connectionId();

    // The same 16-byte address block is written for both address fields:
    // all zeros, apart from the two bytes that mark it as an IPv4 address.
    char address[16] = {0};
    address[11] = static_cast<char>(0xff);
    address[10] = address[11];

    auto bufferPool = Streaming::pool(400);
    Streaming::P2PBuilder builder(bufferPool);
    builder.writeInt(PROTOCOL_VERSION);
    builder.writeLong(m_connectionManager->servicesBitfield());
    builder.writeLong(static_cast<uint64_t>(time(nullptr)));

    // Version msg: target address (services, IP, port)
    builder.writeLong(static_cast<uint64_t>(2));
    builder.writeByteArray(address, 16, Streaming::RawBytes);
    builder.writeWord(endPoint.announcePort);

    // Version msg: my address (services, IP, port)
    builder.writeLong(3);
    builder.writeByteArray(address, 16, Streaming::RawBytes);
    builder.writeWord(7);

    // Version msg: my status
    builder.writeLong(m_connectionManager->appNonce());
    builder.writeString(m_connectionManager->userAgent(), Streaming::WithLength);
    builder.writeInt(m_connectionManager->blockHeight());
    builder.writeBool(/* relay-txs */ false);

    Message message = builder.message(Api::P2P::Version);
    logDebug().nospace() << "peer: " << connectionId() << ", sending message (" << message.body().size() << " bytes)";
    m_con.send(message);
}
/**
 * Connection callback: the socket was closed.
 *
 * Unless shutdown() was already called on this peer, the connection manager
 * is asked to disconnect us.
 */
void Peer::disconnected(const EndPoint &)
{
    logDebug() << "Disconnected. Peer:" << connectionId();
    if (m_peerStatus != ShuttingDown)
        m_connectionManager->disconnect(shared_from_this()); // will cause us to be deleted.
}
/**
 * Connection callback: handle one \a message received from the remote peer.
 *
 * The message is dispatched on its messageId(). Messages with an id that is
 * not listed here are ignored, and nothing at all is handled once the peer
 * is shutting down.
 *
 * A Streaming::ParsingException thrown while handling the message is caught
 * here: the peer gets punished and, if punish() returns false, disconnected.
 */
void Peer::processMessage(const Message &message)
{
// Nothing is handled any more after shutdown() has been called.
if (m_peerStatus == ShuttingDown)
return;
try {
logDebug() << "Peer:" << connectionId() << "type:"
<< message.header().constData() << "with body of" << message.body().size() << "bytes";
if (message.messageId() == Api::P2P::Version) {
// The remote's version message: remember what it tells us about itself.
Streaming::P2PParser parser(message);
m_protocolVersion = parser.readInt();
m_services = parser.readLong();
const auto now = time(nullptr);
// offset of node, adjusted with the round-trip time
m_timeOffset = (now - parser.readLong()) - (now - m_connectTime);
// address
parser.skip(8 + 16 + 2); // IP (and services and port) of them
parser.skip(8 + 16 + 2); // IP of me.
parser.skip(8); // nonce
m_userAgent = parser.readString();
m_startHeight = m_peerHeight = parser.readInt();
m_relaysTransactions = parser.readBool();
logCritical() << "Peer:" << connectionId() << "is connected to" << m_userAgent
<< "Address:" << m_peerAddress;
// Acknowledge their version and send our own PreferHeaders message.
m_con.send(Message(Api::LegacyP2P, Api::P2P::VersionAck));
m_con.send(Message(Api::LegacyP2P, Api::P2P::PreferHeaders));
m_peerAddress.successfullyConnected(m_services); // update the peersDB with things like services
m_connectionManager->connectionEstablished(shared_from_this());
}
else if (message.messageId() == Api::P2P::Ping) {
// Answer with a pong that carries the body of the ping.
m_con.send(Message(message.body(), Api::LegacyP2P, Api::P2P::Pong));
}
else if (message.messageId() == Api::P2P::PreferHeaders) {
m_preferHeaders = true;
}
else if (message.messageId() == Api::P2P::Headers) {
// Only a body of more than one byte counts as having received headers.
if (message.body().size() > 1)
m_receivedHeaders = true;
m_connectionManager->addBlockHeaders(message, connectionId());
}
else if (message.messageId() == Api::P2P::RejectData) {
// The remote rejected something we sent: type, error code and a text.
Streaming::P2PParser parser(message);
const std::string messageType = parser.readString();
const int errorCode = parser.readByte() ;
const std::string errorMessage = parser.readString();
logWarning() << "Reject received for" << messageType
<< errorCode << errorMessage;
// check m_transactions to call callback.
if (messageType == "tx") {
// we can only forward it if the additional info is a txid
uint256 txid;
try {
txid = parser.readUint256();
} catch (...) {
logDebug() << "No txid present in the reject message";
return;
}
// Find the owner of the rejected transaction. Entries whose owner is
// gone (lock() fails) are dropped from the list while we search.
auto i = m_transactions.begin();
while (i != m_transactions.end()) {
auto txOwner = i->lock();
if (txOwner) {
if (txOwner->hash() == txid) {
txOwner->txRejected(connectionId(),
static_cast<BroadcastTxData::RejectReason>(errorCode),
errorMessage);
m_transactions.erase(i); // it can only be rejected once...
return;
}
++i;
} else {
i = m_transactions.erase(i);
}
}
}
}
else if (message.messageId() == Api::P2P::Addresses) {
m_connectionManager->addAddresses(message, connectionId());
}
else if (message.messageId() == Api::P2P::Inventory) {
m_connectionManager->addInvMessage(message, connectionId());
}
else if (message.messageId() == Api::P2P::Data_Transaction) {
// With a privacy segment attached the transaction is handled by
// processTransaction(), otherwise it goes to the connection manager.
auto segment = m_segment.lock();
Tx tx(message.body());
if (segment)
processTransaction(tx);
else
m_connectionManager->addTransaction(tx, connectionId());
}
else if (message.messageId() == Api::P2P::Data_DSProof) {
logCritical() << "Received a DoubleSpendProof from peer:" << connectionId() << "Not implemented yet.";
}
else if (message.messageId() == Api::P2P::Data_MerkleBlock) {
auto segment = m_segment.lock();
if (!segment) { // Received merkleblock without asking for one
logWarning() << "Peer sent merkleblock without us asking for one. PeerId:" << connectionId();
m_connectionManager->punish(shared_from_this(), PUNISHMENT_MAX);
return;
}
if (!segment->enabled()) // they want no more.
return;
Streaming::P2PParser parser(message);
auto header = BlockHeader::fromMessage(parser);
int blockHeight = m_connectionManager->blockHeightFor(header.createHash());
if (blockHeight == -1) { // not on our chain (anymore)
logWarning() << "Peer sent merkleblock not on our chain (anymore). PeerId:" << connectionId();
m_connectionManager->punish(shared_from_this(), PUNISHMENT_MAX);
return;
}
// Extract the matched transaction hashes into m_transactionHashes and
// verify that the tree hashes up to the merkle root of the header.
CPartialMerkleTree tree = CPartialMerkleTree::construct(parser);
if (tree.ExtractMatches(m_transactionHashes) != header.hashMerkleRoot) {
m_transactionHashes.clear();
m_merkleBlockHeight = -1;
throw Streaming::ParsingException("Bad merkle tree received");
}
// Transactions collected for a previous merkle block that were never
// handed to the segment mean that block was not completed.
if (!m_blockTransactions.empty())
throw Streaming::ParsingException("Did not receive all promised Txs for MerkleBlock");
m_merkleHeader = header;
m_merkleBlockHeight = blockHeight;
m_lastReceivedMerkle = blockHeight;
/*
* When we go back with 'another round of merkle downloads' we end up
* calling blockSynched again with a older number than we did before, which confuses
* the securitySegment in a bad way.
* Apart from us rolling back 10 blocks, the peer may also send us data we
* didn't ask for and try to confuse our internal state.
*
* So, lets make sure we only update our progress in one direction for this peer.
*/
if (blockHeight > m_highestMerkleReceived) {
m_highestMerkleReceived = blockHeight;
assert(segment);
segment->blockSynched(blockHeight);
}
logDebug() << "Merkle received by" << connectionId() << "height:" << blockHeight;
// This was the last block of the range we requested.
if (m_lastReceivedMerkle == m_merkleDownloadTo) {
assert(segment);
const auto filterChangedHeight = segment->filterChangedHeight();
logDebug() << "Doing another round of merkle downloads";
logDebug().nospace() << " +- height: " << m_merkleDownloadFrom << "-" << m_merkleDownloadTo
<< ", filterChanged: " << filterChangedHeight
<< ", filterUpload: " << m_bloomUploaded;
if (filterChangedHeight != m_bloomUploaded) {
// the wallet updated the bloom filter AFTER we requested merkle blocks,
// we need to re-request those blocks with the updated filter.
sendFilter(); // send updated filter
const int count = 1 + m_merkleDownloadTo - m_merkleDownloadFrom;
logDebug() << " filter changed, re-downloading them (n =" << count << "blocks)";
Streaming::P2PBuilder builder(Streaming::pool(40 * count));
builder.writeCompactSize(count);
for (int i = m_merkleDownloadFrom; i <= m_merkleDownloadTo; ++i) {
builder.writeInt(InventoryItem::MerkleBlock);
builder.writeByteArray(m_connectionManager->blockHashFor(i), Streaming::RawBytes);
}
m_con.send(builder.message(Api::P2P::GetData));
}
// Request the range that follows the one just completed.
startMerkleDownload(m_merkleDownloadTo + 1, m_merkleDownloadBoundary);
}
}
else if (message.messageId() == Api::P2P::GetData) {
// The remote asks for data. We only serve transactions that were
// registered in m_transactions, any other type gets the peer punished.
Streaming::P2PParser parser(message);
const size_t count = parser.readCompactInt();
logDebug() << "Received" << count << "Inv requests using GetData";
for (size_t i = 0; i < count; ++i) {
const uint32_t type = parser.readInt();
auto inv = InventoryItem(parser.readUint256(), type);
if (type != InventoryItem::TransactionType) { // ignore stupid question
m_connectionManager->punish(shared_from_this(), 70);
continue;
}
// Note: this iterator 'i' shadows the loop counter of the outer loop.
// Entries whose owner is gone (lock() fails) are dropped while searching.
for (auto i = m_transactions.begin(); i != m_transactions.end();) {
auto tx = i->lock();
if (tx.get()) {
if (tx->hash() == inv.hash()) {
Streaming::P2PBuilder builder(Streaming::pool(tx->transaction().size()));
builder.writeByteArray(tx->transaction().data(), Streaming::RawBytes);
m_con.send(builder.message(Api::P2P::Data_Transaction));
tx->sentVia(shared_from_this());
break;
}
++i;
} else {
i = m_transactions.erase(i);
}
}
}
}
else if (message.messageId() == Api::P2P::AVAHello
|| message.messageId() == Api::P2P::AlertMessage
|| message.messageId() == Api::P2P::ProtoConf) {
// not a Bitcoin Cash peer
logCritical() << "Not a Bitcoin Cash peer, known message received";
m_connectionManager->punish(shared_from_this(), PUNISHMENT_MAX);
}
} catch (const Streaming::ParsingException &e) {
// Malformed data from the remote: punish, and disconnect ourselves when
// punish() returns false.
logCritical() << "Parsing failure" << e << "peer=" << m_con.connectionId();
if (!m_connectionManager->punish(shared_from_this(), 200))
m_connectionManager->disconnect(shared_from_this()); // will cause us to be deleted.
}
}
/**
 * Upload the bloom filter of our privacy segment to the remote peer.
 *
 * Nothing is sent when the peer is shutting down, when the segment is gone
 * or when the segment is not enabled.
 *
 * \param mempoolMessage when true, follow the filter with a 'mempool'
 *        message, unless one was already sent to this peer before.
 */
void Peer::sendFilter(bool mempoolMessage)
{
    if (m_peerStatus == ShuttingDown)
        return;
    auto segment = m_segment.lock();
    assert(segment);
    // filterUpdated() posts this call onto the strand, and we only hold a weak
    // pointer to the segment. By the time we run the segment may be gone, so
    // do not dereference a null pointer in builds where the assert is compiled out.
    if (!segment)
        return;
    if (!segment->enabled())
        return;

    logDebug() << m_con.connectionId() << "/" << segment->segmentId() << "Sending filter! Now at height" << segment->lastBlockSynched();
    auto buf = segment->writeFilter(Streaming::pool(0));
    m_con.send(Message(buf, Api::LegacyP2P, Api::P2P::FilterLoad));
    // remember which version of the filter the remote now has.
    m_bloomUploaded = segment->filterChangedHeight();

    if (mempoolMessage && !m_mempoolSent) {
        m_con.send(Message(Api::LegacyP2P, Api::P2P::Mempool));
        // any peer only ever should send the 'mempool' command once as the
        // point is to get transactions broadcast before we connected to them.
        m_mempoolSent = true;
    }
}
void Peer::setRequestedHeader(bool requestedHeader)
{
m_requestedHeader = requestedHeader;
}
bool Peer::requestedHeader() const
{
return m_requestedHeader;
}
uint32_t Peer::connectTime() const
{
return m_connectTime;
}
int Peer::lastReceivedMerkle() const
{
return m_lastReceivedMerkle;
}
/**
 * Returns true while a merkle-block download range has been set up that
 * still contains at least one block.
 */
bool Peer::merkleDownloadInProgress() const
{
    if (m_merkleDownloadFrom == -1) // has not started
        return false;

    const int blocksInRange = 1 + m_merkleDownloadTo - m_merkleDownloadFrom;
    return blocksInRange > 0; // has not ended yet
}
/**
 * Request the next batch of merkle blocks from the remote peer.
 *
 * The batch starts at height \a from and holds at most 10 blocks. It never
 * goes past the height of the peer, the height of our own chain or, when
 * \a maxHeight is 1 or more, past \a maxHeight.
 * A changed bloom filter is uploaded first.
 */
void Peer::startMerkleDownload(int from, int maxHeight)
{
    if (m_peerStatus == ShuttingDown)
        return;
    auto segment = m_segment.lock();
    assert(segment);
    // filter changed since we uploaded it.
    if (m_bloomUploaded != segment->filterChangedHeight())
        sendFilter();

    // we limit our MerkleBlock list to 10 per iteration.
    const int knownTip = std::min(m_peerHeight, m_connectionManager->blockHeight());
    int lastHeight = std::min(from + 9, knownTip);
    if (maxHeight >= 1 && maxHeight < lastHeight)
        lastHeight = maxHeight;

    m_merkleDownloadFrom = from;
    m_merkleDownloadBoundary = maxHeight;
    m_merkleDownloadTo = lastHeight;

    const int count = 1 + lastHeight - from;
    assert(count >= 0);
    if (count <= 0) // done!
        return;
    assert(merkleDownloadInProgress());

    Streaming::P2PBuilder builder(Streaming::pool(40 * count));
    builder.writeCompactSize(count);
    for (int height = from; height <= lastHeight; ++height) {
        // one inventory entry per block: INV data-type followed by the block hash
        builder.writeInt(InventoryItem::MerkleBlock);
        builder.writeByteArray(m_connectionManager->blockHashFor(height), Streaming::RawBytes);
    }
    m_con.send(builder.message(Api::P2P::GetData));
}
bool Peer::receivedHeaders() const
{
return m_receivedHeaders;
}
/**
 * Attach the privacy segment \a ps to this peer.
 *
 * Setting the segment we already have is a no-op. A peer is expected to have
 * no segment yet when a different one is set (asserted).
 * The peer registers itself with the segment and uploads the segment's filter
 * if that differs from what was uploaded before.
 */
void Peer::setPrivacySegment(const std::shared_ptr<PrivacySegment> &ps)
{
    assert(ps);
    const auto current = m_segment.lock();
    if (current == ps)
        return;

    assert(current == nullptr);
    m_segment = ps;
    ps->addPeer(shared_from_this());

    // Passed on to sendFilter() to decide on the additional 'mempool' message.
    const bool atTip = ps->lastBlockSynched() == m_peerHeight;
    if (m_bloomUploaded != ps->filterChangedHeight())
        sendFilter(atTip);
}
void Peer::sendTx(const std::shared_ptr<BroadcastTxData> &txOwner)
{
// move to our thread to avoid concurrency issues with the deque
m_con.postOnStrand(std::bind(&Peer::registerTxToSend, this, txOwner));
// send INV
Streaming::P2PBuilder builder(Streaming::pool(40));
builder.writeCompactSize(1); // inv-count
builder.writeInt(InventoryItem::TransactionType);
builder.writeByteArray(txOwner->transaction().createHash(), Streaming::RawBytes);
m_con.send(builder.message(Api::P2P::Inventory));
}
/**
 * Handle a transaction the remote sent us while we have a privacy segment.
 *
 * While a merkle block is being received, a transaction whose hash was
 * promised by that block is collected. When the last promised transaction
 * arrives the whole set is handed to the segment together with the block
 * header and height.
 * Any other transaction is passed to the segment as a single transaction.
 */
void Peer::processTransaction(const Tx &tx)
{
    if (m_peerStatus == ShuttingDown)
        return;
    auto segment = m_segment.lock();
    assert(segment);
    if (!segment->enabled()) // their responsibility to re-fetch info later.
        return;

    if (m_merkleBlockHeight > 0) {
        const uint256 txHash = tx.createHash();
        auto promised = m_transactionHashes.begin();
        while (promised != m_transactionHashes.end() && !(txHash == *promised))
            ++promised;

        if (promised != m_transactionHashes.end()) {
            m_transactionHashes.erase(promised);
            const int blockHeight = m_merkleBlockHeight;
            m_blockTransactions.push_back(tx);
            if (m_transactionHashes.empty()) {
                // done with this block
                m_merkleBlockHeight = -1;
                segment->newTransactions(m_merkleHeader, blockHeight, m_blockTransactions);
                m_blockTransactions.clear();
            }
            return;
        }
    }

    // must be a mempool transaction then.
    segment->newTransaction(tx);
}
bool Peer::preferHeaders() const
{
return m_preferHeaders;
}
bool Peer::relaysTransactions() const
{
return m_relaysTransactions;
}
int Peer::startHeight() const
{
return m_startHeight;
}
std::string Peer::userAgent() const
{
return m_userAgent;
}
int Peer::protocolVersion() const
{
return m_protocolVersion;
}
int Peer::timeOffset() const
{
return m_timeOffset;
}
uint64_t Peer::services() const
{
return m_services;
}
void Peer::filterUpdated(bool mempoolMessage)
{
if (!merkleDownloadInProgress()) { // the merkle download will take care of filter-uploads.
auto peer = shared_from_this();
m_con.postOnStrand([peer, mempoolMessage]() {
peer->sendFilter(mempoolMessage);
});
}
}
/**
 * Append \a txOwner to the list of transactions we offered to this peer.
 * sendTx() posts this call onto the strand of our connection.
 */
void Peer::registerTxToSend(std::shared_ptr<BroadcastTxData> txOwner)
{
    m_transactions.emplace_back(txOwner);
}
int Peer::peerHeight() const
{
return m_peerHeight;
}
/// Raises the known height of this peer to \a peerHeight. A lower value is ignored.
void Peer::updatePeerHeight(int peerHeight)
{
    if (peerHeight > m_peerHeight)
        m_peerHeight = peerHeight;
}