/*
 * This file is part of the Flowee project
 * Copyright (C) 2018-2025 Tom Zander
 * Copyright (C) 2025 John Galt
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
 */
#include "AddressMonitorService.h"

// server 'lib'
// NOTE(review): this file originally listed 15 angle-bracket project includes
// here. Their targets were lost when the text was extracted (everything between
// '<' and '>' was stripped). Restore them from version control; only the
// standard headers this file demonstrably needs are listed below.
#include <cassert>
#include <climits>
#include <cstdint>
#include <deque>
#include <functional>
#include <map>
#include <memory>
#include <mutex>
#include <string>
#include <utility>
#include <vector>

/**
 * Creates the service and registers it as a validation listener so it gets
 * told about new transactions, blocks and double spends.
 */
// NOTE(review): the template argument of make_shared was stripped from the text;
// Streaming::BufferPool is assumed because m_pool is handed to
// Streaming::MessageBuilder just like the result of Streaming::pool() is.
AddressMonitorService::AddressMonitorService()
    : NetworkService(Api::AddressMonitorService),
      m_pool(std::make_shared<Streaming::BufferPool>())
{
    ValidationNotifier().addListener(this);
}

/// Unregisters the service from the validation notifier.
AddressMonitorService::~AddressMonitorService()
{
    ValidationNotifier().removeListener(this);
}

/**
 * Called for a single (unconfirmed) transaction.
 * Sends one TransactionFound message to every remote that has at least one
 * matching output. Height and offset are passed as -1 (not in a block).
 */
void AddressMonitorService::syncTx(const Tx &tx)
{
    auto rem = remotes();
    std::map<int, Match> matchesPerRemote;
    Tx::Iterator iter(tx);
    if (!match(iter, rem, matchesPerRemote))
        return;

    for (const auto &entry : matchesPerRemote) {
        const Match &data = entry.second;
        if (!data.matches.empty())
            sendMatchesToRemote(rem[entry.first], data, tx.createHash(), -1, -1);
    }
}

/**
 * Walks one transaction with \a iter and records, per remote, every output whose
 * hashed output script that remote has subscribed to.
 *
 * \param iter            iterator positioned at the start of a transaction.
 * \param remotes         the remotes to test; the index in this deque is the key
 *                        used in \a matchingRemotes.
 * \param matchingRemotes receives the matched outputs, keyed by remote index.
 * \return false when there are no remotes or when the iterator immediately
 *         reports Tx::End (end of block); true after one transaction was walked.
 */
bool AddressMonitorService::match(Tx::Iterator &iter, const std::deque<Remote *> &remotes, std::map<int, Match> &matchingRemotes) const
{
    if (remotes.empty())
        return false;
    auto type = iter.next();
    if (type == Tx::End) // then the second end means end of block
        return false;

    MatchedOutput output;
    int index = 0; // output index
    while (type != Tx::End) {
        if (type == Tx::OutputValue) {
            // The value tag starts a new output, forget all data of the previous one.
            output = MatchedOutput();
            output.index = index;
            output.amount = iter.longData();
        }
        else if (type == Tx::CashTokenBitfield) {
            // 0x10 flags a fungible amount, 0x20 an NFT; the low nibble holds
            // the NFT capability (0 immutable, 1 mutable, 2 minting).
            const uint8_t tokenbitfield = iter.bitfieldData();
            const uint8_t capability = tokenbitfield & 0x0f;
            output.hasToken = true;
            output.tokenIsFT = (tokenbitfield & 0x10) == 0x10;
            output.tokenIsNFT = (tokenbitfield & 0x20) == 0x20;
            output.tokenIsImmutable = output.tokenIsNFT && capability == 0x00;
            output.tokenIsMutable = output.tokenIsNFT && capability == 0x01;
            output.tokenIsMinting = output.tokenIsNFT && capability == 0x02;
        }
        else if (type == Tx::CashTokenCategory) {
            output.tokenCategory = iter.byteData();
        }
        else if (type == Tx::CashTokenCommitment) {
            output.tokenCommitment = iter.byteData();
        }
        else if (type == Tx::CashTokenAmount) {
            output.tokenAmount = iter.longData();
        }
        else if (type == Tx::OutputScript) {
            // The script is the last tag of an output; everything collected
            // above belongs to it.
            uint256 hashedOutScript;
            iter.hashByteData(hashedOutScript);
            for (size_t i = 0; i < remotes.size(); ++i) {
                assert(i < INT_MAX);
                RemoteWithKeys *rwk = static_cast<RemoteWithKeys *>(remotes.at(i));
                if (rwk->hashes.find(hashedOutScript) != rwk->hashes.end()) {
                    Match &m = matchingRemotes[static_cast<int>(i)];
                    MatchedOutput copy = output; // copy for this remote
                    copy.hashedOutScript = hashedOutScript;
                    m.matches.push_back(std::move(copy));
                }
            }
            index++;
        }
        type = iter.next();
    }
    return true;
}

