/* * This file is part of the Flowee project * Copyright (C) 2019-2021 Tom Zander * Copyright (C) 2025 John Galt * * This program is free software: you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by * the Free Software Foundation, either version 3 of the License, or * (at your option) any later version. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License * along with this program. If not, see . */ #include "AddressIndexer.h" #include "Indexer.h" #include #include #include #include #include #include #include #include #include #include #include #include namespace { QString valueFromSettings(const QSettings &settings, const QString &key, const QString &defvalue=QString()) { QVariant x = settings.value(QString("addressdb/") + key); if (x.isNull()) x = settings.value(key); if (x.isNull()) return defvalue; return x.toString(); } QString addressTable(int index) { return QString("AddressUsage%1").arg(index, 2, 10, QChar('_')); } } class TableSpecification { public: inline virtual ~TableSpecification() {} virtual bool queryTableExists(QSqlQuery &query, const QString &tableName) const { return query.exec("select count(*) from " + tableName); } // we only create one index type, so this API is assuming a lot. virtual bool createIndexIfNotExists(QSqlQuery &query, const QString &tableName) const { QString createIndexString("create index %1_index on %1 (address_row)"); return query.exec(createIndexString.arg(tableName)); } }; class PostgresTables : public TableSpecification { public: bool queryTableExists(QSqlQuery &query, const QString &tableName) const override { bool ok = query.exec("select exists (select 1 from pg_tables where tablename='" + tableName.toLower() + "' and schemaname='public')"); if (!ok) return false; query.next(); return query.value(0).toInt() == 1; } bool createIndexIfNotExists(QSqlQuery &query, const QString &tableName) const override { QString createIndexString("CREATE INDEX IF NOT EXISTS %1_index ON %1 (address_row)"); return query.exec(createIndexString.arg(tableName.toLower())); } }; class MySQLTables : public TableSpecification { public: bool queryTableExists(QSqlQuery &query, const QString &tableName) const override { query.exec("SELECT COUNT(table_name) FROM information_schema.tables WHERE table_schema=DATABASE() AND table_name='"+tableName+"'"); query.first(); return query.value(0).toInt() == 1; } bool createIndexIfNotExists(QSqlQuery &query, const QString &tableName) const override { QString createIndexString("CREATE INDEX %1_index ON %1 (address_row)"); return query.exec(createIndexString.arg(tableName.toLower())); } }; AddressIndexer::AddressIndexer(const boost::filesystem::path &basedir, Indexer *datasource) : m_addresses(basedir), m_basedir(QString::fromStdWString(basedir.wstring())), m_dataSource(datasource), m_flushRequested(0), m_postgresCsv(m_basedir + "/postgres.csv") { assert(!m_basedir.isEmpty()); } AddressIndexer::~AddressIndexer() { delete m_spec; } void AddressIndexer::loadSetting(const QSettings &settings) { auto db = valueFromSettings(settings, "db_driver"); m_insertDb = QSqlDatabase::addDatabase(db, "insertConnection"); if (!m_insertDb.isValid()) { if (QSqlDatabase::drivers().contains(db)) { logFatal().nospace() << "Failed to open a databse (" << db << "), missing libs?"; } else { logFatal() << "The configured database is not known. Installed drivers are:"; auto drivers = QSqlDatabase::drivers(); logFatal() << std::list(drivers.begin(), drivers.end()); } logCritical() << "Error reported:" << m_insertDb.lastError().text(); throw std::runtime_error("Failed to read database"); } delete m_spec; m_spec = nullptr; m_selectDb = QSqlDatabase::addDatabase(db, "selectConnection"); logCritical().nospace() << "AddressIndexer database(" << db << ") " << valueFromSettings(settings, "db_username") << "@" << valueFromSettings(settings, "db_hostname") << " DB: " << valueFromSettings(settings, "db_database"); if (db == "QPSQL") { m_insertDb.setDatabaseName(valueFromSettings(settings, "db_database")); m_insertDb.setUserName(valueFromSettings(settings, "db_username")); m_insertDb.setPassword(valueFromSettings(settings, "db_password")); m_insertDb.setHostName(valueFromSettings(settings, "db_hostname")); m_insertDb.setPort(valueFromSettings(settings, "db_port", "5432").toInt()); m_selectDb.setDatabaseName(valueFromSettings(settings, "db_database")); m_selectDb.setUserName(valueFromSettings(settings, "db_username")); m_selectDb.setPassword(valueFromSettings(settings, "db_password")); m_selectDb.setHostName(valueFromSettings(settings, "db_hostname")); m_selectDb.setPort(valueFromSettings(settings, "db_port", "5432").toInt()); m_spec = new PostgresTables(); } else if (db == "QMYSQL") { m_insertDb.setDatabaseName(valueFromSettings(settings, "db_database")); m_insertDb.setUserName(valueFromSettings(settings, "db_username")); m_insertDb.setPassword(valueFromSettings(settings, "db_password")); m_insertDb.setHostName(valueFromSettings(settings, "db_hostname")); m_insertDb.setPort(valueFromSettings(settings, "db_port", "3306").toInt()); m_selectDb.setDatabaseName(valueFromSettings(settings, "db_database")); m_selectDb.setUserName(valueFromSettings(settings, "db_username")); m_selectDb.setPassword(valueFromSettings(settings, "db_password")); m_selectDb.setHostName(valueFromSettings(settings, "db_hostname")); m_selectDb.setPort(valueFromSettings(settings, "db_port", "3306").toInt()); m_spec = new MySQLTables(); } else if (db == "QSQLITE") { m_insertDb.setDatabaseName(m_basedir + "/addresses.db"); m_selectDb.setDatabaseName(m_basedir + "/addresses.db"); } if (!m_selectDb.isValid()) { logFatal() << "Failed opening the database-connection" << m_insertDb.lastError().text(); throw std::runtime_error("Failed to open database connection"); } } int AddressIndexer::blockheight() { return m_height; } void AddressIndexer::blockFinished(int blockheight) { assert(blockheight > m_height); m_height = blockheight; if (++m_uncommittedCount > 150000) { commitAllData(); m_uncommittedCount = 0; } } void AddressIndexer::insert(const Streaming::ConstBuffer &outScriptHashed, int outputIndex, int blockHeight, int offsetInBlock) { assert(outScriptHashed.size() == 32); // a sha256 assert(QThread::currentThread() == this); const uint256 *address = reinterpret_cast(outScriptHashed.begin()); auto result = m_addresses.lookup(*address); if (result.db == -1) result = m_addresses.append(*address); assert(result.db >= 0); assert(result.row >= 0); if (result.db >= int(m_uncommittedData.size())) m_uncommittedData.resize(result.db + 1); std::deque &data = m_uncommittedData[result.db]; assert(outputIndex < 0x7FFF); data.push_back({(short) outputIndex, blockHeight, result.row, offsetInBlock}); ++m_uncommittedCount; } std::vector AddressIndexer::find(const uint256 &address) const { std::vector answer; auto result = m_addresses.lookup(address); if (result.db == -1) return answer; QSqlQuery query(m_selectDb); const QString select = QString("select DISTINCT offset_in_block, block_height, out_index " "FROM AddressUsage%1 " "WHERE address_row=:row " "ORDER BY block_height DESC").arg(result.db, 2, 10, QChar('_')); query.prepare(select); query.bindValue(":row", result.row); if (!query.exec()) { logFatal() << "Failed to select" << query.lastError().text(); logDebug() << "Failed with" << select; QCoreApplication::exit(1); } const int size = query.size(); if (size > 0) answer.reserve(size); while (query.next()) { TxData txData; txData.offsetInBlock = query.value(0).toInt(); txData.blockHeight = query.value(1).toInt(); txData.outputIndex = static_cast(query.value(2).toInt()); answer.push_back(txData); } return answer; } void AddressIndexer::createTables() { if (m_spec == nullptr) m_spec = new TableSpecification(); // generic one. /* Tables * AddressUsage_N * address_row INTEGER (the row that the hashStorage provided us with) * block_height INTEGER \ * offset_in_block INTEGER /-- together give the transaction * out_index INTEGER * * LastKnownState * blockHeight INTEGER */ QSqlQuery query(m_insertDb); bool doInsert = false; if (!m_spec->queryTableExists(query, "LastKnownState")) { logInfo() << "Creating tables..."; if (!query.exec("create table LastKnownState (blockheight INTEGER)")) { logFatal() << "Failed to create table" << query.lastError().text(); throw std::runtime_error("Failed to create table"); } doInsert = true; } if (doInsert || (query.next() && query.value(0).toInt() < 1)) { if (!query.exec("insert into LastKnownState values (0)")) { logFatal() << "Failed to insert row" << query.lastError().text(); throw std::runtime_error("Failed to insert row"); } if (!query.exec("create table IBD (busy INTEGER)")) { logFatal() << "Failed to create notificatoin table IBD" << query.lastError().text(); throw std::runtime_error("Failed to create table"); } if (!query.exec("insert into IBD values (42)")) { logFatal() << "Failed..."; throw std::runtime_error("Failed..."); } } } void AddressIndexer::run() { // wait for database to come online. while (!isInterruptionRequested()) { assert(m_selectDb.isValid()); if (m_insertDb.open() && m_selectDb.open()) { createTables(); QSqlQuery query(m_selectDb); if (!query.exec("select blockheight from LastKnownState")) { logFatal() << "Failed to select blockHeight" << query.lastError().text(); QCoreApplication::exit(1); return; } query.next(); m_height = query.value(0).toInt(); m_topOfChain = m_spec->queryTableExists(query, "IBD") ? InInitialSync : InitialSyncFinished; break; } else { sleep(3); logCritical() << "Waiting for SQL DB to come online."; } } logCritical() << "Address DB connected. Starting at block-height:" << m_height << (m_topOfChain ? "" : "in initial-sync"); assert(m_dataSource); while (!isInterruptionRequested()) { int tipHeight; Message message = m_dataSource->nextBlock(blockheight() + 1, &tipHeight, m_uncommittedData.empty() ? ULONG_MAX : 20000); if (m_flushRequested.loadRelaxed() == 1) { commitAllData(); m_flushRequested = 0; } if (message.body().size() == 0) // typically true if the flush was requested continue; int txOffsetInBlock = 0; int outputIndex = -1; int blockHeight = -1; Streaming::MessageParser parser(message.body()); while (parser.next() == Streaming::FoundTag) { if (parser.tag() == Api::BlockChain::BlockHeight) { blockHeight = parser.intData(); assert(blockHeight == m_height + 1); } else if (parser.tag() == Api::BlockChain::Separator) { txOffsetInBlock = 0; outputIndex = -1; } else if (parser.tag() == Api::BlockChain::Tx_OffsetInBlock) { txOffsetInBlock = parser.intData(); } else if (parser.tag() == Api::BlockChain::Tx_Out_Index) { outputIndex = parser.intData(); } else if (parser.tag() == Api::BlockChain::Tx_Out_ScriptHash) { assert(parser.dataLength() == 32); assert(outputIndex >= 0); assert(blockHeight >= 0); assert(txOffsetInBlock > 0); insert(parser.bytesDataBuffer(), outputIndex, blockHeight, txOffsetInBlock); } } assert(blockHeight >= 0); blockFinished(blockHeight); if (blockHeight == tipHeight) { // immediately flush when we processed the tip of the chain m_topOfChain.testAndSetAcquire(InInitialSync, FlushRequested); commitAllData(); } } } void AddressIndexer::commitAllData() { assert(QThread::currentThread() == this); if (m_height == -1) { assert(m_uncommittedData.empty()); return; } QElapsedTimer time; time.start(); int rowsInserted = 0; logCritical() << "AddressDB sending data to SQL DB"; // create tables outside of transaction for (size_t db = 0; db < m_uncommittedData.size(); ++db) { const std::deque &list = m_uncommittedData.at(db); if (!list.empty()) { const QString table = addressTable(db); QSqlQuery query(m_insertDb); if (!m_spec->queryTableExists(query, table)) { static QString q("create table %1 (" "address_row INTEGER, " "block_height INTEGER, offset_in_block INTEGER, out_index INTEGER)"); if (!query.exec(q.arg(table))) { logFatal() << "Failed to create table" << query.lastError().text(); QCoreApplication::exit(1); } // when creating a new table, set the index on the previous table. if (db > 0 && m_topOfChain.loadRelaxed() == InitialSyncFinished) { if (!m_spec->createIndexIfNotExists(query, addressTable(db - 1))) { logFatal() << "Failed to create index" << query.lastError().text(); QCoreApplication::exit(1); } } } } } m_insertDb.transaction(); for (size_t db = 0; db < m_uncommittedData.size(); ++db) { const std::deque &list = m_uncommittedData.at(db); if (!list.empty()) { const QString table = addressTable(db); std::ofstream outFile(m_postgresCsv.toStdString(), std::ios_base::trunc); // write mode if (outFile.is_open()) { for (auto entry : list) { outFile << entry.row << "," << entry.height << "," << entry.offsetInBlock << "," << entry.outIndex << std::endl; } outFile.close(); } else { logFatal() << "Unable to open file for writing: " << m_postgresCsv; return; } QSqlQuery sqlQuery(m_insertDb); QString query("COPY %1 (address_row, block_height, offset_in_block, out_index) FROM '%2' WITH (FORMAT csv)"); query = query.arg(table, m_postgresCsv); if (!sqlQuery.exec(query)) { logFatal() << "Failed to start COPY command:" << sqlQuery.lastError().text(); return; } rowsInserted += list.size(); } } m_uncommittedData.clear(); QSqlQuery query(m_insertDb); query.prepare("update LastKnownState set blockHeight=:bh"); query.bindValue(":bh", m_height); if (!query.exec()) { logFatal() << "Failed to update blockheight" << query.lastError().text(); logDebug() << " q" << query.lastQuery(); } m_insertDb.commit(); logCritical().nospace() << "AddressDB: SQL-DB took " << time.elapsed() << "ms to insert " << rowsInserted << " rows"; if (m_topOfChain == FlushRequested) { // only ever run this code once per DB logCritical() << "Reached top of chain, creating indexes on our tables"; for (int db = 0;; ++db) { if (!m_spec->queryTableExists(query, addressTable(db))) // found the last DB break; const QString tableName = addressTable(db); if (m_spec->createIndexIfNotExists(query, tableName)) logInfo() << "Created index on SQL table" << tableName; } logCritical() << "Dropping table 'IBD' which was our indicator of initial sync"; query.exec("drop table IBD"); m_topOfChain = InitialSyncFinished; } }