d8e50a348e
This is a patch from John Galt to fix the problem that multiple connections could end up getting settings from one connection applied to the reply of another.
393 lines
16 KiB
C++
393 lines
16 KiB
C++
/*
|
|
* This file is part of the Flowee project
|
|
* Copyright (C) 2018-2025 Tom Zander <tom@flowee.org>
|
|
* Copyright (C) 2025 John Galt <johngaltbch@pm.me>
|
|
*
|
|
* This program is free software: you can redistribute it and/or modify
|
|
* it under the terms of the GNU General Public License as published by
|
|
* the Free Software Foundation, either version 3 of the License, or
|
|
* (at your option) any later version.
|
|
*
|
|
* This program is distributed in the hope that it will be useful,
|
|
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
* GNU General Public License for more details.
|
|
*
|
|
* You should have received a copy of the GNU General Public License
|
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
*/
|
|
#include "AddressMonitorService.h"
|
|
|
|
// server 'lib'
|
|
#include <txmempool.h>
|
|
#include <DoubleSpendProof.h>
|
|
#include <encodings_legacy.h>
|
|
#include <chain.h>
|
|
#include <Application.h>
|
|
#include <NetworkManager.h>
|
|
#include <APIProtocol.h>
|
|
|
|
#include <Logger.h>
|
|
#include <Message.h>
|
|
#include <streaming/MessageBuilder.h>
|
|
#include <streaming/MessageParser.h>
|
|
#include <streaming/BufferPools.h>
|
|
#include <streaming/streams.h>
|
|
#include <primitives/Block.h>
|
|
#include <BitcoinVersion.h>
|
|
|
|
/**
 * Constructs the service.
 *
 * Initializes the NetworkService base with the Api::AddressMonitorService id,
 * creates the buffer pool that outgoing messages are built in and registers
 * this object as a listener on the validation notifier.
 */
AddressMonitorService::AddressMonitorService()
    : NetworkService(Api::AddressMonitorService),
      m_pool(std::make_shared<Streaming::BufferPool>())
{
    // The matching removeListener() call lives in the destructor.
    ValidationNotifier().addListener(this);
}
|
|
|
|
/**
 * Destroys the service, first removing it from the validation notifier's
 * listeners (it was added in the constructor).
 */
AddressMonitorService::~AddressMonitorService()
{
    ValidationNotifier().removeListener(this);
}
|
|
|
|
void AddressMonitorService::syncTx(const Tx &tx)
|
|
{
|
|
auto rem = remotes();
|
|
std::map<int, Match> matchesPerRemote;
|
|
Tx::Iterator iter(tx);
|
|
if (!match(iter, rem, matchesPerRemote))
|
|
return;
|
|
|
|
for (auto i = matchesPerRemote.begin(); i != matchesPerRemote.end(); ++i) {
|
|
const Match &data = i->second;
|
|
if (!data.matches.empty())
|
|
sendMatchesToRemote(rem[i->first], data, tx.createHash(), -1, -1);
|
|
}
|
|
}
|
|
|
|
/**
 * Walks one transaction with \a iter and records, per remote, every output
 * whose hashed output script is present in that remote's set of hashes.
 *
 * The iterator is advanced until the next Tx::End marker, so when iterating
 * a block each call consumes exactly one transaction.
 *
 * @param iter            iterator positioned before the transaction to inspect.
 * @param remotes         the remotes to match against; each is cast to RemoteWithKeys.
 * @param matchingRemotes output; keyed by the index of the remote in \a remotes.
 *                        Every remote gets its own copy of a matched output.
 * @return false when there are no remotes or when the very first component is
 *         Tx::End (the second End in a row means end of block); true otherwise,
 *         also when nothing matched.
 */