/**
 * Builds and sends one TransactionFound message holding all matched outputs
 * of a single transaction.
 *
 * \param remote        the receiver.
 * \param data          the outputs that matched for this remote.
 * \param txid          hash of the transaction.
 * \param blockheight   only included in the message when larger than zero.
 * \param offsetInBlock only included in the message when zero or larger.
 */
void AddressMonitorService::sendMatchesToRemote(Remote *remote, const Match &data, const uint256 &txid, int blockheight, int64_t offsetInBlock)
{
    const bool includeHeight = blockheight > 0;
    // The reservation below uses the same condition as the write further down;
    // previously an offset of zero was written without being reserved.
    const bool includeOffset = offsetInBlock >= 0;

    int messageSize = 40; // TxId, same allowance as findTxInMempool() uses
    for (const auto &output : data.matches)
        messageSize += output.serializedSize() + 1; // + 1 for the separator
    if (includeHeight)
        messageSize += 10;
    if (includeOffset)
        messageSize += 10;

    std::lock_guard<decltype(m_poolMutex)> guard(m_poolMutex);
    m_pool->reserve(messageSize);
    Streaming::MessageBuilder builder(m_pool);
    builder.add(Api::AddressMonitor::TxId, txid);
    if (includeHeight)
        builder.add(Api::AddressMonitor::BlockHeight, blockheight);
    if (includeOffset) {
        // NOTE(review): the target type of this cast was stripped from the text;
        // uint64_t is assumed (non-negative value, no truncation) - confirm.
        builder.add(Api::AddressMonitor::OffsetInBlock, static_cast<uint64_t>(offsetInBlock));
    }

    // Outputs are divided by a separator, there is none after the last one.
    bool firstOutput = true;
    for (const auto &output : data.matches) {
        if (!firstOutput)
            builder.add(Api::AddressMonitor::Separator, true);
        firstOutput = false;
        output.serialize(builder);
    }
    logDebug(Log::MonitorService) << "Remote gets" << data.matches.size() << "tx notification(s)";
    remote->connection.send(builder.message(Api::AddressMonitorService,
                                            Api::AddressMonitor::TransactionFound));
}

/**
 * Called for a validated block. Walks all transactions in \a block and sends
 * one (or zero) TransactionFound message per transaction to each matching
 * remote, including the block height and the offset of the tx in the block.
 */
void AddressMonitorService::syncAllTransactionsInBlock(const Block &block, CBlockIndex *index)
{
    assert(index);
    Tx::Iterator iter(block);
    auto rem = remotes();
    while (true) {
        std::map<int, Match> matchesPerRemote;
        if (!match(iter, rem, matchesPerRemote))
            return; // end of block

        // one (or zero) message per transaction
        for (const auto &entry : matchesPerRemote) {
            const Match &data = entry.second;
            if (data.matches.empty())
                continue;
            auto tx = iter.prevTx();
            sendMatchesToRemote(rem[entry.first], data, tx.createHash(),
                                index->nHeight, tx.offsetInBlock(block));
        }
    }
}

/**
 * Called when a second transaction spending the same output(s) as \a first is
 * seen. Outputs of both transactions are matched against the subscriptions and
 * every matching remote gets a DoubleSpendFound message holding the matched
 * outputs, the txid of \a first and the full data of \a duplicate.
 */
