301a4ca3ee
This is done in several steps: 1. Separate my height from the remote peer heights. Instead of assuming we have one height, recognize that a peer may not be at the tip at the same time we are. We monitor headers/invs to update the 'peerHeight' variable. 2. Ask for merkle blocks from a peer to the maximum height of that peer (but not later than what we validated to be true). This avoids us asking past the remotes tip which they didn't like. 3. Redo the SyncSPVAction to use all this and make it much more reliable in finding peers to download from and getting all the changes as fast as possible.
450 lines
16 KiB
C++
450 lines
16 KiB
C++
/*
|
|
* This file is part of the Flowee project
|
|
* Copyright (C) 2020 Tom Zander <tomz@freedommail.ch>
|
|
*
|
|
* This program is free software: you can redistribute it and/or modify
|
|
* it under the terms of the GNU General Public License as published by
|
|
* the Free Software Foundation, either version 3 of the License, or
|
|
* (at your option) any later version.
|
|
*
|
|
* This program is distributed in the hope that it will be useful,
|
|
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
* GNU General Public License for more details.
|
|
*
|
|
* You should have received a copy of the GNU General Public License
|
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
*/
|
|
#include "Peer.h"
|
|
#include "ConnectionManager.h"
|
|
#include "PrivacySegment.h"
|
|
#include "InventoryItem.h"
|
|
#include "BroadcastTxData.h"
|
|
|
|
#include <version.h>
|
|
#include <streaming/P2PParser.h>
|
|
#include <streaming/P2PBuilder.h>
|
|
|
|
#include <boost/asio/error.hpp>
|
|
|
|
Peer::Peer(ConnectionManager *parent, const PeerAddress &address)
|
|
: m_peerAddress(address),
|
|
m_peerStatus(Connecting),
|
|
m_connectionManager(parent)
|
|
{
|
|
assert(m_peerAddress.isValid());
|
|
m_peerAddress.setInUse(true);
|
|
m_timeOffset = time(nullptr);
|
|
}
|
|
|
|
Peer::~Peer()
|
|
{
|
|
assert(m_peerAddress.isValid());
|
|
m_peerAddress.setInUse(false);
|
|
if (m_segment)
|
|
m_segment->removeListener(this);
|
|
}
|
|
|
|
void Peer::connect(NetworkConnection && server)
|
|
{
|
|
m_con = std::move(server);
|
|
m_con.setOnConnected(std::bind(&Peer::connected, shared_from_this(), std::placeholders::_1));
|
|
m_con.setOnDisconnected(std::bind(&Peer::disconnected, shared_from_this(), std::placeholders::_1));
|
|
m_con.setOnIncomingMessage(std::bind(&Peer::processMessage, shared_from_this(), std::placeholders::_1));
|
|
m_con.setMessageHeaderLegacy(true);
|
|
m_con.setMessageQueueSizes(10, 1);
|
|
m_con.connect();
|
|
}
|
|
|
|
void Peer::shutdown()
|
|
{
|
|
m_peerStatus = ShuttingDown;
|
|
m_con.shutdown(); // forgets callbacks (shared ptrs) to us.
|
|
}
|
|
|
|
void Peer::connected(const EndPoint &endPoint)
|
|
{
|
|
m_peerStatus = Connected;
|
|
m_connectTime = time(nullptr);
|
|
logDebug() << "connected. Peer:" << connectionId();
|
|
|
|
// send the version message.
|
|
auto &pool = m_connectionManager->pool(400);
|
|
Streaming::P2PBuilder builder(pool);
|
|
builder.writeInt(PROTOCOL_VERSION);
|
|
builder.writeLong(m_connectionManager->servicesBitfield());
|
|
builder.writeLong((uint64_t) time(nullptr));
|
|
// Version msg: target address
|
|
builder.writeLong((uint64_t) 2); // services again
|
|
char buf[16];
|
|
memset(buf, 0, 16);
|
|
buf[10] = buf[11] = 0xff; // mark address as an IPv4 one
|
|
builder.writeByteArray(buf, 16, Streaming::RawBytes);
|
|
builder.writeWord(endPoint.announcePort);
|
|
// Version msg: my address
|
|
builder.writeLong(3); // services again
|
|
builder.writeByteArray(buf, 16, Streaming::RawBytes);
|
|
builder.writeWord(7); // port
|
|
// Version msg: my status
|
|
builder.writeLong(m_connectionManager->appNonce());
|
|
builder.writeString(m_connectionManager->userAgent(), Streaming::WithLength);
|
|
builder.writeInt(m_connectionManager->blockHeight());
|
|
builder.writeBool(/* relay-txs */ false);
|
|
|
|
// version is always the first thing they expect on connect
|
|
Message message = builder.message(Api::P2P::Version);
|
|
logDebug().nospace() << "peer: " << connectionId() << ", sending message (" << message.body().size() << " bytes)";
|
|
m_con.send(message);
|
|
}
|
|
|
|
void Peer::disconnected(const EndPoint &)
|
|
{
|
|
logDebug() << "Disconnected. Peer:" << connectionId();
|
|
if (m_peerStatus == ShuttingDown)
|
|
return;
|
|
m_connectionManager->disconnect(shared_from_this()); // will cause us to be deleted.
|
|
}
|
|
|
|
void Peer::processMessage(const Message &message)
|
|
{
|
|
if (m_peerStatus == ShuttingDown)
|
|
return;
|
|
try {
|
|
logDebug() << "Peer:" << connectionId() << "messageId:"
|
|
<< message.header().constData() << "of" << message.body().size() << "bytes";
|
|
if (message.messageId() == Api::P2P::Version) {
|
|
Streaming::P2PParser parser(message);
|
|
m_protocolVersion = parser.readInt();
|
|
m_services = parser.readLong();
|
|
const auto now = time(nullptr);
|
|
// offset of node, adjusted with the round-trip time
|
|
m_timeOffset = (now - parser.readLong()) - (now - m_connectTime);
|
|
|
|
// address
|
|
parser.skip(8 + 16 + 2); // IP (and services and port) of them
|
|
parser.skip(8 + 16 + 2); // IP of me.
|
|
parser.skip(8); // nonce
|
|
m_userAgent = parser.readString();
|
|
m_startHeight = m_peerHeight = parser.readInt();
|
|
m_relaysTransactions = parser.readBool();
|
|
|
|
logCritical() << "Peer:" << connectionId() << "is connected to" << m_userAgent
|
|
<< "Address:" << m_peerAddress;
|
|
m_con.send(Message(Api::LegacyP2P, Api::P2P::VersionAck));
|
|
m_con.send(Message(Api::LegacyP2P, Api::P2P::PreferHeaders));
|
|
m_connectionManager->connectionEstablished(shared_from_this());
|
|
m_peerAddress.successfullyConnected();
|
|
}
|
|
else if (message.messageId() == Api::P2P::Ping) {
|
|
m_con.send(Message(message.body(), Api::LegacyP2P, Api::P2P::Pong));
|
|
}
|
|
else if (message.messageId() == Api::P2P::PreferHeaders) {
|
|
m_preferHeaders = true;
|
|
}
|
|
else if (message.messageId() == Api::P2P::Headers) {
|
|
m_receivedHeaders = true;
|
|
m_connectionManager->addBlockHeaders(message, connectionId());
|
|
}
|
|
else if (message.messageId() == Api::P2P::RejectData) {
|
|
Streaming::P2PParser parser(message);
|
|
const std::string messageType = parser.readString();
|
|
const int errorCode = parser.readByte() ;
|
|
const std::string errorMessage = parser.readString();
|
|
logWarning() << "Reject received for" << messageType
|
|
<< errorCode << errorMessage;
|
|
|
|
// check m_transactions to call callback.
|
|
if (messageType == "tx") {
|
|
// we can only forward it if the additional info is a txid
|
|
uint256 txid;
|
|
try {
|
|
txid = parser.readUint256();
|
|
} catch (...) {
|
|
logDebug() << "No txid present in the reject message";
|
|
return;
|
|
}
|
|
for (auto i = m_transactions.begin(); i != m_transactions.end();) {
|
|
auto txOwner = i->lock();
|
|
if (txOwner) {
|
|
if (txOwner->hash() == txid) {
|
|
txOwner->txRejected(
|
|
static_cast<BroadcastTxData::RejectReason>(errorCode),
|
|
errorMessage);
|
|
m_transactions.erase(i); // it can only be rejected once...
|
|
return;
|
|
}
|
|
++i;
|
|
} else {
|
|
i = m_transactions.erase(i);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
else if (message.messageId() == Api::P2P::Addresses) {
|
|
m_connectionManager->addAddresses(message, connectionId());
|
|
}
|
|
else if (message.messageId() == Api::P2P::Inventory) {
|
|
m_connectionManager->addInvMessage(message, connectionId());
|
|
}
|
|
else if (message.messageId() == Api::P2P::Data_Transaction) {
|
|
Tx tx(message.body());
|
|
if (m_segment)
|
|
processTransaction(tx);
|
|
else
|
|
m_connectionManager->addTransaction(tx, connectionId());
|
|
}
|
|
else if (message.messageId() == Api::P2P::Data_MerkleBlock) {
|
|
if (!m_segment) { // Received merkleblock without asking for one
|
|
logWarning() << "Peer sent merkleblock without us askign for one. PeerId:" << connectionId();
|
|
m_connectionManager->punish(PUNISHMENT_MAX);
|
|
return;
|
|
}
|
|
Streaming::P2PParser parser(message);
|
|
auto header = BlockHeader::fromMessage(parser);
|
|
int blockHeight = m_connectionManager->blockHeightFor(header.createHash());
|
|
if (blockHeight == -1) { // not on our chain (anymore)
|
|
logWarning() << "Peer sent merkleblock not on our chain (anymore). PeerId:" << connectionId();
|
|
m_connectionManager->punish(PUNISHMENT_MAX);
|
|
return;
|
|
}
|
|
CPartialMerkleTree tree = CPartialMerkleTree::construct(parser);
|
|
if (tree.ExtractMatches(m_transactionHashes) != header.hashMerkleRoot) {
|
|
m_transactionHashes.clear();
|
|
m_merkleBlockHeight = -1;
|
|
throw Streaming::ParsingException("Bad merkle tree received");
|
|
}
|
|
if (!m_blockTransactions.empty())
|
|
throw Streaming::ParsingException("Did not receive all promised Txs for MerkleBlock");
|
|
m_merkleHeader = header;
|
|
m_merkleBlockHeight = blockHeight;
|
|
m_lastReceivedMerkle = blockHeight;
|
|
m_segment->blockSynched(blockHeight);
|
|
logDebug() << "Merkle received by" << connectionId() << "height:" << blockHeight;
|
|
|
|
if (m_lastReceivedMerkle == m_merkleDownloadTo)
|
|
startMerkleDownload(m_merkleDownloadTo + 1);
|
|
}
|
|
else if (message.messageId() == Api::P2P::GetData) {
|
|
Streaming::P2PParser parser(message);
|
|
const size_t count = parser.readCompactInt();
|
|
logDebug() << "Received" << count << "Inv requests using GetData";
|
|
for (size_t i = 0; i < count; ++i) {
|
|
const uint32_t type = parser.readInt();
|
|
auto inv = InventoryItem(parser.readUint256(), type);
|
|
if (type != InventoryItem::TransactionType) { // ignore stupid question
|
|
m_connectionManager->punish(connectionId(), 10);
|
|
continue;
|
|
}
|
|
for (auto i = m_transactions.begin(); i != m_transactions.end();) {
|
|
auto tx = i->lock();
|
|
if (tx.get()) {
|
|
if (tx->hash() == inv.hash()) {
|
|
Streaming::P2PBuilder builder(m_connectionManager->pool(tx->transaction().size()));
|
|
builder.writeByteArray(tx->transaction().data(), Streaming::RawBytes);
|
|
m_con.send(builder.message(Api::P2P::Data_Transaction));
|
|
tx->sentOne();
|
|
break;
|
|
}
|
|
++i;
|
|
} else {
|
|
i = m_transactions.erase(i);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
} catch (const Streaming::ParsingException &e) {
|
|
logCritical() << "Parsing failure" << e << "peer=" << m_con.connectionId();
|
|
m_peerAddress.punishPeer(200);
|
|
m_connectionManager->disconnect(shared_from_this()); // will cause us to be deleted.
|
|
}
|
|
}
|
|
|
|
void Peer::sendFilter_priv()
|
|
{
|
|
if (m_peerStatus == ShuttingDown)
|
|
return;
|
|
assert(m_segment);
|
|
auto buf = m_segment->writeFilter(m_connectionManager->pool(0));
|
|
m_con.send(Message(buf, Api::LegacyP2P, Api::P2P::FilterLoad));
|
|
m_bloomUploadHeight = m_segment->lastBlockSynched();
|
|
}
|
|
|
|
void Peer::setRequestedHeader(bool requestedHeader)
|
|
{
|
|
m_requestedHeader = requestedHeader;
|
|
}
|
|
|
|
bool Peer::requestedHeader() const
|
|
{
|
|
return m_requestedHeader;
|
|
}
|
|
|
|
uint32_t Peer::connectTime() const
|
|
{
|
|
return m_connectTime;
|
|
}
|
|
|
|
void Peer::sendFilter(const CBloomFilter &bloom, int blockHeight)
|
|
{
|
|
if (m_peerStatus == ShuttingDown)
|
|
return;
|
|
Streaming::P2PBuilder builder(m_connectionManager->pool(bloom.GetSerializeSize(0, 0)));
|
|
bloom.store(builder);
|
|
m_con.send(Message(builder.buffer(), Api::LegacyP2P, Api::P2P::FilterLoad));
|
|
m_bloomUploadHeight = blockHeight;
|
|
}
|
|
|
|
int Peer::lastReceivedMerkle() const
|
|
{
|
|
return m_lastReceivedMerkle;
|
|
}
|
|
|
|
bool Peer::merkleDownloadInProgress() const
|
|
{
|
|
return m_merkleDownloadFrom >= m_bloomUploadHeight // started
|
|
&& m_merkleDownloadFrom <= m_merkleDownloadTo; // and has not stopped yet
|
|
}
|
|
|
|
void Peer::startMerkleDownload(int from)
|
|
{
|
|
if (m_peerStatus == ShuttingDown)
|
|
return;
|
|
assert(m_segment);
|
|
if (m_bloomUploadHeight < m_segment->filterChangedHeight() // filter changed since we uploaded it.
|
|
&& m_merkleDownloadFrom >= m_segment->filterChangedHeight()) // unless I'm the one that changed it
|
|
sendFilter_priv(); // then send updated filter
|
|
|
|
m_merkleDownloadFrom = from;
|
|
// we limit our INVs to 100 per request. Notice that the protocol allows for 50000
|
|
m_merkleDownloadTo = std::min(m_merkleDownloadFrom + 100,
|
|
std::min(m_peerHeight, m_connectionManager->blockHeight()));
|
|
|
|
const int count = 1 + m_merkleDownloadTo - m_merkleDownloadFrom;
|
|
if (count == 0)
|
|
return;
|
|
Streaming::P2PBuilder builder(m_connectionManager->pool(40 * count));
|
|
builder.writeCompactSize(count);
|
|
for (int i = m_merkleDownloadFrom; i <= m_merkleDownloadTo; ++i) {
|
|
// write INV data-type
|
|
builder.writeInt(InventoryItem::MerkleBlock);
|
|
builder.writeByteArray(m_connectionManager->blockHashFor(i), Streaming::RawBytes);
|
|
}
|
|
m_con.send(builder.message(Api::P2P::GetData));
|
|
}
|
|
|
|
int Peer::bloomUploadHeight() const
|
|
{
|
|
return m_bloomUploadHeight;
|
|
}
|
|
|
|
bool Peer::receivedHeaders() const
|
|
{
|
|
return m_receivedHeaders;
|
|
}
|
|
|
|
void Peer::setPrivacySegment(PrivacySegment *ps)
|
|
{
|
|
assert(ps);
|
|
if (ps != m_segment) {
|
|
assert(m_segment == nullptr);
|
|
m_segment = ps;
|
|
m_segment->addListener(this);
|
|
}
|
|
sendFilter_priv();
|
|
}
|
|
|
|
void Peer::sendTx(const std::shared_ptr<BroadcastTxData> &txOwner)
|
|
{
|
|
// move to our thread to avoid concurrency issues with the deque
|
|
m_con.postOnStrand(std::bind(&Peer::registerTxToSend, this, txOwner));
|
|
|
|
// send INV
|
|
Streaming::P2PBuilder builder(m_connectionManager->pool(40));
|
|
builder.writeCompactSize(1); // inv-count
|
|
builder.writeInt(InventoryItem::TransactionType);
|
|
builder.writeByteArray(txOwner->transaction().createHash(), Streaming::RawBytes);
|
|
m_con.send(builder.message(Api::P2P::Inventory));
|
|
}
|
|
|
|
void Peer::processTransaction(const Tx &tx)
|
|
{
|
|
if (m_peerStatus == ShuttingDown)
|
|
return;
|
|
if (m_merkleBlockHeight > 0) {
|
|
assert(m_segment);
|
|
const uint256 txHash = tx.createHash();
|
|
for (auto iter = m_transactionHashes.begin(); iter != m_transactionHashes.end(); ++iter) {
|
|
if (txHash == *iter) {
|
|
m_transactionHashes.erase(iter);
|
|
const int bh = m_merkleBlockHeight;
|
|
m_blockTransactions.push_back(tx);
|
|
if (m_transactionHashes.empty()) {
|
|
// done with this block
|
|
m_merkleBlockHeight = -1;
|
|
assert(m_segment);
|
|
m_segment->newTransactions(m_merkleHeader, bh, m_blockTransactions);
|
|
m_blockTransactions.clear();
|
|
}
|
|
return;
|
|
}
|
|
}
|
|
}
|
|
// must be a mempool transaction then.
|
|
m_segment->newTransaction(tx);
|
|
}
|
|
|
|
bool Peer::preferHeaders() const
|
|
{
|
|
return m_preferHeaders;
|
|
}
|
|
|
|
bool Peer::relaysTransactions() const
|
|
{
|
|
return m_relaysTransactions;
|
|
}
|
|
|
|
int Peer::startHeight() const
|
|
{
|
|
return m_startHeight;
|
|
}
|
|
|
|
std::string Peer::userAgent() const
|
|
{
|
|
return m_userAgent;
|
|
}
|
|
|
|
int Peer::protocolVersion() const
|
|
{
|
|
return m_protocolVersion;
|
|
}
|
|
|
|
int Peer::timeOffset() const
|
|
{
|
|
return m_timeOffset;
|
|
}
|
|
|
|
uint64_t Peer::services() const
|
|
{
|
|
return m_services;
|
|
}
|
|
|
|
void Peer::filterUpdated()
|
|
{
|
|
m_con.postOnStrand(std::bind(&Peer::sendFilter_priv, shared_from_this()));
|
|
}
|
|
|
|
void Peer::registerTxToSend(std::shared_ptr<BroadcastTxData> txOwner)
|
|
{
|
|
m_transactions.push_back(txOwner);
|
|
}
|
|
|
|
int Peer::peerHeight() const
|
|
{
|
|
return m_peerHeight;
|
|
}
|
|
|
|
void Peer::updatePeerHeight(int peerHeight)
|
|
{
|
|
m_peerHeight = std::max(peerHeight, m_peerHeight);
|
|
}
|