/*
 * This file is part of the Flowee project
 * Copyright (C) 2020-2022 Tom Zander
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
 */
#include "Peer.h"
#include "ConnectionManager.h"
#include "PrivacySegment.h"
#include "InventoryItem.h"
#include "BroadcastTxData.h"

// NOTE(review): the targets of the five angle-bracket includes below were lost in the
// copy that was reviewed. They are reconstructed from the symbols this file uses
// (PROTOCOL_VERSION, Streaming::P2PBuilder, Streaming::P2PParser, Streaming::pool,
// CPartialMerkleTree). Confirm the exact paths against version control.
#include <version.h>
#include <streaming/P2PBuilder.h>
#include <streaming/P2PParser.h>
#include <streaming/BufferPools.h>
#include <merkleblock.h>

namespace {

// Builds a GetData message requesting the merkle blocks for every height in the
// inclusive range [from, to]. An empty range (to < from) yields a request with
// whatever count "1 + to - from" evaluates to and no inventory entries.
Message merkleBlockRequest(ConnectionManager *manager, int from, int to)
{
    const int count = 1 + to - from;
    Streaming::P2PBuilder builder(Streaming::pool(40 * count));
    builder.writeCompactSize(count);
    for (int height = from; height <= to; ++height) {
        // write INV data-type, followed by the hash of the block at that height
        builder.writeInt(InventoryItem::MerkleBlock);
        builder.writeByteArray(manager->blockHashFor(height), Streaming::RawBytes);
    }
    return builder.message(Api::P2P::GetData);
}

}

// Creates a peer for \a address, owned by \a parent. The address is flagged as
// in-use for the lifetime of this object.
Peer::Peer(ConnectionManager *parent, const PeerAddress &address)
    : m_peerAddress(address),
    m_peerStatus(Connecting),
    m_connectionManager(parent)
{
    assert(m_peerAddress.isValid());
    m_peerAddress.setInUse(true);
    // placeholder until the peer's version message gives us a real offset.
    m_timeOffset = time(nullptr);
}

// Releases the address and unregisters from the privacy segment, if one was set.
Peer::~Peer()
{
    assert(m_peerAddress.isValid());
    m_peerAddress.setInUse(false);
    if (m_segment)
        m_segment->removeListener(this);
}

// Takes ownership of \a server, installs our callbacks on it and starts connecting.
// The callbacks hold a shared pointer to this peer; they are dropped in shutdown().
void Peer::connect(NetworkConnection && server)
{
    m_con = std::move(server);
    m_con.setOnConnected(std::bind(&Peer::connected, shared_from_this(), std::placeholders::_1));
    m_con.setOnDisconnected(std::bind(&Peer::disconnected, shared_from_this(), std::placeholders::_1));
    m_con.setOnIncomingMessage(std::bind(&Peer::processMessage, shared_from_this(), std::placeholders::_1));
    m_con.setMessageHeaderLegacy(true);
    m_con.setMessageQueueSizes(50, 3);
    m_con.connect();
}

// Marks the peer as shutting down and closes the connection. After this the
// message handlers in this file return without doing any work.
void Peer::shutdown()
{
    m_peerStatus = ShuttingDown;
    m_con.shutdown(); // forgets callbacks (shared ptrs) to us.
}

// Connection callback: records the connect time and sends our version message.
void Peer::connected(const EndPoint &endPoint)
{
    m_peerStatus = Connected;
    m_connectTime = time(nullptr);
    logDebug() << "connected. Peer:" << connectionId();

    // send the version message.
    auto &pool = Streaming::pool(400);
    Streaming::P2PBuilder builder(pool);
    builder.writeInt(PROTOCOL_VERSION);
    builder.writeLong(m_connectionManager->servicesBitfield());
    builder.writeLong(static_cast<uint64_t>(time(nullptr)));

    // Version msg: target address
    builder.writeLong(static_cast<uint64_t>(2)); // services again
    char buf[16];
    memset(buf, 0, 16);
    // mark address as an IPv4 one
    buf[10] = buf[11] = static_cast<char>(0xff);
    builder.writeByteArray(buf, 16, Streaming::RawBytes);
    builder.writeWord(endPoint.announcePort);

    // Version msg: my address
    builder.writeLong(3); // services again
    builder.writeByteArray(buf, 16, Streaming::RawBytes);
    builder.writeWord(7); // port

    // Version msg: my status
    builder.writeLong(m_connectionManager->appNonce());
    builder.writeString(m_connectionManager->userAgent(), Streaming::WithLength);
    builder.writeInt(m_connectionManager->blockHeight());
    builder.writeBool(/* relay-txs */ false);

    // version is always the first thing they expect on connect
    Message message = builder.message(Api::P2P::Version);
    logDebug().nospace() << "peer: " << connectionId() << ", sending message ("
            << message.body().size() << " bytes)";
    m_con.send(message);
}

