Files
thehub/hub/api/AddressMonitorService.cpp
tomFlowee d8e50a348e Make results per connection
This is a patch from John Galt to fix the problem that multiple
connections could end up getting settings from one connection
applied to the reply of another.
2025-08-11 14:28:21 +02:00

393 lines
16 KiB
C++

/*
* This file is part of the Flowee project
* Copyright (C) 2018-2025 Tom Zander <tom@flowee.org>
* Copyright (C) 2025 John Galt <johngaltbch@pm.me>
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "AddressMonitorService.h"
// server 'lib'
#include <txmempool.h>
#include <DoubleSpendProof.h>
#include <encodings_legacy.h>
#include <chain.h>
#include <Application.h>
#include <NetworkManager.h>
#include <APIProtocol.h>
#include <Logger.h>
#include <Message.h>
#include <streaming/MessageBuilder.h>
#include <streaming/MessageParser.h>
#include <streaming/BufferPools.h>
#include <streaming/streams.h>
#include <primitives/Block.h>
#include <BitcoinVersion.h>
/**
 * Creates the address monitor service.
 *
 * Sets up the base NetworkService with the AddressMonitorService id,
 * allocates the buffer pool used for building outgoing messages and
 * registers this object as a listener on the validation notifier.
 */
AddressMonitorService::AddressMonitorService()
    : NetworkService(Api::AddressMonitorService),
      m_pool(std::make_shared<Streaming::BufferPool>())
{
    // The destructor undoes this registration.
    ValidationNotifier().addListener(this);
}
/**
 * Unregisters this service from the validation notifier it was added to
 * in the constructor.
 */
AddressMonitorService::~AddressMonitorService()
{
    ValidationNotifier().removeListener(this);
}
void AddressMonitorService::syncTx(const Tx &tx)
{
auto rem = remotes();
std::map<int, Match> matchesPerRemote;
Tx::Iterator iter(tx);
if (!match(iter, rem, matchesPerRemote))
return;
for (auto i = matchesPerRemote.begin(); i != matchesPerRemote.end(); ++i) {
const Match &data = i->second;
if (!data.matches.empty())
sendMatchesToRemote(rem[i->first], data, tx.createHash(), -1, -1);
}
}
/**
 * Walks the fields of the next transaction available on \a iter and records,
 * per remote, every output whose hashed output script that remote subscribed to.
 *
 * @param iter            iterator positioned at the start of a transaction.
 * @param remotes         the remotes to check; each is treated as a RemoteWithKeys.
 * @param matchingRemotes output; keyed by the index of the remote in \a remotes.
 * @return false when there are no remotes or when the iterator yields End
 *         straight away, true after a transaction has been walked.
 */
