2019-03-31 15:11:03 +02:00
|
|
|
/*
|
|
|
|
|
* This file is part of the Flowee project
|
2021-02-06 23:09:03 +01:00
|
|
|
* Copyright (C) 2019-2021 Tom Zander <tom@flowee.org>
|
2025-02-12 15:35:24 +01:00
|
|
|
* Copyright (C) 2025 John Galt <johngaltbch@pm.me>
|
2019-03-31 15:11:03 +02:00
|
|
|
*
|
|
|
|
|
* This program is free software: you can redistribute it and/or modify
|
|
|
|
|
* it under the terms of the GNU General Public License as published by
|
|
|
|
|
* the Free Software Foundation, either version 3 of the License, or
|
|
|
|
|
* (at your option) any later version.
|
|
|
|
|
*
|
|
|
|
|
* This program is distributed in the hope that it will be useful,
|
|
|
|
|
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
|
|
|
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
|
|
|
* GNU General Public License for more details.
|
|
|
|
|
*
|
|
|
|
|
* You should have received a copy of the GNU General Public License
|
|
|
|
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
|
|
|
*/
|
|
|
|
|
#include "AddressIndexer.h"
|
2019-06-26 21:39:07 +02:00
|
|
|
#include "Indexer.h"
|
2019-04-05 13:50:41 +02:00
|
|
|
#include <uint256.h>
|
2019-06-26 21:39:07 +02:00
|
|
|
#include <APIProtocol.h>
|
|
|
|
|
#include <streaming/MessageParser.h>
|
|
|
|
|
#include <Message.h>
|
2019-03-31 15:11:03 +02:00
|
|
|
|
2019-04-06 18:19:57 +02:00
|
|
|
#include <QTime>
|
2025-10-20 23:38:54 +02:00
|
|
|
#include <QElapsedTimer>
|
2019-04-06 15:16:37 +02:00
|
|
|
#include <qsettings.h>
|
2019-03-31 15:11:03 +02:00
|
|
|
#include <qsqlerror.h>
|
2019-04-06 18:19:57 +02:00
|
|
|
#include <qtimer.h>
|
2019-03-31 15:11:03 +02:00
|
|
|
#include <qvariant.h>
|
2019-04-09 20:22:35 +02:00
|
|
|
#include <qcoreapplication.h>
|
2019-03-31 15:11:03 +02:00
|
|
|
|
2025-02-05 11:13:00 +01:00
|
|
|
#include <fstream>
|
|
|
|
|
|
2019-04-06 15:16:37 +02:00
|
|
|
namespace {
|
2019-06-19 16:23:05 +02:00
|
|
|
|
2020-05-13 12:55:07 +00:00
|
|
|
QString valueFromSettings(const QSettings &settings, const QString &key, const QString &defvalue=QString()) {
|
2019-04-06 15:16:37 +02:00
|
|
|
QVariant x = settings.value(QString("addressdb/") + key);
|
2020-05-13 12:55:07 +00:00
|
|
|
if (x.isNull()) x = settings.value(key);
|
|
|
|
|
if (x.isNull()) return defvalue;
|
2019-04-06 15:16:37 +02:00
|
|
|
return x.toString();
|
|
|
|
|
}
|
2019-04-06 18:19:57 +02:00
|
|
|
|
|
|
|
|
QString addressTable(int index) {
|
|
|
|
|
return QString("AddressUsage%1").arg(index, 2, 10, QChar('_'));
|
|
|
|
|
}
|
|
|
|
|
|
2019-04-06 15:16:37 +02:00
|
|
|
}
|
2019-03-31 15:11:03 +02:00
|
|
|
|
2019-08-13 22:21:22 +02:00
|
|
|
class TableSpecification
|
|
|
|
|
{
|
|
|
|
|
public:
|
2019-09-12 15:21:48 +02:00
|
|
|
inline virtual ~TableSpecification() {}
|
|
|
|
|
|
2019-08-13 22:21:22 +02:00
|
|
|
virtual bool queryTableExists(QSqlQuery &query, const QString &tableName) const {
|
|
|
|
|
return query.exec("select count(*) from " + tableName);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// we only create one index type, so this API is assuming a lot.
|
|
|
|
|
virtual bool createIndexIfNotExists(QSqlQuery &query, const QString &tableName) const {
|
|
|
|
|
QString createIndexString("create index %1_index on %1 (address_row)");
|
|
|
|
|
return query.exec(createIndexString.arg(tableName));
|
|
|
|
|
}
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
class PostgresTables : public TableSpecification
|
|
|
|
|
{
|
|
|
|
|
public:
|
|
|
|
|
bool queryTableExists(QSqlQuery &query, const QString &tableName) const override {
|
|
|
|
|
bool ok = query.exec("select exists (select 1 from pg_tables where tablename='" + tableName.toLower()
|
|
|
|
|
+ "' and schemaname='public')");
|
|
|
|
|
if (!ok)
|
|
|
|
|
return false;
|
|
|
|
|
query.next();
|
|
|
|
|
return query.value(0).toInt() == 1;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
bool createIndexIfNotExists(QSqlQuery &query, const QString &tableName) const override {
|
|
|
|
|
QString createIndexString("CREATE INDEX IF NOT EXISTS %1_index ON %1 (address_row)");
|
|
|
|
|
return query.exec(createIndexString.arg(tableName.toLower()));
|
|
|
|
|
}
|
|
|
|
|
};
|
|
|
|
|
|
2020-05-11 17:05:39 +00:00
|
|
|
class MySQLTables : public TableSpecification
|
|
|
|
|
{
|
|
|
|
|
public:
|
|
|
|
|
bool queryTableExists(QSqlQuery &query, const QString &tableName) const override {
|
2021-02-07 14:28:00 +01:00
|
|
|
query.exec("SELECT COUNT(table_name) FROM information_schema.tables WHERE table_schema=DATABASE() AND table_name='"+tableName+"'");
|
|
|
|
|
query.first();
|
|
|
|
|
return query.value(0).toInt() == 1;
|
2020-05-11 17:05:39 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
bool createIndexIfNotExists(QSqlQuery &query, const QString &tableName) const override {
|
|
|
|
|
QString createIndexString("CREATE INDEX %1_index ON %1 (address_row)");
|
|
|
|
|
return query.exec(createIndexString.arg(tableName.toLower()));
|
|
|
|
|
}
|
|
|
|
|
};
|
|
|
|
|
|
2019-06-26 21:39:07 +02:00
|
|
|
/**
 * Construct the indexer.
 * @param basedir the directory handed to the address storage; the CSV file
 *        used for bulk-loading ("postgres.csv") is placed in here too.
 * @param datasource the Indexer that run() pulls blocks from.
 */
AddressIndexer::AddressIndexer(const boost::filesystem::path &basedir, Indexer *datasource)
    : m_addresses(basedir),
      m_basedir(QString::fromStdWString(basedir.wstring())),
      m_dataSource(datasource),
      m_flushRequested(0),
      m_postgresCsv(m_basedir + "/postgres.csv")
{
    // the CSV path above is derived from the basedir, so it has to be set.
    assert(!m_basedir.isEmpty());
}
|
|
|
|
|
|
2019-08-13 22:21:22 +02:00
|
|
|
/// Releases the table-specification created by loadSetting() or createTables().
AddressIndexer::~AddressIndexer()
{
    delete m_spec;
}
|
|
|
|
|
|
2019-04-06 15:16:37 +02:00
|
|
|
void AddressIndexer::loadSetting(const QSettings &settings)
|
|
|
|
|
{
|
|
|
|
|
auto db = valueFromSettings(settings, "db_driver");
|
2019-06-19 14:48:30 +02:00
|
|
|
m_insertDb = QSqlDatabase::addDatabase(db, "insertConnection");
|
|
|
|
|
if (!m_insertDb.isValid()) {
|
2019-04-06 15:16:37 +02:00
|
|
|
if (QSqlDatabase::drivers().contains(db)) {
|
|
|
|
|
logFatal().nospace() << "Failed to open a databse (" << db << "), missing libs?";
|
|
|
|
|
} else {
|
2019-11-29 14:47:36 +01:00
|
|
|
logFatal() << "The configured database is not known. Installed drivers are:";
|
2022-08-20 19:18:34 +02:00
|
|
|
auto drivers = QSqlDatabase::drivers();
|
|
|
|
|
logFatal() << std::list<QString>(drivers.begin(), drivers.end());
|
2019-04-06 15:16:37 +02:00
|
|
|
}
|
2019-06-19 14:48:30 +02:00
|
|
|
logCritical() << "Error reported:" << m_insertDb.lastError().text();
|
2019-03-31 15:11:03 +02:00
|
|
|
throw std::runtime_error("Failed to read database");
|
|
|
|
|
}
|
2019-08-13 22:21:22 +02:00
|
|
|
delete m_spec;
|
|
|
|
|
m_spec = nullptr;
|
2019-03-31 15:11:03 +02:00
|
|
|
|
2019-06-19 14:48:30 +02:00
|
|
|
m_selectDb = QSqlDatabase::addDatabase(db, "selectConnection");
|
2019-11-29 14:47:36 +01:00
|
|
|
logCritical().nospace() << "AddressIndexer database(" << db << ") "
|
2019-08-28 13:40:50 +02:00
|
|
|
<< valueFromSettings(settings, "db_username")
|
|
|
|
|
<< "@" << valueFromSettings(settings, "db_hostname")
|
|
|
|
|
<< " DB: " << valueFromSettings(settings, "db_database");
|
2019-04-06 15:16:37 +02:00
|
|
|
if (db == "QPSQL") {
|
2019-06-19 14:48:30 +02:00
|
|
|
m_insertDb.setDatabaseName(valueFromSettings(settings, "db_database"));
|
|
|
|
|
m_insertDb.setUserName(valueFromSettings(settings, "db_username"));
|
|
|
|
|
m_insertDb.setPassword(valueFromSettings(settings, "db_password"));
|
|
|
|
|
m_insertDb.setHostName(valueFromSettings(settings, "db_hostname"));
|
2020-05-13 12:55:07 +00:00
|
|
|
m_insertDb.setPort(valueFromSettings(settings, "db_port", "5432").toInt());
|
2019-06-19 14:48:30 +02:00
|
|
|
m_selectDb.setDatabaseName(valueFromSettings(settings, "db_database"));
|
|
|
|
|
m_selectDb.setUserName(valueFromSettings(settings, "db_username"));
|
|
|
|
|
m_selectDb.setPassword(valueFromSettings(settings, "db_password"));
|
|
|
|
|
m_selectDb.setHostName(valueFromSettings(settings, "db_hostname"));
|
2020-05-13 12:55:07 +00:00
|
|
|
m_selectDb.setPort(valueFromSettings(settings, "db_port", "5432").toInt());
|
2019-08-13 22:21:22 +02:00
|
|
|
m_spec = new PostgresTables();
|
2020-05-10 16:11:32 +00:00
|
|
|
} else if (db == "QMYSQL") {
|
|
|
|
|
m_insertDb.setDatabaseName(valueFromSettings(settings, "db_database"));
|
|
|
|
|
m_insertDb.setUserName(valueFromSettings(settings, "db_username"));
|
|
|
|
|
m_insertDb.setPassword(valueFromSettings(settings, "db_password"));
|
|
|
|
|
m_insertDb.setHostName(valueFromSettings(settings, "db_hostname"));
|
2020-05-13 12:55:07 +00:00
|
|
|
m_insertDb.setPort(valueFromSettings(settings, "db_port", "3306").toInt());
|
2020-05-10 16:11:32 +00:00
|
|
|
m_selectDb.setDatabaseName(valueFromSettings(settings, "db_database"));
|
|
|
|
|
m_selectDb.setUserName(valueFromSettings(settings, "db_username"));
|
|
|
|
|
m_selectDb.setPassword(valueFromSettings(settings, "db_password"));
|
|
|
|
|
m_selectDb.setHostName(valueFromSettings(settings, "db_hostname"));
|
2020-05-13 12:55:07 +00:00
|
|
|
m_selectDb.setPort(valueFromSettings(settings, "db_port", "3306").toInt());
|
2021-02-07 14:28:00 +01:00
|
|
|
m_spec = new MySQLTables();
|
2019-04-06 15:16:37 +02:00
|
|
|
} else if (db == "QSQLITE") {
|
2019-06-19 14:48:30 +02:00
|
|
|
m_insertDb.setDatabaseName(m_basedir + "/addresses.db");
|
|
|
|
|
m_selectDb.setDatabaseName(m_basedir + "/addresses.db");
|
2019-04-06 15:16:37 +02:00
|
|
|
}
|
|
|
|
|
|
2019-11-29 14:47:36 +01:00
|
|
|
if (!m_selectDb.isValid()) {
|
2019-06-19 14:48:30 +02:00
|
|
|
logFatal() << "Failed opening the database-connection" << m_insertDb.lastError().text();
|
2019-04-06 15:16:37 +02:00
|
|
|
throw std::runtime_error("Failed to open database connection");
|
2019-03-31 15:11:03 +02:00
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
int AddressIndexer::blockheight()
|
|
|
|
|
{
|
2019-06-26 21:39:07 +02:00
|
|
|
return m_height;
|
2019-03-31 15:11:03 +02:00
|
|
|
}
|
|
|
|
|
|
2019-11-06 22:55:17 +01:00
|
|
|
void AddressIndexer::blockFinished(int blockheight)
|
2019-03-31 15:11:03 +02:00
|
|
|
{
|
2021-02-06 23:09:03 +01:00
|
|
|
assert(blockheight > m_height);
|
2019-06-26 21:39:07 +02:00
|
|
|
m_height = blockheight;
|
2019-08-13 23:08:26 +02:00
|
|
|
if (++m_uncommittedCount > 150000) {
|
2019-06-26 21:39:07 +02:00
|
|
|
commitAllData();
|
2019-08-13 23:08:26 +02:00
|
|
|
m_uncommittedCount = 0;
|
|
|
|
|
}
|
2019-03-31 15:11:03 +02:00
|
|
|
}
|
|
|
|
|
|
2019-10-16 22:42:44 +02:00
|
|
|
/**
 * Queue one usage of an address (a hashed output script) for storage.
 *
 * The 32-byte hash is looked up in the address storage and appended there
 * when it is not yet known. The resulting (db, row) pair decides which
 * per-database queue the entry is added to; nothing is sent to SQL here.
 * Has to be called from the indexer's own thread.
 */
void AddressIndexer::insert(const Streaming::ConstBuffer &outScriptHashed, int outputIndex, int blockHeight, int offsetInBlock)
{
    assert(outScriptHashed.size() == 32); // a sha256
    assert(QThread::currentThread() == this);

    const uint256 &scriptHash = *reinterpret_cast<const uint256*>(outScriptHashed.begin());
    auto location = m_addresses.lookup(scriptHash);
    if (location.db == -1) // never seen before
        location = m_addresses.append(scriptHash);
    assert(location.db >= 0);
    assert(location.row >= 0);

    // make sure there is a queue for this database-index.
    if (location.db >= int(m_uncommittedData.size()))
        m_uncommittedData.resize(location.db + 1);

    // the output index is stored as a short.
    assert(outputIndex < 0x7FFF);
    std::deque<Entry> &queue = m_uncommittedData[location.db];
    queue.push_back({static_cast<short>(outputIndex), blockHeight, location.row, offsetInBlock});
    ++m_uncommittedCount;
}
|
|
|
|
|
|
2019-10-16 22:42:44 +02:00
|
|
|
std::vector<AddressIndexer::TxData> AddressIndexer::find(const uint256 &address) const
|
2019-03-31 15:11:03 +02:00
|
|
|
{
|
|
|
|
|
std::vector<TxData> answer;
|
2019-04-05 13:50:41 +02:00
|
|
|
auto result = m_addresses.lookup(address);
|
2019-03-31 15:11:03 +02:00
|
|
|
if (result.db == -1)
|
|
|
|
|
return answer;
|
|
|
|
|
|
2019-06-19 14:48:30 +02:00
|
|
|
QSqlQuery query(m_selectDb);
|
2019-10-02 16:55:02 +02:00
|
|
|
const QString select = QString("select DISTINCT offset_in_block, block_height, out_index "
|
2019-04-05 13:50:41 +02:00
|
|
|
"FROM AddressUsage%1 "
|
2019-12-26 00:00:03 +01:00
|
|
|
"WHERE address_row=:row "
|
|
|
|
|
"ORDER BY block_height DESC").arg(result.db, 2, 10, QChar('_'));
|
2019-04-05 13:50:41 +02:00
|
|
|
query.prepare(select);
|
2019-03-31 15:11:03 +02:00
|
|
|
query.bindValue(":row", result.row);
|
|
|
|
|
if (!query.exec()) {
|
|
|
|
|
logFatal() << "Failed to select" << query.lastError().text();
|
2019-04-05 13:50:41 +02:00
|
|
|
logDebug() << "Failed with" << select;
|
2019-04-09 20:22:35 +02:00
|
|
|
QCoreApplication::exit(1);
|
2019-03-31 15:11:03 +02:00
|
|
|
}
|
|
|
|
|
const int size = query.size();
|
|
|
|
|
if (size > 0)
|
|
|
|
|
answer.reserve(size);
|
|
|
|
|
while (query.next()) {
|
|
|
|
|
TxData txData;
|
|
|
|
|
txData.offsetInBlock = query.value(0).toInt();
|
2019-04-11 14:59:14 +02:00
|
|
|
txData.blockHeight = query.value(1).toInt();
|
2019-10-02 16:55:02 +02:00
|
|
|
txData.outputIndex = static_cast<short>(query.value(2).toInt());
|
2019-03-31 15:11:03 +02:00
|
|
|
answer.push_back(txData);
|
|
|
|
|
}
|
|
|
|
|
return answer;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
void AddressIndexer::createTables()
|
|
|
|
|
{
|
2019-08-13 22:21:22 +02:00
|
|
|
if (m_spec == nullptr)
|
|
|
|
|
m_spec = new TableSpecification(); // generic one.
|
|
|
|
|
|
2019-03-31 15:11:03 +02:00
|
|
|
/* Tables
|
2019-04-05 13:50:41 +02:00
|
|
|
* AddressUsage_N
|
2019-03-31 15:11:03 +02:00
|
|
|
* address_row INTEGER (the row that the hashStorage provided us with)
|
|
|
|
|
* block_height INTEGER \
|
|
|
|
|
* offset_in_block INTEGER /-- together give the transaction
|
|
|
|
|
* out_index INTEGER
|
|
|
|
|
*
|
|
|
|
|
* LastKnownState
|
|
|
|
|
* blockHeight INTEGER
|
|
|
|
|
*/
|
|
|
|
|
|
2019-06-19 14:48:30 +02:00
|
|
|
QSqlQuery query(m_insertDb);
|
2019-04-06 18:19:57 +02:00
|
|
|
bool doInsert = false;
|
2019-08-13 22:21:22 +02:00
|
|
|
if (!m_spec->queryTableExists(query, "LastKnownState")) {
|
2019-08-28 13:40:50 +02:00
|
|
|
logInfo() << "Creating tables...";
|
|
|
|
|
if (!query.exec("create table LastKnownState (blockheight INTEGER)")) {
|
2019-03-31 15:11:03 +02:00
|
|
|
logFatal() << "Failed to create table" << query.lastError().text();
|
|
|
|
|
throw std::runtime_error("Failed to create table");
|
|
|
|
|
}
|
2019-04-06 18:19:57 +02:00
|
|
|
doInsert = true;
|
|
|
|
|
}
|
2019-09-12 15:21:48 +02:00
|
|
|
if (doInsert || (query.next() && query.value(0).toInt() < 1)) {
|
2019-03-31 15:11:03 +02:00
|
|
|
if (!query.exec("insert into LastKnownState values (0)")) {
|
|
|
|
|
logFatal() << "Failed to insert row" << query.lastError().text();
|
|
|
|
|
throw std::runtime_error("Failed to insert row");
|
|
|
|
|
}
|
2019-08-28 13:40:50 +02:00
|
|
|
if (!query.exec("create table IBD (busy INTEGER)")) {
|
|
|
|
|
logFatal() << "Failed to create notificatoin table IBD" << query.lastError().text();
|
|
|
|
|
throw std::runtime_error("Failed to create table");
|
|
|
|
|
}
|
|
|
|
|
if (!query.exec("insert into IBD values (42)")) {
|
|
|
|
|
logFatal() << "Failed...";
|
|
|
|
|
throw std::runtime_error("Failed...");
|
2019-06-19 16:23:05 +02:00
|
|
|
}
|
|
|
|
|
}
|
2019-04-06 18:19:57 +02:00
|
|
|
}
|
2019-03-31 15:11:03 +02:00
|
|
|
|
2019-06-26 21:39:07 +02:00
|
|
|
void AddressIndexer::run()
|
2019-04-06 18:19:57 +02:00
|
|
|
{
|
2019-11-29 14:47:36 +01:00
|
|
|
// wait for database to come online.
|
|
|
|
|
while (!isInterruptionRequested()) {
|
|
|
|
|
assert(m_selectDb.isValid());
|
|
|
|
|
if (m_insertDb.open() && m_selectDb.open()) {
|
|
|
|
|
createTables();
|
|
|
|
|
|
|
|
|
|
QSqlQuery query(m_selectDb);
|
|
|
|
|
if (!query.exec("select blockheight from LastKnownState")) {
|
|
|
|
|
logFatal() << "Failed to select blockHeight" << query.lastError().text();
|
|
|
|
|
QCoreApplication::exit(1);
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
query.next();
|
|
|
|
|
m_height = query.value(0).toInt();
|
|
|
|
|
m_topOfChain = m_spec->queryTableExists(query, "IBD") ? InInitialSync : InitialSyncFinished;
|
|
|
|
|
break;
|
|
|
|
|
} else {
|
2019-12-12 14:49:46 +01:00
|
|
|
sleep(3);
|
2019-11-29 14:47:36 +01:00
|
|
|
logCritical() << "Waiting for SQL DB to come online.";
|
|
|
|
|
}
|
|
|
|
|
}
|
2019-11-29 17:56:24 +01:00
|
|
|
logCritical() << "Address DB connected. Starting at block-height:" << m_height << (m_topOfChain ? "" : "in initial-sync");
|
2019-11-29 14:47:36 +01:00
|
|
|
|
2019-06-26 21:39:07 +02:00
|
|
|
assert(m_dataSource);
|
|
|
|
|
while (!isInterruptionRequested()) {
|
2019-12-12 14:49:46 +01:00
|
|
|
int tipHeight;
|
|
|
|
|
Message message = m_dataSource->nextBlock(blockheight() + 1, &tipHeight, m_uncommittedData.empty() ? ULONG_MAX : 20000);
|
2019-06-26 21:39:07 +02:00
|
|
|
|
2022-08-20 19:18:34 +02:00
|
|
|
if (m_flushRequested.loadRelaxed() == 1) {
|
2019-06-26 21:39:07 +02:00
|
|
|
commitAllData();
|
2019-08-14 22:10:06 +02:00
|
|
|
m_flushRequested = 0;
|
2019-06-26 21:39:07 +02:00
|
|
|
}
|
|
|
|
|
if (message.body().size() == 0) // typically true if the flush was requested
|
|
|
|
|
continue;
|
|
|
|
|
|
|
|
|
|
int txOffsetInBlock = 0;
|
|
|
|
|
int outputIndex = -1;
|
|
|
|
|
int blockHeight = -1;
|
|
|
|
|
|
|
|
|
|
Streaming::MessageParser parser(message.body());
|
|
|
|
|
while (parser.next() == Streaming::FoundTag) {
|
|
|
|
|
if (parser.tag() == Api::BlockChain::BlockHeight) {
|
|
|
|
|
blockHeight = parser.intData();
|
2021-02-06 23:09:03 +01:00
|
|
|
assert(blockHeight == m_height + 1);
|
2019-06-26 21:39:07 +02:00
|
|
|
} else if (parser.tag() == Api::BlockChain::Separator) {
|
|
|
|
|
txOffsetInBlock = 0;
|
|
|
|
|
outputIndex = -1;
|
|
|
|
|
} else if (parser.tag() == Api::BlockChain::Tx_OffsetInBlock) {
|
|
|
|
|
txOffsetInBlock = parser.intData();
|
|
|
|
|
} else if (parser.tag() == Api::BlockChain::Tx_Out_Index) {
|
|
|
|
|
outputIndex = parser.intData();
|
2019-10-16 22:42:44 +02:00
|
|
|
} else if (parser.tag() == Api::BlockChain::Tx_Out_ScriptHash) {
|
|
|
|
|
assert(parser.dataLength() == 32);
|
2019-06-26 21:39:07 +02:00
|
|
|
assert(outputIndex >= 0);
|
2021-02-06 23:09:03 +01:00
|
|
|
assert(blockHeight >= 0);
|
2019-06-26 21:39:07 +02:00
|
|
|
assert(txOffsetInBlock > 0);
|
|
|
|
|
insert(parser.bytesDataBuffer(), outputIndex, blockHeight, txOffsetInBlock);
|
|
|
|
|
}
|
|
|
|
|
}
|
2021-02-06 23:09:03 +01:00
|
|
|
assert(blockHeight >= 0);
|
2019-11-06 22:55:17 +01:00
|
|
|
blockFinished(blockHeight);
|
2019-12-12 16:02:08 +01:00
|
|
|
if (blockHeight == tipHeight) { // immediately flush when we processed the tip of the chain
|
|
|
|
|
m_topOfChain.testAndSetAcquire(InInitialSync, FlushRequested);
|
2019-12-12 14:49:46 +01:00
|
|
|
commitAllData();
|
2019-12-12 16:02:08 +01:00
|
|
|
}
|
2019-06-26 21:39:07 +02:00
|
|
|
}
|
2019-04-06 18:19:57 +02:00
|
|
|
}
|
|
|
|
|
|
2019-06-26 21:39:07 +02:00
|
|
|
void AddressIndexer::commitAllData()
|
2019-04-06 18:19:57 +02:00
|
|
|
{
|
2021-02-06 23:09:03 +01:00
|
|
|
assert(QThread::currentThread() == this);
|
2019-08-14 22:10:06 +02:00
|
|
|
if (m_height == -1) {
|
2021-02-06 23:09:03 +01:00
|
|
|
assert(m_uncommittedData.empty());
|
2019-08-14 22:10:06 +02:00
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
|
2022-08-20 19:18:34 +02:00
|
|
|
QElapsedTimer time;
|
2019-04-06 18:19:57 +02:00
|
|
|
time.start();
|
|
|
|
|
int rowsInserted = 0;
|
2019-10-16 21:15:20 +02:00
|
|
|
logCritical() << "AddressDB sending data to SQL DB";
|
2019-04-06 18:19:57 +02:00
|
|
|
// create tables outside of transaction
|
|
|
|
|
for (size_t db = 0; db < m_uncommittedData.size(); ++db) {
|
|
|
|
|
const std::deque<Entry> &list = m_uncommittedData.at(db);
|
|
|
|
|
if (!list.empty()) {
|
|
|
|
|
const QString table = addressTable(db);
|
2019-06-26 21:39:07 +02:00
|
|
|
QSqlQuery query(m_insertDb);
|
2019-08-18 20:24:13 +02:00
|
|
|
if (!m_spec->queryTableExists(query, table)) {
|
2019-04-06 18:19:57 +02:00
|
|
|
static QString q("create table %1 ("
|
|
|
|
|
"address_row INTEGER, "
|
|
|
|
|
"block_height INTEGER, offset_in_block INTEGER, out_index INTEGER)");
|
|
|
|
|
if (!query.exec(q.arg(table))) {
|
|
|
|
|
logFatal() << "Failed to create table" << query.lastError().text();
|
2019-04-09 20:22:35 +02:00
|
|
|
QCoreApplication::exit(1);
|
2019-04-06 18:19:57 +02:00
|
|
|
}
|
2019-06-19 16:23:05 +02:00
|
|
|
// when creating a new table, set the index on the previous table.
|
2022-08-20 19:18:34 +02:00
|
|
|
if (db > 0 && m_topOfChain.loadRelaxed() == InitialSyncFinished) {
|
2019-08-14 22:10:06 +02:00
|
|
|
if (!m_spec->createIndexIfNotExists(query, addressTable(db - 1))) {
|
2019-06-19 16:23:05 +02:00
|
|
|
logFatal() << "Failed to create index" << query.lastError().text();
|
|
|
|
|
QCoreApplication::exit(1);
|
|
|
|
|
}
|
|
|
|
|
}
|
2019-04-06 18:19:57 +02:00
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
2019-06-26 21:39:07 +02:00
|
|
|
m_insertDb.transaction();
|
2019-04-06 18:19:57 +02:00
|
|
|
for (size_t db = 0; db < m_uncommittedData.size(); ++db) {
|
|
|
|
|
const std::deque<Entry> &list = m_uncommittedData.at(db);
|
|
|
|
|
if (!list.empty()) {
|
|
|
|
|
const QString table = addressTable(db);
|
2025-02-05 11:13:00 +01:00
|
|
|
|
2025-02-12 15:35:24 +01:00
|
|
|
std::ofstream outFile(m_postgresCsv.toStdString(), std::ios_base::trunc); // write mode
|
2025-02-05 11:13:00 +01:00
|
|
|
|
|
|
|
|
if (outFile.is_open()) {
|
|
|
|
|
for (auto entry : list) {
|
|
|
|
|
outFile << entry.row << "," << entry.height << "," << entry.offsetInBlock << "," << entry.outIndex << std::endl;
|
|
|
|
|
}
|
|
|
|
|
outFile.close();
|
|
|
|
|
} else {
|
2025-02-12 15:35:24 +01:00
|
|
|
logFatal() << "Unable to open file for writing: " << m_postgresCsv;
|
|
|
|
|
return;
|
2025-02-05 11:13:00 +01:00
|
|
|
}
|
|
|
|
|
|
2025-02-12 15:35:24 +01:00
|
|
|
QSqlQuery sqlQuery(m_insertDb);
|
|
|
|
|
QString query("COPY %1 (address_row, block_height, offset_in_block, out_index) FROM '%2' WITH (FORMAT csv)");
|
|
|
|
|
query = query.arg(table, m_postgresCsv);
|
|
|
|
|
if (!sqlQuery.exec(query)) {
|
|
|
|
|
logFatal() << "Failed to start COPY command:" << sqlQuery.lastError().text();
|
2025-02-05 11:13:00 +01:00
|
|
|
return;
|
2019-04-06 18:19:57 +02:00
|
|
|
}
|
2025-02-05 11:13:00 +01:00
|
|
|
|
2019-04-06 18:19:57 +02:00
|
|
|
rowsInserted += list.size();
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
m_uncommittedData.clear();
|
2019-06-26 21:39:07 +02:00
|
|
|
QSqlQuery query(m_insertDb);
|
2019-04-06 18:19:57 +02:00
|
|
|
query.prepare("update LastKnownState set blockHeight=:bh");
|
|
|
|
|
query.bindValue(":bh", m_height);
|
|
|
|
|
if (!query.exec()) {
|
|
|
|
|
logFatal() << "Failed to update blockheight" << query.lastError().text();
|
|
|
|
|
logDebug() << " q" << query.lastQuery();
|
|
|
|
|
}
|
|
|
|
|
|
2019-06-26 21:39:07 +02:00
|
|
|
m_insertDb.commit();
|
2019-10-16 21:15:20 +02:00
|
|
|
logCritical().nospace() << "AddressDB: SQL-DB took " << time.elapsed() << "ms to insert " << rowsInserted << " rows";
|
2019-08-28 13:40:50 +02:00
|
|
|
|
|
|
|
|
if (m_topOfChain == FlushRequested) { // only ever run this code once per DB
|
|
|
|
|
logCritical() << "Reached top of chain, creating indexes on our tables";
|
|
|
|
|
for (int db = 0;; ++db) {
|
|
|
|
|
if (!m_spec->queryTableExists(query, addressTable(db))) // found the last DB
|
|
|
|
|
break;
|
|
|
|
|
const QString tableName = addressTable(db);
|
|
|
|
|
if (m_spec->createIndexIfNotExists(query, tableName))
|
|
|
|
|
logInfo() << "Created index on SQL table" << tableName;
|
|
|
|
|
}
|
|
|
|
|
logCritical() << "Dropping table 'IBD' which was our indicator of initial sync";
|
|
|
|
|
query.exec("drop table IBD");
|
|
|
|
|
m_topOfChain = InitialSyncFinished;
|
|
|
|
|
}
|
2019-03-31 15:11:03 +02:00
|
|
|
}
|