// Disconnection callback: unless we initiated the shutdown ourselves, ask the
// connection manager to drop this peer.
void Peer::disconnected(const EndPoint &)
{
    logDebug() << "Disconnected. Peer:" << connectionId();
    if (m_peerStatus == ShuttingDown)
        return;
    m_connectionManager->disconnect(shared_from_this()); // will cause us to be deleted.
}

// Incoming-message callback: dispatches on the message id.
// A Streaming::ParsingException raised while handling the message punishes the
// peer address and disconnects the peer.
void Peer::processMessage(const Message &message)
{
    if (m_peerStatus == ShuttingDown)
        return;
    try {
        logDebug() << "Peer:" << connectionId() << "messageId:"
                << message.header().constData() << "of" << message.body().size() << "bytes";

        if (message.messageId() == Api::P2P::Version) {
            Streaming::P2PParser parser(message);
            m_protocolVersion = parser.readInt();
            m_services = parser.readLong();
            const auto now = time(nullptr);
            // offset of node, adjusted with the round-trip time
            m_timeOffset = (now - parser.readLong()) - (now - m_connectTime);
            // address
            parser.skip(8 + 16 + 2); // IP (and services and port) of them
            parser.skip(8 + 16 + 2); // IP of me.
            parser.skip(8); // nonce
            m_userAgent = parser.readString();
            m_startHeight = m_peerHeight = parser.readInt();
            m_relaysTransactions = parser.readBool();
            logCritical() << "Peer:" << connectionId() << "is connected to" << m_userAgent
                    << "Address:" << m_peerAddress;

            m_con.send(Message(Api::LegacyP2P, Api::P2P::VersionAck));
            m_con.send(Message(Api::LegacyP2P, Api::P2P::PreferHeaders));
            m_connectionManager->connectionEstablished(shared_from_this());
            m_peerAddress.successfullyConnected();
        }
        else if (message.messageId() == Api::P2P::Ping) {
            // echo the ping payload back.
            m_con.send(Message(message.body(), Api::LegacyP2P, Api::P2P::Pong));
        }
        else if (message.messageId() == Api::P2P::PreferHeaders) {
            m_preferHeaders = true;
        }
        else if (message.messageId() == Api::P2P::Headers) {
            m_receivedHeaders = true;
            m_connectionManager->addBlockHeaders(message, connectionId());
        }
        else if (message.messageId() == Api::P2P::RejectData) {
            Streaming::P2PParser parser(message);
            const std::string messageType = parser.readString();
            const int errorCode = parser.readByte();
            const std::string errorMessage = parser.readString();
            logWarning() << "Reject received for" << messageType << errorCode << errorMessage;

            // check m_transactions to call callback.
            if (messageType == "tx") {
                // we can only forward it if the additional info is a txid
                uint256 txid;
                try {
                    txid = parser.readUint256();
                } catch (...) {
                    // best effort: a reject without txid simply can not be forwarded.
                    logDebug() << "No txid present in the reject message";
                    return;
                }
                for (auto i = m_transactions.begin(); i != m_transactions.end();) {
                    auto txOwner = i->lock();
                    if (txOwner) {
                        if (txOwner->hash() == txid) {
                            // NOTE(review): the cast target was lost in the reviewed copy;
                            // BroadcastTxData::RejectReason is presumed - confirm.
                            txOwner->txRejected(
                                    static_cast<BroadcastTxData::RejectReason>(errorCode),
                                    errorMessage);
                            m_transactions.erase(i);
                            // it can only be rejected once...
                            return;
                        }
                        ++i;
                    } else {
                        // the owner is gone, forget about the transaction.
                        i = m_transactions.erase(i);
                    }
                }
            }
        }
        else if (message.messageId() == Api::P2P::Addresses) {
            m_connectionManager->addAddresses(message, connectionId());
        }
        else if (message.messageId() == Api::P2P::Inventory) {
            m_connectionManager->addInvMessage(message, connectionId());
        }
        else if (message.messageId() == Api::P2P::Data_Transaction) {
            Tx tx(message.body());
            if (m_segment)
                processTransaction(tx);
            else
                m_connectionManager->addTransaction(tx, connectionId());
        }
        else if (message.messageId() == Api::P2P::Data_DSProof) {
            logCritical() << "Received a DoubleSpendProof from peer:" << connectionId()
                    << "Not implemented yet.";
        }
        else if (message.messageId() == Api::P2P::Data_MerkleBlock) {
            if (!m_segment) { // Received merkleblock without asking for one
                logWarning() << "Peer sent merkleblock without us askign for one. PeerId:" << connectionId();
                // Pass our connection id explicitly, like the GetData branch does. The
                // one-argument call handed the amount over in the connection-id position.
                // NOTE(review): confirm against ConnectionManager::punish() overloads.
                m_connectionManager->punish(connectionId(), PUNISHMENT_MAX);
                return;
            }
            if (!m_segment->enabled()) // they want no more.
                return;

            Streaming::P2PParser parser(message);
            auto header = BlockHeader::fromMessage(parser);
            int blockHeight = m_connectionManager->blockHeightFor(header.createHash());
            if (blockHeight == -1) { // not on our chain (anymore)
                logWarning() << "Peer sent merkleblock not on our chain (anymore). PeerId:" << connectionId();
                m_connectionManager->punish(connectionId(), PUNISHMENT_MAX);
                return;
            }
            CPartialMerkleTree tree = CPartialMerkleTree::construct(parser);
            if (tree.ExtractMatches(m_transactionHashes) != header.hashMerkleRoot) {
                m_transactionHashes.clear();
                m_merkleBlockHeight = -1;
                throw Streaming::ParsingException("Bad merkle tree received");
            }
            if (!m_blockTransactions.empty())
                throw Streaming::ParsingException("Did not receive all promised Txs for MerkleBlock");
            m_merkleHeader = header;
            m_merkleBlockHeight = blockHeight;
            m_lastReceivedMerkle = blockHeight;

            /*
             * When we go back with 'another round of merkle downloads' we end up
             * calling blockSynched again with a older number than we did before, which confuses
             * the securitySegment in a bad way.
             * Apart from us rolling back 10 blocks, the peer may also send us data we
             * didn't ask for and try to confuse our internal state.
             *
             * So, lets make sure we only update our progress in one direction for this peer.
             */
            if (blockHeight > m_highestMerkleReceived) {
                m_highestMerkleReceived = blockHeight;
                m_segment->blockSynched(blockHeight);
            }
            logDebug() << "Merkle received by" << connectionId() << "height:" << blockHeight;

            if (m_lastReceivedMerkle == m_merkleDownloadTo) {
                // last block of the current batch arrived.
                assert(m_segment);
                const auto filterChangedHeight = m_segment->filterChangedHeight();
                logDebug() << "Doing another round of merkle downloads" << m_merkleDownloadFrom
                        << filterChangedHeight << m_merkleDownloadTo << m_bloomUploaded;
                if (filterChangedHeight != m_bloomUploaded) {
                    // the wallet updated the bloom filter AFTER we requested merkle blocks,
                    // we need to re-request those blocks with the updated filter.
                    sendFilter(); // send updated filter

                    const int count = 1 + m_merkleDownloadTo - m_merkleDownloadFrom;
                    logDebug() << "  filter changed, re-downloading them (n =" << count << "blocks)";
                    m_con.send(merkleBlockRequest(m_connectionManager,
                            m_merkleDownloadFrom, m_merkleDownloadTo));
                }
                startMerkleDownload(m_merkleDownloadTo + 1);
            }
        }
        else if (message.messageId() == Api::P2P::GetData) {
            Streaming::P2PParser parser(message);
            const size_t count = parser.readCompactInt();
            logDebug() << "Received" << count << "Inv requests using GetData";
            for (size_t i = 0; i < count; ++i) {
                const uint32_t type = parser.readInt();
                auto inv = InventoryItem(parser.readUint256(), type);
                if (type != InventoryItem::TransactionType) {
                    // ignore stupid question
                    m_connectionManager->punish(connectionId(), 10);
                    continue;
                }

                // we only serve transactions we were asked to broadcast ourselves.
                for (auto txIter = m_transactions.begin(); txIter != m_transactions.end();) {
                    auto tx = txIter->lock();
                    if (tx.get()) {
                        if (tx->hash() == inv.hash()) {
                            Streaming::P2PBuilder builder(Streaming::pool(tx->transaction().size()));
                            builder.writeByteArray(tx->transaction().data(), Streaming::RawBytes);
                            m_con.send(builder.message(Api::P2P::Data_Transaction));
                            tx->sentOne();
                            break;
                        }
                        ++txIter;
                    } else {
                        // the owner is gone, forget about the transaction.
                        txIter = m_transactions.erase(txIter);
                    }
                }
            }
        }
    } catch (const Streaming::ParsingException &e) {
        logCritical() << "Parsing failure" << e << "peer=" << m_con.connectionId();
        m_peerAddress.punishPeer(200);
        m_connectionManager->disconnect(shared_from_this()); // will cause us to be deleted.
    }
}