void AddressMonitorService::doubleSpendFound(const Tx &first, const Tx &duplicate)
{
    logDebug(Log::MonitorService) << "Double spend found" << first.createHash() << duplicate.createHash();
    const auto rem = remotes();
    std::map<int, Match> matchesPerRemote;
    Tx::Iterator iter(first);
    if (!match(iter, rem, matchesPerRemote))
        return; // returns if no listeners
    Tx::Iterator iter2(duplicate);
    const bool duplicateParsed = match(iter2, rem, matchesPerRemote);
    assert(duplicateParsed); // our duplicate tx object should have data
    if (!duplicateParsed) // keeps release builds (assert compiled out) safe
        return;

    const uint256 txid = first.createHash();
    for (const auto &entry : matchesPerRemote) {
        const Match &data = entry.second;
        // Reserve room for everything written below, not only for the outputs.
        // NOTE(review): assumes the buffer returned by Tx::data() has a size()
        // like the buffer stored in tokenCommitment - confirm.
        int messageSize = 40 + static_cast<int>(duplicate.data().size()) + 10;
        for (const auto &output : data.matches)
            messageSize += output.serializedSize();

        std::lock_guard<decltype(m_poolMutex)> guard(m_poolMutex);
        m_pool->reserve(messageSize);
        Streaming::MessageBuilder builder(m_pool);
        for (const auto &output : data.matches)
            output.serialize(builder);
        builder.add(Api::AddressMonitor::TxId, txid);
        builder.add(Api::AddressMonitor::TransactionData, duplicate.data());
        rem[entry.first]->connection.send(builder.message(Api::AddressMonitorService,
                                                          Api::AddressMonitor::DoubleSpendFound));
    }
}

/**
 * Called when a double spend proof for \a txInMempool is seen.
 * Every remote with a matching output gets a DoubleSpendFound message holding
 * the matched outputs, the txid and the network-serialized proof.
 */
void AddressMonitorService::doubleSpendFound(const Tx &txInMempool, const DoubleSpendProof &proof)
{
    logDebug(Log::MonitorService) << "Double spend proof found. TxId:" << txInMempool.createHash();
    const auto rem = remotes();
    std::map<int, Match> matchesPerRemote;
    Tx::Iterator iter(txInMempool);
    if (!match(iter, rem, matchesPerRemote))
        return; // returns false if no listeners

    CDataStream stream(SER_NETWORK, PROTOCOL_VERSION);
    stream << proof;
    // NOTE(review): the element type of this vector was stripped from the text;
    // it only has to be a byte type.
    const std::vector<uint8_t> serializedProof(stream.begin(), stream.end());
    const int proofSize = static_cast<int>(serializedProof.size());

    const uint256 txid = txInMempool.createHash();
    for (const auto &entry : matchesPerRemote) {
        const Match &data = entry.second;
        // Reserve room for the txid and the proof as well as the outputs.
        int messageSize = 40 + proofSize + 10;
        for (const auto &output : data.matches)
            messageSize += output.serializedSize() + 1;

        std::lock_guard<decltype(m_poolMutex)> guard(m_poolMutex);
        m_pool->reserve(messageSize);
        Streaming::MessageBuilder builder(m_pool);
        for (const auto &output : data.matches)
            output.serialize(builder);
        builder.add(Api::AddressMonitor::TxId, txid);
        // data() instead of &v[0]: valid also when the vector is empty.
        builder.addByteArray(Api::AddressMonitor::DoubleSpendProofData,
                             serializedProof.data(), proofSize);
        rem[entry.first]->connection.send(builder.message(Api::AddressMonitorService,
                                                          Api::AddressMonitor::DoubleSpendFound));
    }
}

/**
 * Handles Subscribe and Unsubscribe messages; other message ids are ignored.
 *
 * Each BitcoinScriptHashed field (32 byte bytearray) in the message is added
 * to, or removed from, the hashes watched for this remote. On subscribe the
 * mempool is additionally searched (on the connection strand) for the hash.
 * The reply holds the number of processed hashes and, on failure, an error
 * message.
 */