bool AddressMonitorService::match(Tx::Iterator &iter, const std::deque<NetworkService::Remote *> &remotes, std::map<int, Match> &matchingRemotes) const
{
    if (remotes.empty())
        return false;

    auto field = iter.next();
    if (field == Tx::End) // then the second end means end of block
        return false;

    MatchedOutput current;
    int outputIndex = 0; // index of the output being assembled
    for (; field != Tx::End; field = iter.next()) {
        switch (field) {
        case Tx::OutputValue:
            // a value field starts a new output, reset everything collected so far
            current = MatchedOutput();
            current.index = outputIndex;
            current.amount = iter.longData();
            break;
        case Tx::CashTokenBitfield: {
            const uint8_t bits = iter.bitfieldData();
            const int capability = bits & 0x0f;
            current.hasToken = true;
            current.tokenIsFT = (bits & 0x10) == 0x10;
            current.tokenIsNFT = (bits & 0x20) == 0x20;
            current.tokenIsImmutable = current.tokenIsNFT && capability == 0x00;
            current.tokenIsMutable = current.tokenIsNFT && capability == 0x01;
            current.tokenIsMinting = current.tokenIsNFT && capability == 0x02;
            break;
        }
        case Tx::CashTokenCategory:
            current.tokenCategory = iter.byteData();
            break;
        case Tx::CashTokenCommitment:
            current.tokenCommitment = iter.byteData();
            break;
        case Tx::CashTokenAmount:
            current.tokenAmount = iter.longData();
            break;
        case Tx::OutputScript: {
            // the script closes the output: hash it and look it up for every remote
            uint256 hashedOutScript;
            iter.hashByteData(hashedOutScript);
            for (size_t pos = 0; pos < remotes.size(); ++pos) {
                assert(pos < INT_MAX);
                RemoteWithKeys *subscriber = static_cast<RemoteWithKeys*>(remotes.at(pos));
                if (subscriber->hashes.find(hashedOutScript) == subscriber->hashes.end())
                    continue;
                // each remote receives its own copy of the matched output
                MatchedOutput hit = current;
                hit.hashedOutScript = hashedOutScript;
                Match &perRemote = matchingRemotes[static_cast<int>(pos)];
                perRemote.matches.push_back(std::move(hit));
            }
            ++outputIndex;
            break;
        }
        default:
            break;
        }
    }
    return true;
}
/**
 * Builds one TransactionFound message holding all matched outputs of a
 * transaction and sends it to \a remote.
 *
 * @param remote         the receiver of the notification.
 * @param data           the outputs that matched for this remote.
 * @param txid           hash of the transaction the outputs belong to.
 * @param blockheight    added to the message only when larger than zero.
 * @param offsetInBlock  added to the message only when zero or larger.
 *
 * The shared buffer pool is protected by m_poolMutex for the duration of
 * the build and send.
 */
