/* * This file is part of the Flowee project * Copyright (C) 2016, 2019-2024 Tom Zander * * This program is free software: you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by * the Free Software Foundation, either version 3 of the License, or * (at your option) any later version. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License * along with this program. If not, see . */ #include "testNWM.h" #include "streaming/BufferPools.h" #include #include #include #include #include #include #include namespace { void writeLE32ForTest(char *dest, uint32_t value) { dest[0] = static_cast(value & 0xFF); dest[1] = static_cast((value >> 8) & 0xFF); dest[2] = static_cast((value >> 16) & 0xFF); dest[3] = static_cast((value >> 24) & 0xFF); } QByteArray legacyPacket(const std::array &magic, const char *command, const QByteArray &body, bool validChecksum) { QByteArray packet(24 + body.size(), 0); memcpy(packet.data(), magic.data(), magic.size()); strncpy(packet.data() + 4, command, 12); writeLE32ForTest(packet.data() + 16, static_cast(body.size())); const uint256 hash = Hash(body.constData(), body.constData() + body.size()); uint32_t checksum = 0; memcpy(&checksum, &hash, sizeof(checksum)); if (!validChecksum) checksum ^= 0x01020304; writeLE32ForTest(packet.data() + 20, checksum); memcpy(packet.data() + 24, body.constData(), static_cast(body.size())); return packet; } } TestNWM::TestNWM() { srand(time(nullptr)); } void TestNWM::testBigMessage() { auto localhost = boost::asio::ip::address_v4::loopback(); const int port = std::max(1100, rand() % 32000); std::list stash; int messageSize = -1; WorkerThreads threads(5); NetworkManager server(threads.ioContext()); server.bind(boost::asio::ip::tcp::endpoint(localhost, port), [&stash, &messageSize](NetworkConnection &connection) { connection.setOnIncomingMessage([&messageSize](const Message &message) { messageSize = message.body().size(); }); connection.accept(); stash.push_back(std::move(connection)); }); NetworkManager client(threads.ioContext()); EndPoint ep; ep.announcePort = port; ep.ipAddress = localhost; auto con = client.connection(ep); con.connect(); const int BigSize = 500000; Streaming::BufferPool pool(BigSize); for (int i =0; i < BigSize; ++i) { pool.data()[i] = 0xFF & i; } Message message(pool.commit(BigSize), 1); con.send(message); /* * This big message should be split into lots of messages but only * one should arrive at the other end. */ QTRY_COMPARE(messageSize, BigSize); } void TestNWM::testRingBuffer() { RingBuffer buf(2000); QCOMPARE(buf.reserved(), 2000); // this makes sure the tests follows the implementation QCOMPARE(buf.isEmpty(), true); QCOMPARE(buf.count(), 0); QCOMPARE(buf.hasItemsMarkedRead(), false); QCOMPARE(buf.hasUnread(), false); for (int i = 0; i < 250; ++i) { buf.append(i); } QCOMPARE(buf.hasItemsMarkedRead(), false); QCOMPARE(buf.hasUnread(), true); QCOMPARE(buf.isEmpty(), false); QCOMPARE(buf.count(), 250); QCOMPARE(buf.tip(), 0); QCOMPARE(buf.unreadTip(), 0); buf.markRead(10); QCOMPARE(buf.hasItemsMarkedRead(), true); QCOMPARE(buf.isEmpty(), false); QCOMPARE(buf.count(), 250); QCOMPARE(buf.tip(), 0); QCOMPARE(buf.hasUnread(), true); QCOMPARE(buf.unreadTip(), 10); buf.markAllUnread(); QCOMPARE(buf.hasItemsMarkedRead(), false); QCOMPARE(buf.isEmpty(), false); QCOMPARE(buf.count(), 250); QCOMPARE(buf.tip(), 0); QCOMPARE(buf.hasUnread(), true); QCOMPARE(buf.unreadTip(), 0); buf.markRead(249); QCOMPARE(buf.hasItemsMarkedRead(), true); QCOMPARE(buf.isEmpty(), false); QCOMPARE(buf.count(), 250); QCOMPARE(buf.tip(), 0); QCOMPARE(buf.hasUnread(), true); QCOMPARE(buf.unreadTip(), 249); buf.markRead(1); QCOMPARE(buf.hasItemsMarkedRead(), true); QCOMPARE(buf.isEmpty(), false); QCOMPARE(buf.count(), 250); QCOMPARE(buf.tip(), 0); QCOMPARE(buf.hasUnread(), false); // don't call unreadTip when hasUnread returns falls. It will assert. // remove 200 of the 250 items for (int i = 0; i < 200; ++i) { QCOMPARE(buf.hasItemsMarkedRead(), true); QCOMPARE(buf.isEmpty(), false); QCOMPARE(buf.count(), 250 - i); QCOMPARE(buf.tip(), i); QCOMPARE(buf.hasUnread(), false); buf.removeTip(); } // add 900 items so we now have 950 items wrapping around the buffer. for (int i = 0; i < 900; ++i) { QCOMPARE(buf.hasItemsMarkedRead(), true); QCOMPARE(buf.isEmpty(), false); QCOMPARE(buf.count(), 50 + i); QCOMPARE(buf.tip(), 200); QCOMPARE(buf.hasUnread(), i != 0); if (i > 0) QCOMPARE(buf.unreadTip(), 1000); buf.append(1000 + i); } buf.markRead(800); // move to absolute pos 50, relative pos 850. Value 1800 QCOMPARE(buf.hasItemsMarkedRead(), true); QCOMPARE(buf.isEmpty(), false); QCOMPARE(buf.count(), 950); QCOMPARE(buf.tip(), 200); QCOMPARE(buf.hasUnread(), true); QCOMPARE(buf.unreadTip(), 1800); // remove the first 50 items we added. // this means we have 900 items with value 1000 - 1900 and the read pos is at value 1800 for (int i = 0; i < 50; ++i) { QCOMPARE(buf.hasItemsMarkedRead(), true); QCOMPARE(buf.isEmpty(), false); QCOMPARE(buf.count(), 950 - i); QCOMPARE(buf.tip(), 200 + i); QCOMPARE(buf.hasUnread(), true); QCOMPARE(buf.unreadTip(), 1800); buf.removeTip(); } // remove all other items we added. for (int i = 0; i < 900; ++i) { QCOMPARE(buf.hasItemsMarkedRead(), i < 800); QCOMPARE(buf.isEmpty(), false); QCOMPARE(buf.count(), 900 - i); QCOMPARE(buf.tip(), 1000 + i); QCOMPARE(buf.hasUnread(), true); QCOMPARE(buf.unreadTip(), std::max(1800, 1000 + i)); buf.removeTip(); } // its empty now QCOMPARE(buf.hasItemsMarkedRead(), false); QCOMPARE(buf.isEmpty(), true); QCOMPARE(buf.count(), 0); QCOMPARE(buf.hasUnread(), false); RingBuffer wrap(3); QCOMPARE(wrap.reserved(), 3); QCOMPARE(wrap.slotsAvailable(), 3); QCOMPARE(wrap.isFull(), false); wrap.append(1); wrap.append(2); wrap.append(3); QCOMPARE(wrap.count(), 3); QCOMPARE(wrap.slotsAvailable(), 0); QCOMPARE(wrap.isFull(), true); wrap.markRead(1); wrap.removeTip(); QCOMPARE(wrap.count(), 2); QCOMPARE(wrap.slotsAvailable(), 1); QCOMPARE(wrap.isFull(), false); QCOMPARE(wrap.tip(), 2); wrap.append(4); QCOMPARE(wrap.count(), 3); QCOMPARE(wrap.slotsAvailable(), 0); QCOMPARE(wrap.isFull(), true); QCOMPARE(wrap.tip(), 2); } void TestNWM::testHeaderInt() { auto localhost = boost::asio::ip::address_v4::loopback(); const int port = std::max(1100, rand() % 32000); QMutex writeLock; std::map headerMap; WorkerThreads threads; NetworkManager server(threads.ioContext()); std::list stash; server.bind(boost::asio::ip::tcp::endpoint(localhost, port), [&stash, &headerMap, &writeLock](NetworkConnection &connection) { connection.setOnIncomingMessage([&headerMap, &writeLock](const Message &message) { QMutexLocker l(&writeLock); headerMap = message.headerData(); }); connection.accept(); stash.push_back(std::move(connection)); }); NetworkManager client(threads.ioContext()); auto con = client.connection(EndPoint(localhost, port)); const int MessageSize = 20000; Streaming::BufferPool pool(MessageSize); for (int i =0; i < MessageSize; ++i) { pool.data()[i] = 0xFF & i; } Message message(pool.commit(MessageSize), 1); message.setHeaderInt(11, 312); message.setHeaderInt(233, 12521); message.setHeaderInt(1111, 1112); con.send(message); QCOMPARE((int) message.headerData().size(), 5); // 3 from above and the service/message ids QTRY_COMPARE(message.headerData(), headerMap); } void TestNWM::testChunkReadQueue() { /* * The NWM does flow control using the outgoing-message-queue size * This means we might end up pausing processing of incoming traffic in order to * wait for the outgoing data to be sent. * * Lets test that we still manage to send everything. * * The way to test this is simply that when we get 10 incoming messages, which generate * 1000 outgoing messages, then we expect the NWM to stop processing the incoming and * push a 'send' in between. */ auto localhost = boost::asio::ip::address_v4::loopback(); const int port = std::max(1100, rand() % 32000); std::list connections; WorkerThreads threads; NetworkManager receiver(threads.ioContext()); receiver.bind(boost::asio::ip::tcp::endpoint(localhost, port), [&connections](NetworkConnection &connection) { connection.setMessageQueueSizes(1000, 1000); connections.push_back(std::move(connection)); NetworkConnection *con = &connections.back(); con->setOnIncomingMessage([con](const Message &message) { // first send a high prio, those are useful to measure the chunk-size. con->send(Message(1, 1), NetworkConnection::HighPriority); // for each incoming connection we send 100. for (int i = 0; i < 100; ++i) { con->send(Message(message.serviceId(), message.messageId() + 1)); } }); con->accept(); }); NetworkManager sender(threads.ioContext()); EndPoint ep; ep.announcePort = port; ep.ipAddress = localhost; auto con = sender.connection(ep); con.setMessageQueueSizes(1000, 1000); struct ReplyParser { int plainMessageCount = 0; int prioMessageCount = 0; bool ok = false; void replyReceived(const Message &message) { if (message.serviceId() == 1) ++prioMessageCount; else ++plainMessageCount; if (!ok && plainMessageCount > 300 && prioMessageCount < 5) { // Prio messages are sent every flush of the other side, so this is how // we know that the replies have been chunked. We get the first 4 batches // (4 + 400 messages) in one go and then we get another such batch and // a last batch to finish the (10 + 1000) messages count. // If there was no chunking we'd have gotten all prio messages // in one (or nothing at all). ok = true; } } }; ReplyParser parser; con.setOnIncomingMessage(std::bind(&ReplyParser::replyReceived, &parser, std::placeholders::_1)); con.connect(); // we send 10 messages from sender to receiver. for (int i = 0; i < 10; ++i) { con.send(Message(10, 5)); } QTRY_COMPARE(parser.ok, true); } void TestNWM::testAsyncSendQueueFullDoesNotTerminate() { const auto localhost = boost::asio::ip::address_v4::loopback(); const int port = std::max(1100, rand() % 32000); WorkerThreads threads(2); NetworkManager client(threads.ioContext()); auto con = client.connection(EndPoint(localhost, port)); con.setMessageQueueSizes(11, 3); QTest::qWait(50); // Let the queue-size update reach the connection strand. Message message(1, 1); for (int i = 0; i < 100; ++i) con.send(message); QTest::qWait(250); // Queued sends must be dropped, not escape the IO thread. QVERIFY(true); } void TestNWM::testNativeEnvelopeMinimumLength() { const auto localhost = boost::asio::ip::address_v4::loopback(); const int port = std::max(1100, rand() % 32000); std::list stash; QAtomicInt delivered(0); QAtomicInt disconnected(0); WorkerThreads threads(5); NetworkManager server(threads.ioContext()); server.bind(boost::asio::ip::tcp::endpoint(localhost, port), [&stash, &delivered, &disconnected](NetworkConnection &connection) { connection.setOnIncomingMessage([&delivered](const Message &) { delivered.ref(); }); connection.setOnDisconnected([&disconnected](const EndPoint &) { disconnected.ref(); }); connection.accept(); stash.push_back(std::move(connection)); }); boost::asio::io_context clientContext; boost::asio::ip::tcp::socket socket(clientContext); socket.connect(boost::asio::ip::tcp::endpoint(localhost, port)); QByteArray packet(4, 0); packet[0] = 1; // Native packet length includes these two length bytes. packet[2] = 8; // Pass the first-packet native probe to exercise the length check. boost::asio::write(socket, boost::asio::buffer(packet.constData(), packet.size())); socket.close(); QTRY_COMPARE(disconnected.loadAcquire(), 1); QCOMPARE(delivered.loadAcquire(), 0); } void TestNWM::testNativeParserPacketBounds() { const auto localhost = boost::asio::ip::address_v4::loopback(); const int port = std::max(1100, rand() % 32000); std::list stash; QAtomicInt delivered(0); QAtomicInt disconnected(0); WorkerThreads threads(5); NetworkManager server(threads.ioContext()); server.bind(boost::asio::ip::tcp::endpoint(localhost, port), [&stash, &delivered, &disconnected](NetworkConnection &connection) { connection.setOnIncomingMessage([&delivered](const Message &) { delivered.ref(); }); connection.setOnDisconnected([&disconnected](const EndPoint &) { disconnected.ref(); }); connection.accept(); stash.push_back(std::move(connection)); }); boost::asio::io_context clientContext; boost::asio::ip::tcp::socket socket(clientContext); socket.connect(boost::asio::ip::tcp::endpoint(localhost, port)); QByteArray packet; packet.append(char(4)); // Packet length includes these two length bytes. packet.append(char(0)); packet.append(char(8)); // ServiceId tag. packet.append(char(1)); // ServiceId value, but no HeaderEnd in this packet. packet.append(char(4)); // Would look like HeaderEnd if the parser reads past packetLength. packet.append(char(0)); packet.append(char(4)); packet.append(char(0)); boost::asio::write(socket, boost::asio::buffer(packet.constData(), packet.size())); socket.close(); QTRY_COMPARE(disconnected.loadAcquire(), 1); QCOMPARE(delivered.loadAcquire(), 0); } void TestNWM::testLegacyP2PEnvelopeValidation() { const auto localhost = boost::asio::ip::address_v4::loopback(); const std::array expectedMagic{{0xe3, 0xe1, 0xf3, 0xe8}}; const std::array wrongMagic{{0xfa, 0xbf, 0xb5, 0xda}}; auto runCase = [&](const QByteArray &packet, int expectedDelivered) { const int port = std::max(1100, rand() % 32000); QAtomicInt delivered(0); std::list stash; WorkerThreads threads(5); NetworkManager server(threads.ioContext()); server.setMessageIdLookup({{Api::P2P::Ping, "ping"}}); server.setLegacyNetworkId(std::vector(expectedMagic.begin(), expectedMagic.end())); server.bind(boost::asio::ip::tcp::endpoint(localhost, port), [&stash, &delivered](NetworkConnection &connection) { connection.setMessageHeaderLegacy(true); connection.setOnIncomingMessage([&delivered](const Message &) { delivered.ref(); }); connection.accept(); stash.push_back(std::move(connection)); }); boost::asio::io_context clientContext; boost::asio::ip::tcp::socket socket(clientContext); socket.connect(boost::asio::ip::tcp::endpoint(localhost, port)); boost::asio::write(socket, boost::asio::buffer(packet.constData(), packet.size())); socket.close(); if (expectedDelivered > 0) QTRY_COMPARE(delivered.loadAcquire(), expectedDelivered); else QTest::qWait(200); QCOMPARE(delivered.loadAcquire(), expectedDelivered); }; const QByteArray body("12345678", 8); runCase(legacyPacket(expectedMagic, "ping", body, true), 1); runCase(legacyPacket(wrongMagic, "ping", body, true), 0); runCase(legacyPacket(expectedMagic, "ping", body, false), 0); } void TestNWM::testEncrypted() { WorkerThreads threads(5); NetworkManager nw(threads.ioContext()); /* * Certificate creation is easy with openssl: * * # private key * openssl genrsa -out privkey.pem 2048 * # Create a certificate signing request using your new key * openssl req -new -key privkey.pem -out certreq.csr * # Self-sign your CSR with your own private key: * openssl x509 -req -days 3650 -in certreq.csr -signkey privkey.pem -out newcert.pem * * We need a DH temp file. * openssl dhparam -outform PEM -out dh2048.pem 2048 * * In production all 2048s should likely go to 4096 */ auto pool = Streaming::pool(1359); QFile newcert(":/newcert.pem"); QVERIFY(newcert.open(QIODevice::ReadOnly)); QCOMPARE(newcert.size(), 1359); QCOMPARE(newcert.read(pool->begin(), 1359), 1359); auto newCertBuf = pool->commit(1359); QFile privKey(":/privkey.pem"); QVERIFY(privKey.open(QIODevice::ReadOnly)); QCOMPARE(privKey.size(), 1704); QCOMPARE(privKey.read(pool->begin(), 1704), 1704); auto privKeyBuf = pool->commit(1704); auto localhost = boost::asio::ip::address_v4::loopback(); const int port = std::max(1100, rand() % 32000); logFatal() << "Trying to bind on:" << port; QAtomicInt gotIncoming(0); Streaming::ConstBuffer empty; // bind on a random port NetworkConnection serverCon; nw.bindSsl(boost::asio::ip::tcp::endpoint(localhost, port), newCertBuf, privKeyBuf, empty, [&serverCon, &gotIncoming](NetworkConnection &connection) { logFatal() << "Server: Got incoming connection"; connection.setOnIncomingMessage([&gotIncoming](const Message &m) { logFatal() << "Server: Got incoming message"; if (m.serviceId() == 15 && m.messageId() == 1) gotIncoming.ref(); else logFatal() << "Got unrecognized message"; }); connection.accept(); serverCon = std::move(connection); }); Message headerOnlyMessage(15, 1); { // first try to connect without handshake. // this will connect and send data and then get // disconnected by the server because we didn't ssl handshake. QAtomicInt counter(0); NetworkManager client(threads.ioContext()); EndPoint ep; ep.announcePort = port; ep.ipAddress = localhost; QCOMPARE(ep.encrypted, false); // make sure this will fail auto con1 = client.connection(ep); con1.setOnConnected([](const EndPoint&) { logCritical() << "Con 1 connected"; }); con1.setOnDisconnected([&counter](const EndPoint&) { logCritical() << "Con 1 disconnected"; counter.ref(); }); con1.setOnError([](int, const boost::system::error_code&) { QFAIL("No error expected"); }); con1.send(headerOnlyMessage); // just saying its encrypted doesn't help, it will just fail differently. // So this one should never connect but fail in the handshake. Which is practically // the same as con1, accept we never 'connect' ep.encrypted = true; auto con2 = client.connection(ep); QVERIFY(con1.connectionId() != con2.connectionId()); con2.setOnConnected([](const EndPoint&) { QFAIL("should not connect"); }); con2.setOnDisconnected([](const EndPoint&) { QFAIL("Hmm, wait what?"); // if we don't connect then we can't disconnect }); con2.setOnError([&counter](int connectionId, const boost::system::error_code &err) { logDebug() << connectionId << err.message(); counter.ref(); }); con2.send(headerOnlyMessage); QTRY_COMPARE(counter.loadAcquire(), 2); } NetworkManager client(threads.ioContext()); QAtomicInt gotConnect(0); gotIncoming = 0; EndPoint ep; ep.announcePort = port; ep.ipAddress = localhost; ep.encrypted = true; // set the right data. auto con = client.connection(ep); con.setCertificate(newCertBuf); con.setOnConnected([&gotConnect](const EndPoint&) { logFatal() << "yay, connect succeeded!"; gotConnect.ref(); }); bool firstConnect = true; con.setOnDisconnected([&firstConnect](const EndPoint&) { if (firstConnect) QFAIL("Did not expect disconnect"); }); con.setOnError([](int connectionId, const boost::system::error_code &err) { logFatal() << connectionId << err.message(); QFAIL("should not error"); }); con.send(headerOnlyMessage); QTRY_COMPARE(gotIncoming.loadAcquire(), 1); // Reconnect the client and see if a new message comes through, which goes towards having // a proper handshaking logic. firstConnect = false; con.disconnect(); gotIncoming = 0; QTest::qWait(100); con.connect(); // re-activate the connection. con.send(headerOnlyMessage); QTRY_COMPARE(gotIncoming.loadAcquire(), 1); } void TestNWM::basic() { WorkerThreads threads; NetworkManager nw(threads.ioContext()); auto ep = nw.endPoint(199); QVERIFY(ep.hostname.empty()); ep.announcePort = 1212; auto localhost = boost::asio::ip::address_v4::loopback(); ep.ipAddress = localhost; auto con = nw.connection(ep); auto ep2 = nw.endPoint(con.connectionId()); QCOMPARE(ep2.announcePort, 1212); } QTEST_MAIN(TestNWM)