56561b4d98
This now also registers when a peer sends the INV.
526 lines
20 KiB
C++
526 lines
20 KiB
C++
/*
|
|
* This file is part of the Flowee project
|
|
* Copyright (C) 2020-2025 Tom Zander <tom@flowee.org>
|
|
*
|
|
* This program is free software: you can redistribute it and/or modify
|
|
* it under the terms of the GNU General Public License as published by
|
|
* the Free Software Foundation, either version 3 of the License, or
|
|
* (at your option) any later version.
|
|
*
|
|
* This program is distributed in the hope that it will be useful,
|
|
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
* GNU General Public License for more details.
|
|
*
|
|
* You should have received a copy of the GNU General Public License
|
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
*/
|
|
#include "Peer.h"
|
|
#include "ConnectionManager.h"
|
|
#include "PrivacySegment.h"
|
|
#include "InventoryItem.h"
|
|
#include "BroadcastTxData.h"
|
|
|
|
#include <streaming/P2PParser.h>
|
|
#include <streaming/P2PBuilder.h>
|
|
#include <streaming/BufferPools.h>
|
|
#include <BitcoinVersion.h>
|
|
|
|
#include <boost/asio/error.hpp>
|
|
|
|
Peer::Peer(ConnectionManager *parent, const PeerAddress &address)
|
|
: m_peerAddress(address),
|
|
m_peerStatus(Connecting),
|
|
m_connectionManager(parent)
|
|
{
|
|
assert(m_peerAddress.isValid());
|
|
m_peerAddress.setInUse(true);
|
|
m_timeOffset = time(nullptr);
|
|
}
|
|
|
|
Peer::~Peer()
|
|
{
|
|
assert(m_peerAddress.isValid());
|
|
m_peerAddress.setInUse(false);
|
|
}
|
|
|
|
void Peer::connect(NetworkConnection && server)
|
|
{
|
|
m_con = std::move(server);
|
|
m_con.setOnConnected(std::bind(&Peer::connected, shared_from_this(), std::placeholders::_1));
|
|
m_con.setOnDisconnected(std::bind(&Peer::disconnected, shared_from_this(), std::placeholders::_1));
|
|
m_con.setOnIncomingMessage(std::bind(&Peer::processMessage, shared_from_this(), std::placeholders::_1));
|
|
m_con.setMessageHeaderLegacy(true);
|
|
m_con.setMessageQueueSizes(50, 3);
|
|
m_con.connect();
|
|
}
|
|
|
|
void Peer::shutdown()
|
|
{
|
|
m_peerStatus = ShuttingDown;
|
|
m_con.shutdown(); // forgets callbacks (shared ptrs) to us.
|
|
auto segment = m_segment.lock();
|
|
if (segment)
|
|
segment->removePeer(shared_from_this());
|
|
}
|
|
|
|
void Peer::connected(const EndPoint &endPoint)
|
|
{
|
|
m_peerStatus = Connected;
|
|
m_connectTime = time(nullptr);
|
|
logDebug() << "connected. Peer:" << connectionId();
|
|
|
|
// send the version message.
|
|
auto pool = Streaming::pool(400);
|
|
Streaming::P2PBuilder builder(pool);
|
|
builder.writeInt(PROTOCOL_VERSION);
|
|
builder.writeLong(m_connectionManager->servicesBitfield());
|
|
builder.writeLong((uint64_t) time(nullptr));
|
|
// Version msg: target address
|
|
builder.writeLong((uint64_t) 2); // services again
|
|
char buf[16];
|
|
memset(buf, 0, 16);
|
|
buf[10] = buf[11] = 0xff; // mark address as an IPv4 one
|
|
builder.writeByteArray(buf, 16, Streaming::RawBytes);
|
|
builder.writeWord(endPoint.announcePort);
|
|
// Version msg: my address
|
|
builder.writeLong(3); // services again
|
|
builder.writeByteArray(buf, 16, Streaming::RawBytes);
|
|
builder.writeWord(7); // port
|
|
// Version msg: my status
|
|
builder.writeLong(m_connectionManager->appNonce());
|
|
builder.writeString(m_connectionManager->userAgent(), Streaming::WithLength);
|
|
builder.writeInt(m_connectionManager->blockHeight());
|
|
builder.writeBool(/* relay-txs */ false);
|
|
|
|
// version is always the first thing they expect on connect
|
|
Message message = builder.message(Api::P2P::Version);
|
|
logDebug().nospace() << "peer: " << connectionId() << ", sending message (" << message.body().size() << " bytes)";
|
|
m_con.send(message);
|
|
}
|
|
|
|
void Peer::disconnected(const EndPoint &)
|
|
{
|
|
logDebug() << "Disconnected. Peer:" << connectionId();
|
|
if (m_peerStatus == ShuttingDown)
|
|
return;
|
|
m_connectionManager->disconnect(shared_from_this()); // will cause us to be deleted.
|
|
}
|
|
|
|
void Peer::processMessage(const Message &message)
|
|
{
|
|
if (m_peerStatus == ShuttingDown)
|
|
return;
|
|
try {
|
|
logDebug() << "Peer:" << connectionId() << "type:"
|
|
<< message.header().constData() << "with body of" << message.body().size() << "bytes";
|
|
if (message.messageId() == Api::P2P::Version) {
|
|
Streaming::P2PParser parser(message);
|
|
m_protocolVersion = parser.readInt();
|
|
m_services = parser.readLong();
|
|
const auto now = time(nullptr);
|
|
// offset of node, adjusted with the round-trip time
|
|
m_timeOffset = (now - parser.readLong()) - (now - m_connectTime);
|
|
|
|
// address
|
|
parser.skip(8 + 16 + 2); // IP (and services and port) of them
|
|
parser.skip(8 + 16 + 2); // IP of me.
|
|
parser.skip(8); // nonce
|
|
m_userAgent = parser.readString();
|
|
m_startHeight = m_peerHeight = parser.readInt();
|
|
m_relaysTransactions = parser.readBool();
|
|
|
|
logCritical() << "Peer:" << connectionId() << "is connected to" << m_userAgent
|
|
<< "Address:" << m_peerAddress;
|
|
m_con.send(Message(Api::LegacyP2P, Api::P2P::VersionAck));
|
|
m_con.send(Message(Api::LegacyP2P, Api::P2P::PreferHeaders));
|
|
m_peerAddress.successfullyConnected(m_services); // update the peersDB with things like services
|
|
m_connectionManager->connectionEstablished(shared_from_this());
|
|
}
|
|
else if (message.messageId() == Api::P2P::Ping) {
|
|
m_con.send(Message(message.body(), Api::LegacyP2P, Api::P2P::Pong));
|
|
}
|
|
else if (message.messageId() == Api::P2P::PreferHeaders) {
|
|
m_preferHeaders = true;
|
|
}
|
|
else if (message.messageId() == Api::P2P::Headers) {
|
|
if (message.body().size() > 1)
|
|
m_receivedHeaders = true;
|
|
m_connectionManager->addBlockHeaders(message, connectionId());
|
|
}
|
|
else if (message.messageId() == Api::P2P::RejectData) {
|
|
Streaming::P2PParser parser(message);
|
|
const std::string messageType = parser.readString();
|
|
const int errorCode = parser.readByte() ;
|
|
const std::string errorMessage = parser.readString();
|
|
logWarning() << "Reject received for" << messageType
|
|
<< errorCode << errorMessage;
|
|
|
|
// check m_transactions to call callback.
|
|
if (messageType == "tx") {
|
|
// we can only forward it if the additional info is a txid
|
|
uint256 txid;
|
|
try {
|
|
txid = parser.readUint256();
|
|
} catch (...) {
|
|
logDebug() << "No txid present in the reject message";
|
|
return;
|
|
}
|
|
auto i = m_transactions.begin();
|
|
while (i != m_transactions.end()) {
|
|
auto txOwner = i->lock();
|
|
if (txOwner) {
|
|
if (txOwner->hash() == txid) {
|
|
txOwner->txRejected(connectionId(),
|
|
static_cast<BroadcastTxData::RejectReason>(errorCode),
|
|
errorMessage);
|
|
m_transactions.erase(i); // it can only be rejected once...
|
|
return;
|
|
}
|
|
++i;
|
|
} else {
|
|
i = m_transactions.erase(i);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
else if (message.messageId() == Api::P2P::Addresses) {
|
|
m_connectionManager->addAddresses(message, connectionId());
|
|
}
|
|
else if (message.messageId() == Api::P2P::Inventory) {
|
|
m_connectionManager->addInvMessage(message, connectionId());
|
|
}
|
|
else if (message.messageId() == Api::P2P::Data_Transaction) {
|
|
auto segment = m_segment.lock();
|
|
Tx tx(message.body());
|
|
if (segment)
|
|
processTransaction(tx);
|
|
else
|
|
m_connectionManager->addTransaction(tx, connectionId());
|
|
}
|
|
else if (message.messageId() == Api::P2P::Data_DSProof) {
|
|
logCritical() << "Received a DoubleSpendProof from peer:" << connectionId() << "Not implemented yet.";
|
|
}
|
|
else if (message.messageId() == Api::P2P::Data_MerkleBlock) {
|
|
auto segment = m_segment.lock();
|
|
if (!segment) { // Received merkleblock without asking for one
|
|
logWarning() << "Peer sent merkleblock without us asking for one. PeerId:" << connectionId();
|
|
m_connectionManager->punish(shared_from_this(), PUNISHMENT_MAX);
|
|
return;
|
|
}
|
|
if (!segment->enabled()) // they want no more.
|
|
return;
|
|
Streaming::P2PParser parser(message);
|
|
auto header = BlockHeader::fromMessage(parser);
|
|
int blockHeight = m_connectionManager->blockHeightFor(header.createHash());
|
|
if (blockHeight == -1) { // not on our chain (anymore)
|
|
logWarning() << "Peer sent merkleblock not on our chain (anymore). PeerId:" << connectionId();
|
|
m_connectionManager->punish(shared_from_this(), PUNISHMENT_MAX);
|
|
return;
|
|
}
|
|
CPartialMerkleTree tree = CPartialMerkleTree::construct(parser);
|
|
if (tree.ExtractMatches(m_transactionHashes) != header.hashMerkleRoot) {
|
|
m_transactionHashes.clear();
|
|
m_merkleBlockHeight = -1;
|
|
throw Streaming::ParsingException("Bad merkle tree received");
|
|
}
|
|
if (!m_blockTransactions.empty())
|
|
throw Streaming::ParsingException("Did not receive all promised Txs for MerkleBlock");
|
|
m_merkleHeader = header;
|
|
m_merkleBlockHeight = blockHeight;
|
|
m_lastReceivedMerkle = blockHeight;
|
|
|
|
/*
|
|
* When we go back with 'another round of merkle downloads' we end up
|
|
* calling blockSynched again with a older number than we did before, which confuses
|
|
* the securitySegment in a bad way.
|
|
* Apart from us rolling back 10 blocks, the peer may also send us data we
|
|
* didn't ask for and try to confuse our internal state.
|
|
*
|
|
* So, lets make sure we only update our progress in one direction for this peer.
|
|
*/
|
|
if (blockHeight > m_highestMerkleReceived) {
|
|
m_highestMerkleReceived = blockHeight;
|
|
assert(segment);
|
|
segment->blockSynched(blockHeight);
|
|
}
|
|
logDebug() << "Merkle received by" << connectionId() << "height:" << blockHeight;
|
|
|
|
if (m_lastReceivedMerkle == m_merkleDownloadTo) {
|
|
assert(segment);
|
|
const auto filterChangedHeight = segment->filterChangedHeight();
|
|
logDebug() << "Doing another round of merkle downloads";
|
|
logDebug().nospace() << " +- height: " << m_merkleDownloadFrom << "-" << m_merkleDownloadTo
|
|
<< ", filterChanged: " << filterChangedHeight
|
|
<< ", filterUpload: " << m_bloomUploaded;
|
|
if (filterChangedHeight != m_bloomUploaded) {
|
|
// the wallet updated the bloom filter AFTER we requested merkle blocks,
|
|
// we need to re-request those blocks with the updated filter.
|
|
sendFilter(); // send updated filter
|
|
const int count = 1 + m_merkleDownloadTo - m_merkleDownloadFrom;
|
|
logDebug() << " filter changed, re-downloading them (n =" << count << "blocks)";
|
|
Streaming::P2PBuilder builder(Streaming::pool(40 * count));
|
|
builder.writeCompactSize(count);
|
|
for (int i = m_merkleDownloadFrom; i <= m_merkleDownloadTo; ++i) {
|
|
builder.writeInt(InventoryItem::MerkleBlock);
|
|
builder.writeByteArray(m_connectionManager->blockHashFor(i), Streaming::RawBytes);
|
|
}
|
|
m_con.send(builder.message(Api::P2P::GetData));
|
|
}
|
|
|
|
startMerkleDownload(m_merkleDownloadTo + 1, m_merkleDownloadBoundary);
|
|
}
|
|
}
|
|
else if (message.messageId() == Api::P2P::GetData) {
|
|
Streaming::P2PParser parser(message);
|
|
const size_t count = parser.readCompactInt();
|
|
logDebug() << "Received" << count << "Inv requests using GetData";
|
|
for (size_t i = 0; i < count; ++i) {
|
|
const uint32_t type = parser.readInt();
|
|
auto inv = InventoryItem(parser.readUint256(), type);
|
|
if (type != InventoryItem::TransactionType) { // ignore stupid question
|
|
m_connectionManager->punish(shared_from_this(), 70);
|
|
continue;
|
|
}
|
|
for (auto i = m_transactions.begin(); i != m_transactions.end();) {
|
|
auto tx = i->lock();
|
|
if (tx.get()) {
|
|
if (tx->hash() == inv.hash()) {
|
|
Streaming::P2PBuilder builder(Streaming::pool(tx->transaction().size()));
|
|
builder.writeByteArray(tx->transaction().data(), Streaming::RawBytes);
|
|
m_con.send(builder.message(Api::P2P::Data_Transaction));
|
|
tx->sentVia(shared_from_this());
|
|
break;
|
|
}
|
|
++i;
|
|
} else {
|
|
i = m_transactions.erase(i);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
else if (message.messageId() == Api::P2P::AVAHello
|
|
|| message.messageId() == Api::P2P::AlertMessage
|
|
|| message.messageId() == Api::P2P::ProtoConf) {
|
|
// not a Bitcoin Cash peer
|
|
logCritical() << "Not a Bitcoin Cash peer, known message received";
|
|
m_connectionManager->punish(shared_from_this(), PUNISHMENT_MAX);
|
|
}
|
|
} catch (const Streaming::ParsingException &e) {
|
|
logCritical() << "Parsing failure" << e << "peer=" << m_con.connectionId();
|
|
if (!m_connectionManager->punish(shared_from_this(), 200))
|
|
m_connectionManager->disconnect(shared_from_this()); // will cause us to be deleted.
|
|
}
|
|
}
|
|
|
|
void Peer::sendFilter(bool mempoolMessage)
|
|
{
|
|
if (m_peerStatus == ShuttingDown)
|
|
return;
|
|
auto segment = m_segment.lock();
|
|
assert(segment.get());
|
|
if (!segment->enabled())
|
|
return;
|
|
logDebug() << m_con.connectionId() << "/" << segment->segmentId() << "Sending filter! Now at height" << segment->lastBlockSynched();
|
|
auto buf = segment->writeFilter(Streaming::pool(0));
|
|
m_con.send(Message(buf, Api::LegacyP2P, Api::P2P::FilterLoad));
|
|
m_bloomUploaded = segment->filterChangedHeight();
|
|
if (mempoolMessage && !m_mempoolSent) {
|
|
m_con.send(Message(Api::LegacyP2P, Api::P2P::Mempool));
|
|
// any peer only ever should send the 'mempool' command once as the
|
|
// point is to get transactions broadcast before we connected to them.
|
|
m_mempoolSent = true;
|
|
}
|
|
}
|
|
|
|
void Peer::setRequestedHeader(bool requestedHeader)
|
|
{
|
|
m_requestedHeader = requestedHeader;
|
|
}
|
|
|
|
bool Peer::requestedHeader() const
|
|
{
|
|
return m_requestedHeader;
|
|
}
|
|
|
|
uint32_t Peer::connectTime() const
|
|
{
|
|
return m_connectTime;
|
|
}
|
|
|
|
int Peer::lastReceivedMerkle() const
|
|
{
|
|
return m_lastReceivedMerkle;
|
|
}
|
|
|
|
bool Peer::merkleDownloadInProgress() const
|
|
{
|
|
return m_merkleDownloadFrom != -1 // has started
|
|
&& 1 + m_merkleDownloadTo - m_merkleDownloadFrom > 0; // has not ended yet
|
|
}
|
|
|
|
void Peer::startMerkleDownload(int from, int maxHeight)
|
|
{
|
|
if (m_peerStatus == ShuttingDown)
|
|
return;
|
|
auto segment = m_segment.lock();
|
|
assert(segment);
|
|
if (m_bloomUploaded != segment->filterChangedHeight()) // filter changed since we uploaded it.
|
|
sendFilter();
|
|
|
|
m_merkleDownloadFrom = from;
|
|
m_merkleDownloadBoundary = maxHeight;
|
|
// we limit our MerkleBlock list to 10 per iteration.
|
|
m_merkleDownloadTo = std::min(m_merkleDownloadFrom + 9,
|
|
std::min(m_peerHeight, m_connectionManager->blockHeight()));
|
|
if (m_merkleDownloadBoundary >= 1 && m_merkleDownloadBoundary < m_merkleDownloadTo)
|
|
m_merkleDownloadTo = m_merkleDownloadBoundary;
|
|
|
|
const int count = 1 + m_merkleDownloadTo - m_merkleDownloadFrom;
|
|
assert(count >= 0);
|
|
if (count <= 0) // done!
|
|
return;
|
|
|
|
assert(merkleDownloadInProgress());
|
|
Streaming::P2PBuilder builder(Streaming::pool(40 * count));
|
|
builder.writeCompactSize(count);
|
|
for (int i = m_merkleDownloadFrom; i <= m_merkleDownloadTo; ++i) {
|
|
// write INV data-type
|
|
builder.writeInt(InventoryItem::MerkleBlock);
|
|
builder.writeByteArray(m_connectionManager->blockHashFor(i), Streaming::RawBytes);
|
|
}
|
|
m_con.send(builder.message(Api::P2P::GetData));
|
|
}
|
|
|
|
bool Peer::receivedHeaders() const
|
|
{
|
|
return m_receivedHeaders;
|
|
}
|
|
|
|
void Peer::setPrivacySegment(const std::shared_ptr<PrivacySegment> &ps)
|
|
{
|
|
assert(ps);
|
|
auto old = m_segment.lock();
|
|
if (ps != old) {
|
|
assert(old == nullptr);
|
|
m_segment = ps;
|
|
ps->addPeer(shared_from_this());
|
|
const bool atTip = ps->lastBlockSynched() == m_peerHeight;
|
|
if (m_bloomUploaded != ps->filterChangedHeight())
|
|
sendFilter(atTip);
|
|
}
|
|
}
|
|
|
|
void Peer::sendTx(const std::shared_ptr<BroadcastTxData> &txOwner)
|
|
{
|
|
// move to our thread to avoid concurrency issues with the deque
|
|
m_con.postOnStrand(std::bind(&Peer::registerTxToSend, this, txOwner));
|
|
|
|
// send INV
|
|
Streaming::P2PBuilder builder(Streaming::pool(40));
|
|
builder.writeCompactSize(1); // inv-count
|
|
builder.writeInt(InventoryItem::TransactionType);
|
|
builder.writeByteArray(txOwner->transaction().createHash(), Streaming::RawBytes);
|
|
m_con.send(builder.message(Api::P2P::Inventory));
|
|
|
|
// notify txOwner
|
|
txOwner->offeredVia(shared_from_this());
|
|
}
|
|
|
|
void Peer::processTransaction(const Tx &tx)
|
|
{
|
|
if (m_peerStatus == ShuttingDown)
|
|
return;
|
|
auto segment = m_segment.lock();
|
|
assert(segment);
|
|
if (!segment->enabled()) // their responsibility to re-fetch info later.
|
|
return;
|
|
|
|
if (m_merkleBlockHeight > 0) {
|
|
assert(segment);
|
|
const uint256 txHash = tx.createHash();
|
|
for (auto iter = m_transactionHashes.begin(); iter != m_transactionHashes.end(); ++iter) {
|
|
if (txHash == *iter) {
|
|
m_transactionHashes.erase(iter);
|
|
const int bh = m_merkleBlockHeight;
|
|
m_blockTransactions.push_back(tx);
|
|
if (m_transactionHashes.empty()) {
|
|
// done with this block
|
|
m_merkleBlockHeight = -1;
|
|
assert(segment);
|
|
segment->newTransactions(m_merkleHeader, bh, m_blockTransactions);
|
|
m_blockTransactions.clear();
|
|
}
|
|
return;
|
|
}
|
|
}
|
|
}
|
|
// must be a mempool transaction then.
|
|
segment->newTransaction(tx);
|
|
}
|
|
|
|
bool Peer::preferHeaders() const
|
|
{
|
|
return m_preferHeaders;
|
|
}
|
|
|
|
bool Peer::relaysTransactions() const
|
|
{
|
|
return m_relaysTransactions;
|
|
}
|
|
|
|
int Peer::startHeight() const
|
|
{
|
|
return m_startHeight;
|
|
}
|
|
|
|
std::string Peer::userAgent() const
|
|
{
|
|
return m_userAgent;
|
|
}
|
|
|
|
int Peer::protocolVersion() const
|
|
{
|
|
return m_protocolVersion;
|
|
}
|
|
|
|
int Peer::timeOffset() const
|
|
{
|
|
return m_timeOffset;
|
|
}
|
|
|
|
uint64_t Peer::services() const
|
|
{
|
|
return m_services;
|
|
}
|
|
|
|
void Peer::filterUpdated(bool mempoolMessage)
|
|
{
|
|
if (!merkleDownloadInProgress()) { // the merkle download will take care of filter-uploads.
|
|
auto peer = shared_from_this();
|
|
m_con.postOnStrand([peer, mempoolMessage]() {
|
|
peer->sendFilter(mempoolMessage);
|
|
});
|
|
}
|
|
}
|
|
|
|
void Peer::registerTxToSend(std::shared_ptr<BroadcastTxData> txOwner)
|
|
{
|
|
m_transactions.push_back(txOwner);
|
|
}
|
|
|
|
int Peer::highestMerkleReceived() const
|
|
{
|
|
return m_highestMerkleReceived;
|
|
}
|
|
|
|
int Peer::peerHeight() const
|
|
{
|
|
return m_peerHeight;
|
|
}
|
|
|
|
void Peer::updatePeerHeight(int peerHeight)
|
|
{
|
|
m_peerHeight = std::max(peerHeight, m_peerHeight);
|
|
}
|