void AddressMonitorService::sendMatchesToRemote(Remote *remote, const Match &data, const uint256 &txid, int blockheight, int64_t offsetInBlock)
{
    // Decide once which optional fields are present, so the size estimate
    // below and the actual writes can never disagree.
    const bool addHeight = blockheight > 0;
    const bool addOffset = offsetInBlock >= 0;

    int messageSize = 40; // the TxId field: 32 byte hash plus tag and length
    for (const auto &output : data.matches) {
        messageSize += output.serializedSize() + 1; // +1 for the separator
    }
    if (addHeight)
        messageSize += 10;
    if (addOffset)
        messageSize += 10;

    std::lock_guard<std::mutex> guard(m_poolMutex);
    m_pool->reserve(messageSize);
    Streaming::MessageBuilder builder(m_pool);
    builder.add(Api::AddressMonitor::TxId, txid);
    if (addHeight)
        builder.add(Api::AddressMonitor::BlockHeight, blockheight);
    if (addOffset)
        builder.add(Api::AddressMonitor::OffsetInBlock, static_cast<uint64_t>(offsetInBlock));

    const int matchCount = static_cast<int>(data.matches.size());
    int count = 0;
    for (const auto &output : data.matches) {
        ++count;
        output.serialize(builder);
        // separators go between outputs, not after the last one
        if (count != matchCount)
            builder.add(Api::AddressMonitor::Separator, true);
    }
    logDebug(Log::MonitorService) << "Remote gets" << data.matches.size() << "tx notification(s)";
    remote->connection.send(builder.message(Api::AddressMonitorService, Api::AddressMonitor::TransactionFound));
}
void AddressMonitorService::syncAllTransactionsInBlock(const Block &block, CBlockIndex *index)
{
assert(index);
Tx::Iterator iter(block);
auto rem = remotes();
while (true) {
std::map<int, Match> matchesPerRemote;
if (!match(iter, rem, matchesPerRemote))
return; // end of block
// one (or zero) message per transaction
for (auto i = matchesPerRemote.begin(); i != matchesPerRemote.end(); ++i) {
const Match &data = i->second;
if (!data.matches.empty()) {
auto tx = iter.prevTx();
sendMatchesToRemote(rem[i->first], data, tx.createHash(), index->nHeight, tx.offsetInBlock(block));
}
}
}
}
void AddressMonitorService::doubleSpendFound(const Tx &first, const Tx &duplicate)
{
logDebug(Log::MonitorService) << "Double spend found" << first.createHash() << duplicate.createHash();
const auto rem = remotes();
std::map<int, Match> matchesPerRemote;
Tx::Iterator iter(first);
if (!match(iter, rem, matchesPerRemote))
return; // returns if no listeners
Tx::Iterator iter2(duplicate);
bool m = match(iter2, rem, matchesPerRemote);
assert(m); // our duplicate tx object should have data
for (auto i = matchesPerRemote.begin(); i != matchesPerRemote.end(); ++i) {
Match &data = i->second;
int messageSize = 0;
for (const auto &output : data.matches) {
messageSize += output.serializedSize();
}
std::lock_guard<std::mutex> guard(m_poolMutex);
m_pool->reserve(messageSize);
Streaming::MessageBuilder builder(m_pool);
for (const auto &output : data.matches) {
output.serialize(builder);
}
builder.add(Api::AddressMonitor::TxId, first.createHash());
builder.add(Api::AddressMonitor::TransactionData, duplicate.data());
rem[i->first]->connection.send(builder.message(Api::AddressMonitorService, Api::AddressMonitor::DoubleSpendFound));
}
}
void AddressMonitorService::doubleSpendFound(const Tx &txInMempool, const DoubleSpendProof &proof)
{
logDebug(Log::MonitorService) << "Double spend proof found. TxId:" << txInMempool.createHash();
const auto rem = remotes();
std::map<int, Match> matchesPerRemote;
Tx::Iterator iter(txInMempool);
if (!match(iter, rem, matchesPerRemote))
return; // returns false if no listeners
CDataStream stream(SER_NETWORK, PROTOCOL_VERSION);
stream << proof;
const std::vector<uint8_t> serializedProof(stream.begin(), stream.end());
for (auto i = matchesPerRemote.begin(); i != matchesPerRemote.end(); ++i) {
Match &data = i->second;
int messageSize = 0;
for (const auto &output : data.matches) {
messageSize += output.serializedSize() + 1;
}
std::lock_guard<std::mutex> guard(m_poolMutex);
m_pool->reserve(messageSize);
Streaming::MessageBuilder builder(m_pool);
for (const auto &output : data.matches) {
output.serialize(builder);
}
builder.add(Api::AddressMonitor::TxId, txInMempool.createHash());
builder.addByteArray(Api::AddressMonitor::DoubleSpendProofData, &serializedProof[0], serializedProof.size());
rem[i->first]->connection.send(builder.message(Api::AddressMonitorService, Api::AddressMonitor::DoubleSpendFound));
}
}
/**
 * Handles Subscribe and Unsubscribe requests from a remote; all other
 * message ids are ignored.
 *
 * Every BitcoinScriptHashed field holding a 32 byte array is added to
 * (Subscribe) or removed from (Unsubscribe) the remote's set of hashes.
 * A subscribe additionally schedules findTxInMempool() on the connection's
 * strand for the new hash. The reply holds the number of hashes processed
 * and, when something went wrong, an error message.
 *
 * @param remote_ the sender; has to be a RemoteWithKeys.
 * @param message the incoming request.
 * @param ep      the endpoint of the sender, used for logging.
 */