bool AddressMonitorService::match(Tx::Iterator &iter, const std::deque<NetworkService::Remote *> &remotes, std::map<int, Match> &matchingRemotes) const
{
    if (remotes.empty())
        return false;

    auto component = iter.next();
    if (component == Tx::End) // then the second end means end of block
        return false;

    MatchedOutput current; // the output being assembled
    int outputIndex = 0;
    for (; component != Tx::End; component = iter.next()) {
        switch (component) {
        case Tx::OutputValue:
            // The value is the first field of an output: start from a clean slate.
            current = MatchedOutput();
            current.index = outputIndex;
            current.amount = iter.longData();
            break;
        case Tx::CashTokenBitfield: {
            uint8_t bits = iter.bitfieldData();
            current.hasToken = true;
            current.tokenIsFT = (bits & 0x10) == 0x10;
            current.tokenIsNFT = (bits & 0x20) == 0x20;
            // The low nibble selects the NFT capability.
            current.tokenIsImmutable = current.tokenIsNFT && (bits & 0x0f) == 0x00;
            current.tokenIsMutable = current.tokenIsNFT && (bits & 0x0f) == 0x01;
            current.tokenIsMinting = current.tokenIsNFT && (bits & 0x0f) == 0x02;
            break;
        }
        case Tx::CashTokenCategory:
            current.tokenCategory = iter.byteData();
            break;
        case Tx::CashTokenCommitment:
            current.tokenCommitment = iter.byteData();
            break;
        case Tx::CashTokenAmount:
            current.tokenAmount = iter.longData();
            break;
        case Tx::OutputScript: {
            // The script closes the output; hash it once and test every remote.
            uint256 scriptHash;
            iter.hashByteData(scriptHash);
            const size_t remoteCount = remotes.size();
            for (size_t r = 0; r < remoteCount; ++r) {
                assert(r < INT_MAX);
                RemoteWithKeys *subscriber = static_cast<RemoteWithKeys*>(remotes.at(r));
                if (subscriber->hashes.find(scriptHash) == subscriber->hashes.end())
                    continue;
                Match &target = matchingRemotes[static_cast<int>(r)];
                // Each remote receives a private copy, nothing is shared between them.
                MatchedOutput forRemote = current;
                forRemote.hashedOutScript = scriptHash;
                target.matches.push_back(std::move(forRemote));
            }
            ++outputIndex;
            break;
        }
        default:
            break; // components we have no interest in
        }
    }
    return true;
}
|
|
|
|
/**
 * Builds one TransactionFound message holding all matched outputs of a
 * transaction and sends it to \a remote.
 *
 * @param remote        the receiver; the message is sent over remote->connection.
 * @param data          the matched outputs, serialized in order with a Separator
 *                      field between consecutive outputs.
 * @param txid          added as the TxId field.
 * @param blockheight   added as BlockHeight only when larger than zero.
 * @param offsetInBlock added as OffsetInBlock only when zero or larger.
 */