void AddressMonitorService::onIncomingMessage(Remote *remote_, const Message &message, const EndPoint &ep)
{
    assert(dynamic_cast<RemoteWithKeys *>(remote_));
    RemoteWithKeys *remote = static_cast<RemoteWithKeys *>(remote_);

    const bool subscribe = message.messageId() == Api::AddressMonitor::Subscribe;
    if (!subscribe && message.messageId() != Api::AddressMonitor::Unsubscribe)
        return;

    Streaming::MessageParser parser(message.body());
    std::string error;
    int done = 0;
    while (parser.next() == Streaming::FoundTag) {
        if (parser.tag() != Api::AddressMonitor::BitcoinScriptHashed)
            continue;
        if (!parser.isByteArray() || parser.dataLength() != 32) {
            error = "BitcoinScriptHashed has to be a sha256 (bytearray of 32 bytes)";
            continue;
        }

        const uint256 hash = parser.uint256Data();
        if (!subscribe) {
            remote->hashes.erase(hash);
            ++done;
            continue;
        }

        // A limit of N allows N registrations. The previous comparison
        // (size + 1 >= limit) refused the N-th one, so a limit of 1 allowed none.
        if (m_maxAddressesPerConnection > 0
                && static_cast<int>(remote->hashes.size()) >= m_maxAddressesPerConnection) {
            logInfo(Log::MonitorService) << "Remote" << ep.connectionId << "hit limit of registrations";
            error = "Maximum number of addresses registered for watching, ask your node operator to change limits";
            break;
        }
        remote->hashes.insert(hash);
        ++done; // only count hashes that were actually registered
        remote->connection.postOnStrand(std::bind(&AddressMonitorService::findTxInMempool, this,
                                                  remote->connection.connectionId(), hash));
    }
    if (error.empty() && !done)
        error = "Missing required field BitcoinScriptHashed (2)";

    auto pool = Streaming::pool(10 + error.size());
    Streaming::MessageBuilder builder(pool);
    builder.add(Api::AddressMonitor::Result, done);
    if (subscribe)
        logInfo(Log::MonitorService) << "Remote" << ep.connectionId << "made" << done << "changes. Hashes count:"
            << remote->hashes.size();
    if (!error.empty())
        builder.add(Api::AddressMonitor::ErrorMessage, error);
    remote->connection.send(builder.reply(message));
    updateBools();
}

/**
 * Recomputes m_findByHash: true when at least one remote watches a hash.
 * The member is assigned once, it never holds an intermediate value.
 */
void AddressMonitorService::updateBools()
{
    bool anyHashes = false;
    for (auto remote : remotes()) {
        RemoteWithKeys *rwk = static_cast<RemoteWithKeys *>(remote);
        if (!rwk->hashes.empty()) {
            anyHashes = true;
            break;
        }
    }
    m_findByHash = anyHashes;
}

/**
 * Searches the mempool for outputs paying to \a hash and sends a
 * TransactionFound message (one per matching output) to the connection
 * identified by \a connectionId.
 *
 * Does nothing when there is no mempool, no network manager, or when the
 * connection is no longer valid and connected. The mempool lock is held
 * while searching.
 */
void AddressMonitorService::findTxInMempool(int connectionId, const uint256 &hash)
{
    if (m_mempool == nullptr)
        return;
    if (manager() == nullptr)
        return;
    auto connection = manager()->connection(manager()->endPoint(connectionId), NetworkManager::OnlyExisting);
    if (!connection.isValid() || !connection.isConnected())
        return;

    LOCK(m_mempool->cs);
    for (auto iter = m_mempool->mapTx.begin(); iter != m_mempool->mapTx.end(); ++iter) {
        Tx::Iterator txIter(iter->tx);
        auto type = txIter.next();
        MatchedOutput output;
        int index = 0;
        while (type != Tx::End) {
            if (type == Tx::OutputValue) {
                // The value tag starts a new output.
                output = MatchedOutput();
                output.index = index;
                output.amount = txIter.longData();
            }
            else if (type == Tx::CashTokenBitfield) {
                const uint8_t tokenbitfield = txIter.bitfieldData();
                const uint8_t capability = tokenbitfield & 0x0f;
                // Same as match(): without this flag serialize() leaves out the
                // token category, capability, commitment and amount.
                output.hasToken = true;
                output.tokenIsFT = (tokenbitfield & 0x10) == 0x10;
                output.tokenIsNFT = (tokenbitfield & 0x20) == 0x20;
                output.tokenIsImmutable = output.tokenIsNFT && capability == 0x00;
                output.tokenIsMutable = output.tokenIsNFT && capability == 0x01;
                output.tokenIsMinting = output.tokenIsNFT && capability == 0x02;
            }
            else if (type == Tx::CashTokenCategory) {
                output.tokenCategory = txIter.byteData();
            }
            else if (type == Tx::CashTokenCommitment) {
                output.tokenCommitment = txIter.byteData();
            }
            else if (type == Tx::CashTokenAmount) {
                output.tokenAmount = txIter.longData();
            }
            else if (type == Tx::OutputScript) {
                uint256 hashedOutScript;
                txIter.hashByteData(hashedOutScript);
                // Compare the hash we just computed instead of hashing the
                // script a second time.
                if (hashedOutScript == hash) {
                    logDebug(Log::MonitorService) << " + Sending to peers tx from mempool!";
                    output.hashedOutScript = hash; // to serialize it
                    std::lock_guard<decltype(m_poolMutex)> guard(m_poolMutex);
                    m_pool->reserve(output.serializedSize() + 40);
                    Streaming::MessageBuilder builder(m_pool);
                    builder.add(Api::AddressMonitor::TxId, iter->tx.createHash());
                    output.serialize(builder);
                    Message message = builder.message(Api::AddressMonitorService, Api::AddressMonitor::TransactionFound);
                    connection.send(message);
                }
                index++;
            }
            type = txIter.next();
        }
    }
}