// Uploads the privacy segment's current filter to the peer and remembers the
// filter-changed height it corresponds to. Requires a segment to be set; does
// nothing when shutting down or when the segment is disabled.
void Peer::sendFilter()
{
    if (m_peerStatus == ShuttingDown)
        return;
    assert(m_segment);
    if (!m_segment->enabled())
        return;
    logDebug() << m_con.connectionId() << "/" << m_segment->segmentId()
            << "Sending filter! Now at height" << m_segment->lastBlockSynched();
    auto buf = m_segment->writeFilter(Streaming::pool(0));
    m_con.send(Message(buf, Api::LegacyP2P, Api::P2P::FilterLoad));
    m_bloomUploaded = m_segment->filterChangedHeight();
}

void Peer::setRequestedHeader(bool requestedHeader)
{
    m_requestedHeader = requestedHeader;
}

bool Peer::requestedHeader() const
{
    return m_requestedHeader;
}

// Returns the time (as reported by time()) at which the connection was established.
uint32_t Peer::connectTime() const
{
    return m_connectTime;
}

// Returns the height of the merkle block most recently accepted from this peer.
int Peer::lastReceivedMerkle() const
{
    return m_lastReceivedMerkle;
}

// Returns true while a batch of merkle blocks has been requested and the
// requested range is not empty.
bool Peer::merkleDownloadInProgress() const
{
    return m_merkleDownloadFrom != -1 // has started
        && 1 + m_merkleDownloadTo - m_merkleDownloadFrom > 0; // has not ended yet
}