void AddressMonitorService::onIncomingMessage(Remote *remote_, const Message &message, const EndPoint &ep)
{
    assert(dynamic_cast<RemoteWithKeys*>(remote_));
    RemoteWithKeys *remote = static_cast<RemoteWithKeys*>(remote_);

    const auto messageId = message.messageId();
    const bool subscribing = messageId == Api::AddressMonitor::Subscribe;
    if (!subscribing && messageId != Api::AddressMonitor::Unsubscribe)
        return;

    Streaming::MessageParser parser(message.body());
    std::string error;
    int done = 0;
    while (parser.next() == Streaming::FoundTag) {
        if (parser.tag() != Api::AddressMonitor::BitcoinScriptHashed)
            continue;
        if (!parser.isByteArray() || parser.dataLength() != 32) {
            error = "BitcoinScriptHashed has to be a sha256 (bytearray of 32 bytes)";
            continue;
        }

        uint256 hash = parser.uint256Data();
        ++done;
        if (!subscribing) {
            remote->hashes.erase(hash);
            continue;
        }

        // a limit of zero or less means unlimited
        const bool limitHit = m_maxAddressesPerConnection > 0
                && static_cast<int>(remote->hashes.size()) + 1 >= m_maxAddressesPerConnection;
        if (limitHit) {
            logInfo(Log::MonitorService) << "Remote" << ep.connectionId << "hit limit of registrations";
            error = "Maximum number of addresses registered for watching, ask your node operator to change limits";
            break; // stop processing the rest of the request
        }
        remote->hashes.insert(hash);
        // look for already known transactions paying to this hash
        remote->connection.postOnStrand(std::bind(&AddressMonitorService::findTxInMempool,
                    this, remote->connection.connectionId(), hash));
    }

    if (error.empty() && !done)
        error = "Missing required field BitcoinScriptHashed (2)";

    auto pool = Streaming::pool(10 + error.size());
    Streaming::MessageBuilder builder(pool);
    builder.add(Api::AddressMonitor::Result, done);
    if (subscribing) {
        logInfo(Log::MonitorService) << "Remote" << ep.connectionId << "made" << done << "changes. Hashes count:"
            << remote->hashes.size();
    }
    if (!error.empty())
        builder.add(Api::AddressMonitor::ErrorMessage, error);
    remote->connection.send(builder.reply(message));
    updateBools();
}
void AddressMonitorService::updateBools()
{
m_findByHash = false;
for (auto remote : remotes()) {
RemoteWithKeys *rwk = static_cast<RemoteWithKeys*>(remote);
m_findByHash = m_findByHash || !rwk->hashes.empty();
}
}
/**
 * Scans the mempool for outputs paying to \a hash and sends a
 * TransactionFound message for each one to the given connection.
 *
 * Scheduled from onIncomingMessage() after a successful subscribe.
 *
 * @param connectionId id of the connection that subscribed.
 * @param hash         the hashed output script to look for.
 *
 * Does nothing when there is no mempool or network manager, or when the
 * connection no longer exists or is not connected.
 */
void AddressMonitorService::findTxInMempool(int connectionId, const uint256 &hash)
{
    if (m_mempool == nullptr)
        return;
    if (manager() == nullptr)
        return;
    auto connection = manager()->connection(manager()->endPoint(connectionId), NetworkManager::OnlyExisting);
    if (!connection.isValid() || !connection.isConnected())
        return;

    LOCK(m_mempool->cs);
    for (auto iter = m_mempool->mapTx.begin(); iter != m_mempool->mapTx.end(); ++iter) {
        Tx::Iterator txIter(iter->tx);
        auto type = txIter.next();
        MatchedOutput output;
        int index = 0; // output index
        while (type != Tx::End) {
            if (type == Tx::OutputValue) {
                // a value field starts a new output
                output = MatchedOutput();
                output.index = index;
                output.amount = txIter.longData();
            }
            else if (type == Tx::CashTokenBitfield) {
                uint8_t tokenbitfield = txIter.bitfieldData();
                // Same bookkeeping as match(): without hasToken the token
                // fields are neither sized nor serialized.
                output.hasToken = true;
                output.tokenIsFT = (tokenbitfield & 0x10) == 0x10;
                output.tokenIsNFT = (tokenbitfield & 0x20) == 0x20;
                output.tokenIsImmutable = output.tokenIsNFT && (tokenbitfield & 0x0f) == 0x00;
                output.tokenIsMutable = output.tokenIsNFT && (tokenbitfield & 0x0f) == 0x01;
                output.tokenIsMinting = output.tokenIsNFT && (tokenbitfield & 0x0f) == 0x02;
            }
            else if (type == Tx::CashTokenCategory) {
                output.tokenCategory = txIter.byteData();
            }
            else if (type == Tx::CashTokenCommitment) {
                output.tokenCommitment = txIter.byteData();
            }
            else if (type == Tx::CashTokenAmount) {
                output.tokenAmount = txIter.longData();
            }
            else if (type == Tx::OutputScript) {
                // hash the script once and compare the result, like match() does
                uint256 hashedOutScript;
                txIter.hashByteData(hashedOutScript);
                if (hashedOutScript == hash) {
                    logDebug(Log::MonitorService) << " + Sending to peers tx from mempool!";
                    output.hashedOutScript = hash; // to serialize it
                    std::lock_guard<std::mutex> guard(m_poolMutex);
                    m_pool->reserve(output.serializedSize() + 40); // +40 for the TxId
                    Streaming::MessageBuilder builder(m_pool);
                    builder.add(Api::AddressMonitor::TxId, iter->tx.createHash());
                    output.serialize(builder);
                    Message message = builder.message(Api::AddressMonitorService, Api::AddressMonitor::TransactionFound);
                    connection.send(message);
                }
                index++;
            }
            type = txIter.next();
        }
    }
}
/**
 * Returns the configured maximum number of addresses a single connection
 * may register. The subscribe handler only enforces values above zero.
 */
