/* * This file is part of the Flowee project * Copyright (C) 2019-2025 Tom Zander * * This program is free software: you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by * the Free Software Foundation, either version 3 of the License, or * (at your option) any later version. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License * along with this program. If not, see . */ #include "BlackBoxTest.h" #include #include #include #include #include #include namespace { void writeLogsConf(const QString &nodePath) { QFile logConfFile(nodePath + "logs.conf"); bool ok = logConfFile.open(QIODevice::WriteOnly); Q_ASSERT(ok); QTextStream log(&logConfFile); log << "channel file\noption timestamp time\nALL debug\n2101 quiet\n#3000 quiet\n#3001 info\n"; logConfFile.close(); } } QString BlackBoxTest::s_hubPath = QString(); BlackBoxTest::BlackBoxTest() : m_network(m_workers.ioContext()) { srand(QTime::currentTime().msecsSinceStartOfDay()); if (s_hubPath.isEmpty()) // since Qt6 this points to the testing/api/ dir s_hubPath = QString(QT_TESTCASE_BUILDDIR "/../../hub/hub"); Log::Manager::instance()->clearLogLevels(Log::DebugLevel); } void BlackBoxTest::setHubExecutable(const QString &path) { s_hubPath = path; } void BlackBoxTest::startHubs(int amount, Connect connect) { Q_ASSERT(m_hubs.empty()); Q_ASSERT(amount > 0); Q_ASSERT(int(m_onConnectCallbacks.size()) <= amount); m_onConnectCallbacks.resize(amount); m_hubs.reserve(amount + 1); m_currentTest = QString(QTest::currentTestFunction()); m_baseDir = QDir::tempPath() + QString("/flowee-bbtest-%1").arg(rand()); logDebug() << "Starting hub at" << m_baseDir << "with" << s_hubPath; int port = rand() % 31000 + 1025; for (int i = 0; i < amount; ++i) { Hub hub; hub.proc = new QProcess(this); hub.apiPort = port++; hub.p2pPort = port++; m_hubs.push_back(hub); QString nodePath = m_baseDir + QString("/node%1/regtest/").arg(i); QDir(nodePath).mkpath("."); QFile confFile(nodePath + "flowee.conf"); bool ok = confFile.open(QIODevice::WriteOnly); Q_ASSERT(ok); QTextStream conf(&confFile); conf << "port=" << hub.p2pPort << "\n" "listenonion=0\n" "api=true\n" "server=false\n" "regtest=true\n" "apibind=127.0.0.1:" << hub.apiPort << "\n" "discover=false\n"; if (connect == ConnectHubs && i > 0) conf << "addnode=127.0.0.1:" << hub.p2pPort - 2 << "\n\n"; confFile.close(); writeLogsConf(nodePath); hub.proc->setProgram(s_hubPath); hub.proc->setWorkingDirectory(nodePath); hub.proc->setArguments(QStringList() << "-conf=" + nodePath + "flowee.conf" <<"-datadir=" + m_baseDir + QString("/node%1").arg(i)); logCritical() << "Starting hub" << hub.proc->arguments() << "in" << nodePath; hub.proc->start(QProcess::ReadOnly); con.push_back(m_network.connection( EndPoint(boost::asio::ip::address_v4::loopback(), hub.apiPort))); con.back().setOnIncomingMessage(std::bind(&BlackBoxTest::Hub::addMessage, &m_hubs.back(), std::placeholders::_1)); if (m_onConnectCallbacks.at(i)) con.back().setOnConnected(m_onConnectCallbacks.at(i)); MilliSleep(500); // Assuming that the hub takes half a second is better than assuming it doesn't and hitting the reconnect-time. } for (int i = 0; i < amount; ++i) { con.at(i).connect(); } logDebug() << "Hubs started"; } void BlackBoxTest::feedDefaultBlocksToHub(int hubIndex) { Q_ASSERT(m_hubs.size() > hubIndex); Hub &target = m_hubs[hubIndex]; Q_ASSERT(target.proc); logDebug().nospace() << "Starting new hub with pre-prepared chain: node" << m_hubs.size(); QString nodePath = m_baseDir + QString("/node%1/").arg(m_hubs.size()); QDir(nodePath).mkpath("regtest/blocks"); writeLogsConf(nodePath + "/regtest/"); QFile blk(":/blk00000.dat"); bool ok = blk.open(QIODevice::ReadOnly); Q_ASSERT(ok); QString destPath(nodePath + "regtest/blocks/blk00000.dat"); ok = blk.copy(destPath); Q_ASSERT(ok); blk.close(); QFile destFile(destPath); destFile.setPermissions((QFileDevice::Permissions)0x6666); { QProcess proc1; proc1.setProgram(s_hubPath); auto args = QStringList() << "-api=false" << "-server=false" << "-regtest" << "-listen=false" << "-datadir=." << "-reindex" << "-stopafterblockimport"; proc1.setArguments(args); logCritical() << "feedBlocks starting with:" << args; proc1.setWorkingDirectory(nodePath); proc1.start(QProcess::ReadOnly); proc1.waitForFinished(); } logDebug() << "Reindex finished, restarting feed hub to provide the chain to node" << hubIndex; Hub hub; hub.p2pPort = 0; hub.apiPort = 0; hub.proc = new QProcess(this); hub.proc->setProgram(s_hubPath); auto args = QStringList() << "-api=false" << "-server=false" << "-regtest" << "-datadir=." << QString("-connect=127.0.0.1:%1").arg(target.p2pPort); hub.proc->setArguments(args); logCritical() << "feedBlocks restarting with" << args; hub.proc->setWorkingDirectory(nodePath); hub.proc->start(QProcess::ReadOnly); m_hubs.push_back(hub); // Ask the target hub its block-height and don't continue until it reaches that. NetworkConnection con = m_network.connection( EndPoint(boost::asio::ip::address_v4::loopback(), target.apiPort)); con.setOnIncomingMessage(std::bind(&BlackBoxTest::Hub::addMessage, &hub, std::placeholders::_1)); hub.m_waitForMessageId = Api::BlockChain::GetBlockCountReply; hub.m_waitForServiceId = Api::BlockChainService; hub.m_waitForMessageId2 = -1; for (int wait = 0; wait < 30; ++wait) { struct timespec tim, tim2; tim.tv_sec = 0; hub.messages.clear(); hub.m_foundMessage.storeRelaxed(nullptr); con.send(Message(Api::BlockChainService, Api::BlockChain::GetBlockCount)); while (true) { Message *m = hub.m_foundMessage.loadRelaxed(); if (m) { Streaming::MessageParser p(m->body()); p.next(); if (p.tag() == Api::BlockHeight) { if (p.intData() == 115) { logDebug() << " feed done, shutting down helper hub"; auto pid = hub.proc->processId(); if (pid > 0) kill(pid, SIGTERM); // politely tell the Hub to terminate return; } logDebug() << " hub" << hubIndex << "is at height:" << p.intData(); break; } } tim.tv_nsec = 50000; nanosleep(&tim , &tim2); } tim.tv_sec = 1; // give it another second. nanosleep(&tim , &tim2); } logFatal() << "Failed to feed chain"; // proceed, which will likely fail and we get a nice hub log } Message BlackBoxTest::waitForReply(int hubId, const Message &message, int messageId, int timeout) { return waitForReply(hubId, message, static_cast(message.serviceId()), messageId, timeout); } Message BlackBoxTest::waitForReply(int hubId, const Message &message, Api::ServiceIds serviceId, int messageId, int timeout) { Q_ASSERT(hubId >= 0); Q_ASSERT(hubId < int(m_hubs.size())); QElapsedTimer timer; timer.start(); Hub &hub = m_hubs[hubId]; hub.m_waitForMessageId = messageId; hub.m_waitForServiceId = serviceId; hub.m_waitForMessageId2 = serviceId == message.serviceId() ? message.messageId() : INT_MAX; hub.m_foundMessage.storeRelaxed(nullptr); con[hubId].send(message); while (true) { Message *m = hub.m_foundMessage.loadRelaxed(); if (m) return *m; if (timeout < timer.elapsed()) return Message(); // if need to wait, don't burn CPU struct timespec tim, tim2; tim.tv_sec = 0; tim.tv_nsec = 15000; nanosleep(&tim , &tim2); } } bool BlackBoxTest::waitForHeight(int height) { QSet nodes; for (int i = 0; i < (int) con.size(); ++i) nodes.insert(i); QElapsedTimer timer; timer.start(); while (!nodes.isEmpty() && timer.elapsed() < 30000) { MilliSleep(100); auto copy(nodes); for (auto i : copy) { auto m = waitForReply(i, Message(Api::BlockChainService, Api::BlockChain::GetBlockCount), Api::BlockChain::GetBlockCountReply); if (m.serviceId() == Api::BlockChainService) { // not an error. Streaming::MessageParser p(m); p.next(); if (p.intData() >= height) nodes.remove(i); } } } return nodes.isEmpty(); } void BlackBoxTest::cleanup() { for (size_t i = 0; i < con.size(); ++i) { con[i].disconnect(); } con.clear(); if (m_hubs.empty()) // no hubs started return; // shut down hubs bool allOk = !QTest::currentTestFailed(); for (auto &hub : m_hubs) { auto pid = hub.proc->processId(); Q_ASSERT(pid >= 0); if (pid > 0) kill(pid, SIGTERM); // politely tell the process to terminate else hub.proc->kill(); } for (size_t i = 0; i < m_hubs.size(); ++i) { const auto &hub = m_hubs[i]; hub.proc->waitForFinished(10000); if (hub.proc->state() != QProcess::NotRunning) { allOk = false; logFatal() << m_currentTest << "Remote hub" << i << "didn't quit after 10 sec. Killing"; hub.proc->kill(); } else if (hub.proc->exitCode() != 0) { allOk = false; logFatal() << m_currentTest << "Remote hub" << i << "didn't exit cleanly. Exit code:" << hub.proc->exitCode(); } else if (hub.proc->exitStatus() != QProcess::NormalExit) { allOk = false; logFatal() << m_currentTest << "Remote hub" << i << "crashed"; } hub.proc->deleteLater(); } if (allOk) { QDir(m_baseDir).removeRecursively(); } else { for (size_t i = 0; i < m_hubs.size(); ++i) { QFile log(m_baseDir + QString("/node%1/regtest/hub.log").arg(i)); if (log.open(QIODevice::ReadOnly)) { QTextStream in(&log); while (!in.atEnd()) { logFatal().nospace() << "{HUB" << i << "} " << in.readLine().toLatin1().data(); } } } } m_hubs.clear(); m_currentTest.clear(); m_baseDir.clear(); m_onConnectCallbacks.clear(); } void BlackBoxTest::Hub::addMessage(const Message &message) { logDebug() << "addMessage" << message.serviceId() << message.messageId() << " queue:" << messages.size(); // logFatal() << " wait: " << ((void*)this) << m_waitForServiceId.loadRelaxed() << m_waitForMessageId.loadRelaxed(); messages.push_back(message); if (m_waitForServiceId != -1 && m_waitForMessageId != -1) { if (message.serviceId() == m_waitForServiceId && message.messageId() == m_waitForMessageId) { m_foundMessage.storeRelaxed(&messages.back()); } // also check the failed message as API Service generates. else if (message.serviceId() == Api::APIService && message.messageId() == Api::Meta::CommandFailed) { Streaming::MessageParser parser = Streaming::MessageParser(message.body()); int ok = 0; while (ok < 2 && parser.next() == Streaming::FoundTag) { if (parser.tag() == Api::Meta::FailedCommandId) { if (parser.intData() != m_waitForMessageId2) return; ok++; } else if (parser.tag() == Api::Meta::FailedCommandServiceId) { if (parser.intData() != m_waitForServiceId) return; ok++; } } if (ok == 2) m_foundMessage.storeRelaxed(&messages.back()); } } }