// Requests the next batch of merkle blocks starting at height \a from. When there
// is nothing left to request, asks the peer for its mempool instead.
void Peer::startMerkleDownload(int from)
{
    if (m_peerStatus == ShuttingDown)
        return;
    assert(m_segment);
    if (m_bloomUploaded != m_segment->filterChangedHeight()) // filter changed since we uploaded it.
        sendFilter();

    m_merkleDownloadFrom = from;
    // we limit our MerkleBlock list to 10 per iteration.
    m_merkleDownloadTo = std::min(m_merkleDownloadFrom + 9,
            std::min(m_peerHeight, m_connectionManager->blockHeight()));
    const int count = 1 + m_merkleDownloadTo - m_merkleDownloadFrom;
    if (count == 0) {
        // send one last message to get the transactions (via INV)
        // from the remote peers mempool.
        m_con.send(Message(Api::LegacyP2P, Api::P2P::Mempool));
        return;
    }
    assert(count > 0);
    m_con.send(merkleBlockRequest(m_connectionManager, m_merkleDownloadFrom, m_merkleDownloadTo));
}

bool Peer::receivedHeaders() const
{
    return m_receivedHeaders;
}

// Assigns the privacy segment this peer serves. A segment can only be assigned
// once; assigning the same segment again is a no-op.
void Peer::setPrivacySegment(PrivacySegment *ps)
{
    assert(ps);
    if (ps != m_segment) {
        assert(m_segment == nullptr);
        m_segment = ps;
        m_segment->addListener(this);
        if (m_bloomUploaded != m_segment->filterChangedHeight())
            sendFilter();
    }
}