int AddressMonitorService::maxAddressesPerConnection() const
{
    return m_maxAddressesPerConnection;
}
/**
 * Sets the maximum number of addresses a single connection may register.
 * The subscribe handler only enforces values above zero.
 */
void AddressMonitorService::setMaxAddressesPerConnection(int maxAddressesPerConnection)
{
    m_maxAddressesPerConnection = maxAddressesPerConnection;
}
/**
 * Returns an upper bound, in bytes, for what serialize() writes for this
 * output. Used to reserve buffer space before building a message.
 */
int AddressMonitorService::MatchedOutput::serializedSize() const
{
    // Always written: the hashed script (a 32 byte hash plus tag and length,
    // 35 bytes), the amount (up to 10), the output index (up to 6) and the
    // FT / NFT flags (budgeted at 2 each).
    int count = 35 + 10 + 6 + 2 + 2;
    if (hasToken) {
        // category (35), three capability flags, commitment header and the
        // token amount (up to 10), plus the commitment bytes themselves.
        count += 55 + tokenCommitment.size();
    }
    return count;
}
/**
 * Appends this matched output to \a builder.
 *
 * The hashed script, amount, output index and the FT / NFT flags are always
 * written. The remaining token fields are written only when the output
 * carries a token, and the commitment only when there is one.
 *
 * @param builder the message builder to append to.
 */
void AddressMonitorService::MatchedOutput::serialize(Streaming::MessageBuilder &builder) const
{
    builder.add(Api::AddressMonitor::BitcoinScriptHashed, hashedOutScript); // 32 byte hash plus tag and length
    builder.add(Api::AddressMonitor::Amount, amount); // up to 10 bytes
    builder.add(Api::AddressMonitor::Tx_Out_Index, index); // up to 6 bytes
    builder.add(Api::AddressMonitor::Tx_Out_CT_IsFT, tokenIsFT); // 1 byte
    builder.add(Api::AddressMonitor::Tx_Out_CT_IsNFT, tokenIsNFT); // 1 byte
    if (hasToken) {
        assert(tokenCategory.size() == 32);
        builder.add(Api::AddressMonitor::Tx_Out_CT_Category, tokenCategory); // 35 bytes
        builder.add(Api::AddressMonitor::Tx_Out_CT_IsMutable, tokenIsMutable); // 1 byte
        builder.add(Api::AddressMonitor::Tx_Out_CT_IsImmutable, tokenIsImmutable); // 1 byte
        builder.add(Api::AddressMonitor::Tx_Out_CT_IsMinting, tokenIsMinting); // 2 bytes
        // Only send a commitment that actually exists.
        if (tokenCommitment.isValid() && tokenCommitment.size() > 0)
            builder.add(Api::AddressMonitor::Tx_Out_CT_Commitment, tokenCommitment); // 4 + commitment length (assuming 2 bytes for the length indicator (14 bits) should be enough)
        builder.add(Api::AddressMonitor::Tx_Out_CT_Amount, tokenAmount); // up to 10 bytes
    }
}