void AddressMonitorService::sendMatchesToRemote(Remote *remote, const Match &data, const uint256 &txid, int blockheight, int64_t offsetInBlock)
{
    // Decide once which optional fields are written, so that the reservation
    // below can never disagree with what is actually added to the message.
    const bool addHeight = blockheight > 0;
    const bool addOffset = offsetInBlock >= 0;

    int messageSize = 40; // the TxId field, same allowance as findTxInMempool() uses
    for (const auto &output : data.matches) {
        messageSize += output.serializedSize() + 1; // + 1 for the separator
    }
    if (addHeight)
        messageSize += 10;
    if (addOffset)
        messageSize += 10;

    // The pool is shared by all senders of this service, the guard is held
    // until the message has been handed to the connection.
    std::lock_guard<std::mutex> guard(m_poolMutex);
    m_pool->reserve(messageSize);
    Streaming::MessageBuilder builder(m_pool);
    builder.add(Api::AddressMonitor::TxId, txid);
    if (addHeight)
        builder.add(Api::AddressMonitor::BlockHeight, blockheight);
    if (addOffset)
        builder.add(Api::AddressMonitor::OffsetInBlock, static_cast<uint64_t>(offsetInBlock));

    const size_t matchCount = data.matches.size();
    size_t written = 0;
    for (const auto &output : data.matches) {
        output.serialize(builder);
        if (++written != matchCount) // no separator after the last output
            builder.add(Api::AddressMonitor::Separator, true);
    }
    logDebug(Log::MonitorService) << "Remote gets" << data.matches.size() << "tx notification(s)";
    remote->connection.send(builder.message(Api::AddressMonitorService, Api::AddressMonitor::TransactionFound));
}
|
|
|
|
void AddressMonitorService::syncAllTransactionsInBlock(const Block &block, CBlockIndex *index)
|
|
{
|
|
assert(index);
|
|
Tx::Iterator iter(block);
|
|
auto rem = remotes();
|
|
while (true) {
|
|
std::map<int, Match> matchesPerRemote;
|
|
if (!match(iter, rem, matchesPerRemote))
|
|
return; // end of block
|
|
|
|
// one (or zero) message per transaction
|
|
for (auto i = matchesPerRemote.begin(); i != matchesPerRemote.end(); ++i) {
|
|
const Match &data = i->second;
|
|
if (!data.matches.empty()) {
|
|
auto tx = iter.prevTx();
|
|
sendMatchesToRemote(rem[i->first], data, tx.createHash(), index->nHeight, tx.offsetInBlock(block));
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
void AddressMonitorService::doubleSpendFound(const Tx &first, const Tx &duplicate)
|
|
{
|
|
logDebug(Log::MonitorService) << "Double spend found" << first.createHash() << duplicate.createHash();
|
|
const auto rem = remotes();
|
|
std::map<int, Match> matchesPerRemote;
|
|
Tx::Iterator iter(first);
|
|
if (!match(iter, rem, matchesPerRemote))
|
|
return; // returns if no listeners
|
|
|
|
Tx::Iterator iter2(duplicate);
|
|
bool m = match(iter2, rem, matchesPerRemote);
|
|
assert(m); // our duplicate tx object should have data
|
|
|
|
for (auto i = matchesPerRemote.begin(); i != matchesPerRemote.end(); ++i) {
|
|
Match &data = i->second;
|
|
int messageSize = 0;
|
|
for (const auto &output : data.matches) {
|
|
messageSize += output.serializedSize();
|
|
}
|
|
std::lock_guard<std::mutex> guard(m_poolMutex);
|
|
m_pool->reserve(messageSize);
|
|
Streaming::MessageBuilder builder(m_pool);
|
|
for (const auto &output : data.matches) {
|
|
output.serialize(builder);
|
|
}
|
|
builder.add(Api::AddressMonitor::TxId, first.createHash());
|
|
builder.add(Api::AddressMonitor::TransactionData, duplicate.data());
|
|
rem[i->first]->connection.send(builder.message(Api::AddressMonitorService, Api::AddressMonitor::DoubleSpendFound));
|
|
}
|
|
}
|
|
|
|
void AddressMonitorService::doubleSpendFound(const Tx &txInMempool, const DoubleSpendProof &proof)
|
|
{
|
|
logDebug(Log::MonitorService) << "Double spend proof found. TxId:" << txInMempool.createHash();
|
|
const auto rem = remotes();
|
|
std::map<int, Match> matchesPerRemote;
|
|
Tx::Iterator iter(txInMempool);
|
|
if (!match(iter, rem, matchesPerRemote))
|
|
return; // returns false if no listeners
|
|
|
|
CDataStream stream(SER_NETWORK, PROTOCOL_VERSION);
|
|
stream << proof;
|
|
const std::vector<uint8_t> serializedProof(stream.begin(), stream.end());
|
|
|
|
for (auto i = matchesPerRemote.begin(); i != matchesPerRemote.end(); ++i) {
|
|
Match &data = i->second;
|
|
int messageSize = 0;
|
|
for (const auto &output : data.matches) {
|
|
messageSize += output.serializedSize() + 1;
|
|
}
|
|
std::lock_guard<std::mutex> guard(m_poolMutex);
|
|
m_pool->reserve(messageSize);
|
|
Streaming::MessageBuilder builder(m_pool);
|
|
for (const auto &output : data.matches) {
|
|
output.serialize(builder);
|
|
}
|
|
builder.add(Api::AddressMonitor::TxId, txInMempool.createHash());
|
|
builder.addByteArray(Api::AddressMonitor::DoubleSpendProofData, &serializedProof[0], serializedProof.size());
|
|
rem[i->first]->connection.send(builder.message(Api::AddressMonitorService, Api::AddressMonitor::DoubleSpendFound));
|
|
}
|
|
}
|
|
|
|
/**
 * Handles Subscribe and Unsubscribe requests of a remote; any other message
 * id is ignored.
 *
 * Every BitcoinScriptHashed field (a byte array of exactly 32 bytes) is added
 * to, or removed from, the set of hashes of the remote. For each newly
 * subscribed hash a mempool scan (findTxInMempool) is posted on the strand of
 * the connection. A reply is sent carrying the number of hashes processed
 * (Result) and, when something went wrong, an ErrorMessage.
 *
 * @param remote_ the sender; has to be a RemoteWithKeys.
 * @param message the incoming request.
 * @param ep      the endpoint of the sender, only used for logging.
 */
void AddressMonitorService::onIncomingMessage(Remote *remote_, const Message &message, const EndPoint &ep)
{
    assert(dynamic_cast<RemoteWithKeys*>(remote_));
    RemoteWithKeys *remote = static_cast<RemoteWithKeys*>(remote_);

    const bool subscribing = message.messageId() == Api::AddressMonitor::Subscribe;
    if (!subscribing && message.messageId() != Api::AddressMonitor::Unsubscribe)
        return;

    Streaming::MessageParser parser(message.body());
    std::string error;
    int done = 0;
    while (parser.next() == Streaming::FoundTag) {
        if (parser.tag() != Api::AddressMonitor::BitcoinScriptHashed)
            continue;
        if (!parser.isByteArray() || parser.dataLength() != 32) {
            // Remember the problem but keep processing the remaining fields.
            error = "BitcoinScriptHashed has to be a sha256 (bytearray of 32 bytes)";
            continue;
        }
        const uint256 hash = parser.uint256Data();
        ++done;
        if (!subscribing) {
            remote->hashes.erase(hash);
            continue;
        }

        // Refuse only when the remote already holds the maximum. The check
        // used to be 'size + 1 >= max', which refused the registration that
        // would merely reach the maximum (a limit of 1 allowed none at all).
        if (m_maxAddressesPerConnection > 0
                && static_cast<int>(remote->hashes.size()) >= m_maxAddressesPerConnection) {
            logInfo(Log::MonitorService) << "Remote" << ep.connectionId << "hit limit of registrations";
            error = "Maximum number of addresses registered for watching, ask your node operator to change limits";
            break;
        }
        remote->hashes.insert(hash);
        // Look for already known transactions paying to this hash; the scan is
        // posted instead of being run inline.
        remote->connection.postOnStrand(std::bind(&AddressMonitorService::findTxInMempool,
                    this, remote->connection.connectionId(), hash));
    }
    if (error.empty() && !done)
        error = "Missing required field BitcoinScriptHashed (2)";

    auto pool = Streaming::pool(10 + error.size());
    Streaming::MessageBuilder builder(pool);
    builder.add(Api::AddressMonitor::Result, done);
    if (subscribing)
        logInfo(Log::MonitorService) << "Remote" << ep.connectionId << "made" << done << "changes. Hashes count:"
            << remote->hashes.size();
    if (!error.empty())
        builder.add(Api::AddressMonitor::ErrorMessage, error);
    remote->connection.send(builder.reply(message));
    updateBools();
}
|
|
|
|
|
|
void AddressMonitorService::updateBools()
|
|
{
|
|
m_findByHash = false;
|
|
for (auto remote : remotes()) {
|
|
RemoteWithKeys *rwk = static_cast<RemoteWithKeys*>(remote);
|
|
m_findByHash = m_findByHash || !rwk->hashes.empty();
|
|
}
|
|
}
|
|
|
|
void AddressMonitorService::findTxInMempool(int connectionId, const uint256 &hash)
|
|
{
|
|
if (m_mempool == nullptr)
|
|
return;
|
|
if (manager() == nullptr)
|
|
return;
|
|
|
|
auto connection = manager()->connection(manager()->endPoint(connectionId), NetworkManager::OnlyExisting);
|
|
if (!connection.isValid() || !connection.isConnected())
|
|
return;
|
|
|
|
LOCK(m_mempool->cs);
|
|
for (auto iter = m_mempool->mapTx.begin(); iter != m_mempool->mapTx.end(); ++iter) {
|
|
Tx::Iterator txIter(iter->tx);
|
|
auto type = txIter.next();
|
|
|
|
MatchedOutput output;
|
|
int index = 0;
|
|
while (type != Tx::End) {
|
|
if (type == Tx::OutputValue) {
|
|
output = MatchedOutput();
|
|
output.index = index;
|
|
output.amount = txIter.longData();
|
|
}
|
|
else if (type == Tx::CashTokenBitfield) {
|
|
uint8_t tokenbitfield = txIter.bitfieldData();
|
|
output.tokenIsFT = (tokenbitfield & 0x10) == 0x10;
|
|
output.tokenIsNFT = (tokenbitfield & 0x20) == 0x20;
|
|
output.tokenIsImmutable = output.tokenIsNFT && (tokenbitfield & 0x0f) == 0x00;
|
|
output.tokenIsMutable = output.tokenIsNFT && (tokenbitfield & 0x0f) == 0x01;
|
|
output.tokenIsMinting = output.tokenIsNFT && (tokenbitfield & 0x0f) == 0x02;
|
|
}
|
|
else if (type == Tx::CashTokenCategory) {
|
|
output.tokenCategory = txIter.byteData();
|
|
}
|
|
else if (type == Tx::CashTokenCommitment) {
|
|
output.tokenCommitment = txIter.byteData();
|
|
}
|
|
else if (type == Tx::CashTokenAmount) {
|
|
output.tokenAmount = txIter.longData();
|
|
}
|
|
else if (type == Tx::OutputScript) {
|
|
uint256 hashedOutScript;
|
|
txIter.hashByteData(hashedOutScript);
|
|
|
|
if (txIter.hashedByteData() == hash) {
|
|
logDebug(Log::MonitorService) << " + Sending to peers tx from mempool!";
|
|
output.hashedOutScript = hash; // to serialize it
|
|
std::lock_guard<std::mutex> guard(m_poolMutex);
|
|
m_pool->reserve(output.serializedSize() + 40);
|
|
Streaming::MessageBuilder builder(m_pool);
|
|
builder.add(Api::AddressMonitor::TxId, iter->tx.createHash());
|
|
output.serialize(builder);
|
|
Message message = builder.message(Api::AddressMonitorService, Api::AddressMonitor::TransactionFound);
|
|
connection.send(message);
|
|
}
|
|
index++;
|
|
}
|
|
type = txIter.next();
|
|
}
|
|
}
|
|
}
|
|
|
|
int AddressMonitorService::maxAddressesPerConnection() const
|
|
{
|
|
return m_maxAddressesPerConnection;
|
|
}
|
|
|
|
void AddressMonitorService::setMaxAddressesPerConnection(int maxAddressesPerConnection)
|
|
{
|
|
m_maxAddressesPerConnection = maxAddressesPerConnection;
|
|
}
|
|
|
|
int AddressMonitorService::MatchedOutput::serializedSize() const
|
|
{
|
|
int count = 40;
|
|
if (hasToken)
|
|
count += 55 + tokenCommitment.size();
|
|
return count;
|
|
}
|
|
|
|
/**
 * Appends this matched output to \a builder.
 *
 * The script hash, amount, output index and the FT / NFT flags are always
 * written. The category, capability flags, commitment (only when one is
 * present) and token amount are written only when the output has a token.
 *
 * @param builder the message builder the fields are added to.
 */
void AddressMonitorService::MatchedOutput::serialize(Streaming::MessageBuilder &builder) const
{
    builder.add(Api::AddressMonitor::BitcoinScriptHashed, hashedOutScript); // 22 bytes
    builder.add(Api::AddressMonitor::Amount, amount); // up to 10 bytes
    builder.add(Api::AddressMonitor::Tx_Out_Index, index); // up to 6 bytes
    builder.add(Api::AddressMonitor::Tx_Out_CT_IsFT, tokenIsFT); // 1 byte
    builder.add(Api::AddressMonitor::Tx_Out_CT_IsNFT, tokenIsNFT); // 1 byte
    if (!hasToken)
        return;

    assert(tokenCategory.size() == 32);
    builder.add(Api::AddressMonitor::Tx_Out_CT_Category, tokenCategory); // 35 bytes
    builder.add(Api::AddressMonitor::Tx_Out_CT_IsMutable, tokenIsMutable); // 1 byte
    builder.add(Api::AddressMonitor::Tx_Out_CT_IsImmutable, tokenIsImmutable); // 1 byte
    builder.add(Api::AddressMonitor::Tx_Out_CT_IsMinting, tokenIsMinting); // 2 bytes
    // Only send a commitment that is actually there. The test used to be
    // inverted, which dropped every real commitment from the message.
    if (tokenCommitment.isValid())
        builder.add(Api::AddressMonitor::Tx_Out_CT_Commitment, tokenCommitment); // 4 + commitment length (assuming 2 bytes for the length indicator (14 bits) should be enough)
    builder.add(Api::AddressMonitor::Tx_Out_CT_Amount, tokenAmount); // up to 10 bytes
}
|