// Announces the transaction owned by \a txOwner to the peer with an INV message
// and registers it so a later GetData request for it can be answered.
// NOTE(review): the shared_ptr template argument was lost in the reviewed copy;
// BroadcastTxData is presumed - confirm against Peer.h.
void Peer::sendTx(const std::shared_ptr<BroadcastTxData> &txOwner)
{
    // move to our thread to avoid concurrency issues with the deque.
    // Bind a shared pointer (not a raw 'this') so we stay alive until the handler ran.
    m_con.postOnStrand(std::bind(&Peer::registerTxToSend, shared_from_this(), txOwner));

    // send INV
    Streaming::P2PBuilder builder(Streaming::pool(40));
    builder.writeCompactSize(1); // inv-count
    builder.writeInt(InventoryItem::TransactionType);
    builder.writeByteArray(txOwner->transaction().createHash(), Streaming::RawBytes);
    m_con.send(builder.message(Api::P2P::Inventory));
}

// Handles a transaction received from the peer while a privacy segment is set.
// A transaction announced by the last merkle block is collected until the block
// is complete, anything else is passed to the segment as a single transaction.
void Peer::processTransaction(const Tx &tx)
{
    if (m_peerStatus == ShuttingDown)
        return;
    assert(m_segment);
    if (!m_segment->enabled()) // their responsibility to re-fetch info later.
        return;

    if (m_merkleBlockHeight > 0) {
        const uint256 txHash = tx.createHash();
        for (auto iter = m_transactionHashes.begin(); iter != m_transactionHashes.end(); ++iter) {
            if (txHash == *iter) {
                m_transactionHashes.erase(iter);
                const int bh = m_merkleBlockHeight;
                m_blockTransactions.push_back(tx);
                if (m_transactionHashes.empty()) {
                    // done with this block
                    m_merkleBlockHeight = -1;
                    m_segment->newTransactions(m_merkleHeader, bh, m_blockTransactions);
                    m_blockTransactions.clear();
                }
                return;
            }
        }
    }

    // must be a mempool transaction then.
    m_segment->newTransaction(tx);
}

bool Peer::preferHeaders() const
{
    return m_preferHeaders;
}

bool Peer::relaysTransactions() const
{
    return m_relaysTransactions;
}

// Returns the block height the peer reported in its version message.
int Peer::startHeight() const
{
    return m_startHeight;
}

std::string Peer::userAgent() const
{
    return m_userAgent;
}

int Peer::protocolVersion() const
{
    return m_protocolVersion;
}

int Peer::timeOffset() const
{
    return m_timeOffset;
}

uint64_t Peer::services() const
{
    return m_services;
}

// Listener callback: schedules a filter upload on our strand.
void Peer::filterUpdated()
{
    if (!merkleDownloadInProgress()) // the merkle download will take care of filter-uploads.
        m_con.postOnStrand(std::bind(&Peer::sendFilter, shared_from_this()));
}

// Remembers \a txOwner so a GetData request or reject for its transaction can be
// matched. Scheduled on the strand by sendTx().
void Peer::registerTxToSend(std::shared_ptr<BroadcastTxData> txOwner)
{
    m_transactions.push_back(txOwner);
}

int Peer::peerHeight() const
{
    return m_peerHeight;
}

// Raises the known height of the peer to \a peerHeight; the height never decreases.
void Peer::updatePeerHeight(int peerHeight)
{
    m_peerHeight = std::max(peerHeight, m_peerHeight);
}