/// Returns the limit on watched hashes per connection; zero or less disables the limit.
int AddressMonitorService::maxAddressesPerConnection() const
{
    return m_maxAddressesPerConnection;
}

/// Sets the limit on watched hashes per connection; zero or less disables the limit.
void AddressMonitorService::setMaxAddressesPerConnection(int maxAddressesPerConnection)
{
    m_maxAddressesPerConnection = maxAddressesPerConnection;
}

/**
 * Returns an upper-bound estimate of the bytes serialize() writes for this
 * output. It is only used to reserve buffer space.
 */
int AddressMonitorService::MatchedOutput::serializedSize() const
{
    // script hash (32 bytes of data, counted like the 32 byte category: 35)
    // + amount (10) + index (6) + the two always-present token flags (2 + 2).
    // The previous base of 40 counted the 32 byte hash as only 22 bytes.
    int count = 35 + 10 + 6 + 2 + 2;
    if (hasToken) {
        // category (35) + 3 capability flags + commitment header (4) + amount (10)
        count += 55 + tokenCommitment.size();
    }
    return count;
}

/**
 * Appends this output to \a builder.
 * The script hash, amount, output index and the FT / NFT flags are always
 * written; category, capability flags, commitment and token amount only when
 * the output carries a token.
 */
void AddressMonitorService::MatchedOutput::serialize(Streaming::MessageBuilder &builder) const
{
    builder.add(Api::AddressMonitor::BitcoinScriptHashed, hashedOutScript); // up to 35 bytes
    builder.add(Api::AddressMonitor::Amount, amount); // up to 10 bytes
    builder.add(Api::AddressMonitor::Tx_Out_Index, index); // up to 6 bytes
    builder.add(Api::AddressMonitor::Tx_Out_CT_IsFT, tokenIsFT); // 1 byte
    builder.add(Api::AddressMonitor::Tx_Out_CT_IsNFT, tokenIsNFT); // 1 byte
    if (!hasToken)
        return;

    assert(tokenCategory.size() == 32);
    builder.add(Api::AddressMonitor::Tx_Out_CT_Category, tokenCategory); // 35 bytes
    builder.add(Api::AddressMonitor::Tx_Out_CT_IsMutable, tokenIsMutable); // 1 byte
    builder.add(Api::AddressMonitor::Tx_Out_CT_IsImmutable, tokenIsImmutable); // 1 byte
    builder.add(Api::AddressMonitor::Tx_Out_CT_IsMinting, tokenIsMinting); // 2 bytes
    // Only write a commitment that is actually there. The condition used to be
    // inverted, so real commitments were never sent.
    if (tokenCommitment.isValid()) {
        // 4 + commitment length (assuming 2 bytes for the length indicator
        // (14 bits) should be enough)
        builder.add(Api::AddressMonitor::Tx_Out_CT_Commitment, tokenCommitment);
    }
    builder.add(Api::AddressMonitor::Tx_Out_CT_Amount, tokenAmount); // up to 10 bytes
}