Files
thehub/libs/p2p/Peer.cpp
T
TomZ 652fc1e161 Introduce structure to broadcast transactions.
To send out transactions in the p2p net is quite a lot of work,
you need to find multiple peers to send the transaction to. First
you send an INV, then you respond to a getData to actually send
the transaction and last you wait for 'reject' messages that may
indicate that there is something wrong with the transaction.

This introduces the BroadcastTxData class that wraps a transaction
and gets callbacks for sending and for rejects, abstracting away
all the complexity for the user.
2020-06-08 21:35:10 +02:00

451 lines
16 KiB
C++

/*
* This file is part of the Flowee project
* Copyright (C) 2020 Tom Zander <tomz@freedommail.ch>
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "Peer.h"
#include "ConnectionManager.h"
#include "PrivacySegment.h"
#include "InventoryItem.h"
#include "BroadcastTxData.h"
#include <version.h>
#include <streaming/P2PParser.h>
#include <streaming/P2PBuilder.h>
#include <boost/asio/error.hpp>
Peer::Peer(ConnectionManager *parent, const PeerAddress &address)
: m_peerAddress(address),
m_peerStatus(Connecting),
m_connectionManager(parent)
{
assert(m_peerAddress.isValid());
m_peerAddress.setInUse(true);
m_timeOffset = time(nullptr);
}
Peer::~Peer()
{
assert(m_peerAddress.isValid());
m_peerAddress.setInUse(false);
if (m_segment)
m_segment->removeListener(this);
}
void Peer::connect(NetworkConnection && server)
{
m_con = std::move(server);
m_con.setOnConnected(std::bind(&Peer::connected, shared_from_this(), std::placeholders::_1));
m_con.setOnDisconnected(std::bind(&Peer::disconnected, shared_from_this(), std::placeholders::_1));
m_con.setOnIncomingMessage(std::bind(&Peer::processMessage, shared_from_this(), std::placeholders::_1));
m_con.setMessageHeaderLegacy(true);
m_con.setMessageQueueSizes(10, 1);
m_con.connect();
}
void Peer::shutdown()
{
m_peerStatus = ShuttingDown;
m_con.shutdown(); // forgets callbacks (shared ptrs) to us.
}
void Peer::connected(const EndPoint &endPoint)
{
m_peerStatus = Connected;
m_connectTime = time(nullptr);
logDebug() << "connected. Peer:" << connectionId();
// send the version message.
auto &pool = m_connectionManager->pool(400);
Streaming::P2PBuilder builder(pool);
builder.writeInt(PROTOCOL_VERSION);
builder.writeLong(m_connectionManager->servicesBitfield());
builder.writeLong((uint64_t) time(nullptr));
// Version msg: target address
builder.writeLong((uint64_t) 2); // services again
char buf[16];
memset(buf, 0, 16);
buf[10] = buf[11] = 0xff; // mark address as an IPv4 one
builder.writeByteArray(buf, 16, Streaming::RawBytes);
builder.writeWord(endPoint.announcePort);
// Version msg: my address
builder.writeLong(3); // services again
builder.writeByteArray(buf, 16, Streaming::RawBytes);
builder.writeWord(7); // port
// Version msg: my status
builder.writeLong(m_connectionManager->appNonce());
builder.writeString(m_connectionManager->userAgent(), Streaming::WithLength);
builder.writeInt(m_connectionManager->blockHeight());
builder.writeBool(/* relay-txs */ false);
// version is always the first thing they expect on connect
Message message = builder.message(Api::P2P::Version);
logDebug().nospace() << "peer: " << connectionId() << ", sending message (" << message.body().size() << " bytes)";
m_con.send(message);
}
void Peer::disconnected(const EndPoint &)
{
logDebug() << "Disconnected. Peer:" << connectionId();
if (m_peerStatus == ShuttingDown)
return;
m_connectionManager->disconnect(shared_from_this()); // will cause us to be deleted.
}
void Peer::processMessage(const Message &message)
{
if (m_peerStatus == ShuttingDown)
return;
try {
logDebug() << "Peer:" << connectionId() << "messageId:"
<< message.header().constData() << "of" << message.body().size() << "bytes";
if (message.messageId() == Api::P2P::Version) {
Streaming::P2PParser parser(message);
m_protocolVersion = parser.readInt();
m_services = parser.readLong();
const auto now = time(nullptr);
// offset of node, adjusted with the round-trip time
m_timeOffset = (now - parser.readLong()) - (now - m_connectTime);
// address
parser.skip(8 + 16 + 2); // IP (and services and port) of them
parser.skip(8 + 16 + 2); // IP of me.
parser.skip(8); // nonce
m_userAgent = parser.readString();
m_startHeight = parser.readInt();
m_relaysTransactions = parser.readBool();
logCritical() << "Peer:" << connectionId() << "is connected to" << m_userAgent
<< "Address:" << m_peerAddress;
m_con.send(Message(Api::LegacyP2P, Api::P2P::VersionAck));
m_con.send(Message(Api::LegacyP2P, Api::P2P::PreferHeaders));
m_connectionManager->connectionEstablished(shared_from_this());
m_peerAddress.successfullyConnected();
}
else if (message.messageId() == Api::P2P::Ping) {
m_con.send(Message(message.body(), Api::LegacyP2P, Api::P2P::Pong));
}
else if (message.messageId() == Api::P2P::PreferHeaders) {
m_preferHeaders = true;
}
else if (message.messageId() == Api::P2P::Headers) {
m_receivedHeaders = true;
m_connectionManager->addBlockHeaders(message, connectionId());
}
else if (message.messageId() == Api::P2P::RejectData) {
Streaming::P2PParser parser(message);
const std::string messageType = parser.readString();
const int errorCode = parser.readByte() ;
const std::string errorMessage = parser.readString();
logWarning() << "Reject received for" << messageType
<< errorCode << errorMessage;
// check m_transactions to call callback.
if (messageType == "tx") {
// we can only forward it if the additional info is a txid
uint256 txid;
try {
txid = parser.readUint256();
} catch (...) {
logDebug() << "No txid present in the reject message";
return;
}
for (auto i = m_transactions.begin(); i != m_transactions.end();) {
auto txOwner = i->lock();
if (txOwner) {
if (txOwner->hash() == txid) {
txOwner->txRejected(
static_cast<BroadcastTxData::RejectReason>(errorCode),
errorMessage);
m_transactions.erase(i); // it can only be rejected once...
return;
}
} else {
i = m_transactions.erase(i);
}
}
}
}
else if (message.messageId() == Api::P2P::Addresses) {
m_connectionManager->addAddresses(message, connectionId());
}
else if (message.messageId() == Api::P2P::Inventory) {
m_connectionManager->addInvMessage(message, connectionId());
}
else if (message.messageId() == Api::P2P::Data_Transaction) {
Tx tx(message.body());
if (m_segment)
processTransaction(tx);
else
m_connectionManager->addTransaction(tx, connectionId());
}
else if (message.messageId() == Api::P2P::Data_MerkleBlock) {
if (!m_segment) { // Received merkleblock without asking for one
m_connectionManager->punish(PUNISHMENT_MAX);
return;
}
Streaming::P2PParser parser(message);
auto header = BlockHeader::fromMessage(parser);
int blockHeight = m_connectionManager->blockHeightFor(header.createHash());
if (blockHeight == -1) { // not on our chain (anymore)
m_connectionManager->punish(PUNISHMENT_MAX);
return;
}
CPartialMerkleTree tree = CPartialMerkleTree::construct(parser);
if (tree.ExtractMatches(m_transactionHashes) != header.hashMerkleRoot) {
m_transactionHashes.clear();
m_merkleBlockHeight = -1;
throw Streaming::ParsingException("Bad merkle tree received");
}
if (!m_blockTransactions.empty())
throw Streaming::ParsingException("Did not receive all promised Txs for MerkleBlock");
m_merkleHeader = header;
m_merkleBlockHeight = blockHeight;
m_lastReceivedMerkle = blockHeight;
m_segment->blockSynched(blockHeight);
logDebug() << "Merkle received by" << connectionId() << "height:" << blockHeight;
if (m_lastReceivedMerkle == m_merkleDownloadTo - 1) {
m_merkleDownloadFrom = m_merkleDownloadTo;
// we limit our INVs to 100 per request. Notice that the protocol allows for 50000
m_merkleDownloadTo = std::min(m_merkleDownloadFrom + 100, m_connectionManager->blockHeight() + 1);
requestMerkleBlocks();
}
}
else if (message.messageId() == Api::P2P::GetData) {
Streaming::P2PParser parser(message);
const size_t count = parser.readCompactInt();
logDebug() << "Received" << count << "Inv requests using GetData";
for (size_t i = 0; i < count; ++i) {
const uint32_t type = parser.readInt();
auto inv = InventoryItem(parser.readUint256(), type);
if (type != InventoryItem::TransactionType) { // ignore stupid question
m_connectionManager->punish(connectionId(), 10);
continue;
}
for (auto i = m_transactions.begin(); i != m_transactions.end();) {
auto tx = i->lock();
if (tx.get()) {
if (tx->hash() == inv.hash()) {
Streaming::P2PBuilder builder(m_connectionManager->pool(tx->transaction().size()));
builder.writeByteArray(tx->transaction().data(), Streaming::RawBytes);
m_con.send(builder.message(Api::P2P::Data_Transaction));
tx->sentOne();
break;
}
++i;
} else {
i = m_transactions.erase(i);
}
}
}
}
} catch (const Streaming::ParsingException &e) {
logCritical() << "Parsing failure" << e << "peer=" << m_con.connectionId();
m_peerAddress.punishPeer(200);
m_connectionManager->disconnect(shared_from_this()); // will cause us to be deleted.
}
}
void Peer::sendFilter_priv()
{
if (m_peerStatus == ShuttingDown)
return;
assert(m_segment);
auto buf = m_segment->writeFilter(m_connectionManager->pool(0));
m_con.send(Message(buf, Api::LegacyP2P, Api::P2P::FilterLoad));
m_bloomUploadHeight = m_segment->lastBlockSynched();
}
void Peer::setRequestedHeader(bool requestedHeader)
{
m_requestedHeader = requestedHeader;
}
bool Peer::requestedHeader() const
{
return m_requestedHeader;
}
uint32_t Peer::connectTime() const
{
return m_connectTime;
}
void Peer::sendFilter(const CBloomFilter &bloom, int blockHeight)
{
if (m_peerStatus == ShuttingDown)
return;
Streaming::P2PBuilder builder(m_connectionManager->pool(bloom.GetSerializeSize(0, 0)));
bloom.store(builder);
m_con.send(Message(builder.buffer(), Api::LegacyP2P, Api::P2P::FilterLoad));
m_bloomUploadHeight = blockHeight;
}
int Peer::lastReceivedMerkle() const
{
return m_lastReceivedMerkle;
}
bool Peer::merkleDownloadInProgress() const
{
return m_merkleDownloadFrom >= m_bloomUploadHeight // started
&& m_merkleDownloadFrom < m_merkleDownloadTo; // and has not stopped yet
}
void Peer::startMerkleDownload(int from)
{
if (m_peerStatus == ShuttingDown)
return;
assert(m_segment);
if (m_bloomUploadHeight < m_segment->filterChangedHeight() // filter changed since we uploaded it.
&& m_merkleDownloadFrom >= m_segment->filterChangedHeight()) // unless I'm the one that changed it
sendFilter_priv(); // then send updated filter
m_merkleDownloadFrom = from;
// we limit our INVs to 100 per message. Notice that the protocol allows for 50000
m_merkleDownloadTo = std::min(from + 100, m_connectionManager->blockHeight() + 1);
requestMerkleBlocks();
}
void Peer::requestMerkleBlocks()
{
if (m_peerStatus == ShuttingDown)
return;
const int count = m_merkleDownloadTo - m_merkleDownloadFrom;
if (count == 0)
return;
Streaming::P2PBuilder builder(m_connectionManager->pool(40 * count));
builder.writeCompactSize(count);
for (int i = m_merkleDownloadFrom; i < m_merkleDownloadTo; ++i) {
// write INV data-type
builder.writeInt(InventoryItem::MerkleBlock);
builder.writeByteArray(m_connectionManager->blockHashFor(i), Streaming::RawBytes);
}
m_con.send(builder.message(Api::P2P::GetData));
}
int Peer::bloomUploadHeight() const
{
return m_bloomUploadHeight;
}
bool Peer::receivedHeaders() const
{
return m_receivedHeaders;
}
void Peer::setPrivacySegment(PrivacySegment *ps)
{
assert(ps);
if (ps != m_segment) {
assert(m_segment == nullptr);
m_segment = ps;
m_segment->addListener(this);
}
sendFilter_priv();
}
void Peer::sendTx(const std::shared_ptr<BroadcastTxData> &txOwner)
{
// move to our thread to avoid concurrency issues with the deque
m_con.postOnStrand(std::bind(&Peer::registerTxToSend, this, txOwner));
// send INV
Streaming::P2PBuilder builder(m_connectionManager->pool(40));
builder.writeCompactSize(1); // inv-count
builder.writeInt(InventoryItem::TransactionType);
builder.writeByteArray(txOwner->transaction().createHash(), Streaming::RawBytes);
m_con.send(builder.message(Api::P2P::Inventory));
}
void Peer::processTransaction(const Tx &tx)
{
if (m_peerStatus == ShuttingDown)
return;
if (m_merkleBlockHeight > 0) {
assert(m_segment);
const uint256 txHash = tx.createHash();
for (auto iter = m_transactionHashes.begin(); iter != m_transactionHashes.end(); ++iter) {
if (txHash == *iter) {
m_transactionHashes.erase(iter);
const int bh = m_merkleBlockHeight;
m_blockTransactions.push_back(tx);
if (m_transactionHashes.empty()) {
// done with this block
m_merkleBlockHeight = -1;
assert(m_segment);
m_segment->newTransactions(m_merkleHeader, bh, m_blockTransactions);
m_blockTransactions.clear();
}
return;
}
}
}
// must be a mempool transaction then.
m_segment->newTransaction(tx);
}
bool Peer::preferHeaders() const
{
return m_preferHeaders;
}
bool Peer::relaysTransactions() const
{
return m_relaysTransactions;
}
int Peer::startHeight() const
{
return m_startHeight;
}
std::string Peer::userAgent() const
{
return m_userAgent;
}
int Peer::protocolVersion() const
{
return m_protocolVersion;
}
int Peer::timeOffset() const
{
return m_timeOffset;
}
uint64_t Peer::services() const
{
return m_services;
}
void Peer::filterUpdated()
{
m_con.postOnStrand(std::bind(&Peer::sendFilter_priv, shared_from_this()));
}
void Peer::registerTxToSend(std::shared_ptr<BroadcastTxData> txOwner)
{
m_transactions.push_back(txOwner);
}
/*
* It would be useful to add a timeout for merkleblock and transactions. So if the peer doesn't deliver in
* the set time we move to another peer less busy (or less evil).
*/