Files

656 lines
25 KiB
C++
Raw Permalink Normal View History

2019-03-30 14:26:00 +01:00
/*
* This file is part of the Flowee project
2021-02-06 23:09:03 +01:00
* Copyright (C) 2019-2021 Tom Zander <tom@flowee.org>
2019-03-30 14:26:00 +01:00
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
2026-05-14 13:13:40 +02:00
#if defined(HAVE_CONFIG_H)
#include "config/flowee-config.h"
#endif
2019-03-30 14:26:00 +01:00
#include "Indexer.h"
2019-06-26 21:39:07 +02:00
#include "AddressIndexer.h"
#include "TxIndexer.h"
#include "SpentOuputIndexer.h"
2019-03-30 14:26:00 +01:00
#include <Logger.h>
2021-01-05 22:05:25 +01:00
#include <hash.h>
#include <cashaddr.h>
2019-03-30 14:26:00 +01:00
#include <streaming/MessageBuilder.h>
#include <streaming/MessageParser.h>
2022-01-25 23:20:12 +01:00
#include <streaming/BufferPools.h>
2019-03-30 14:26:00 +01:00
#include <APIProtocol.h>
#include <qbytearray.h>
2019-04-06 15:16:37 +02:00
#include <qsettings.h>
2019-04-11 14:59:14 +02:00
#include <qdatetime.h>
2023-11-24 18:01:36 +01:00
#include <primitives/PublicKey.h>
#include <utilstrencodings.h>
2019-03-30 14:26:00 +01:00
2019-04-06 15:16:37 +02:00
#include <qfile.h>
2019-04-06 18:19:57 +02:00
#include <qcoreapplication.h>
#include <QFileInfo>
2019-04-06 15:16:37 +02:00
2019-06-26 21:39:07 +02:00
namespace {
// One slot per indexer thread; a slot holds the block height that thread is
// currently waiting for, or -1 when the slot is free. The Indexer constructor
// resets every slot to -1 before any Token is created.
static std::vector<std::atomic<int> > s_requestedHeights = std::vector<std::atomic<int> >(3);

// Scoped claim on one slot of s_requestedHeights.
//
// The constructor claims the first free slot and stores the wanted height in
// it; the destructor frees the slot again, but only if it still holds the
// height this token stored.
struct Token {
    explicit Token(int wantedHeight) : m_wantedHeight(wantedHeight) {
        for (int i = 0; i < int(s_requestedHeights.size()); ++i) {
            int expected = -1;
            if (s_requestedHeights[i].compare_exchange_strong(expected, wantedHeight)) {
                m_token = i;
                break;
            }
        }
        assert(m_token >= 0); // if fail, then make sure your vector size matches the max number of indexer-threads
    }
    ~Token() {
        // When asserts are compiled out and no slot was free, m_token is still
        // -1; indexing the vector with it would be out of bounds.
        if (m_token < 0)
            return;
        // only exchange when someone else didn't take our slot yet.
        s_requestedHeights[m_token].compare_exchange_strong(m_wantedHeight, -1);
    }

    // A copy would release the same slot a second time.
    Token(const Token &) = delete;
    Token &operator=(const Token &) = delete;

    // Returns the number of slots that are currently claimed (by any token).
    int allocatedTokens() const {
        int answer = 0;
        for (size_t i = 0; i < s_requestedHeights.size(); ++i) {
            if (s_requestedHeights[i].load() != -1)
                answer++;
        }
        return answer;
    }

private:
    int m_token = -1;        // index of the claimed slot, -1 when none was claimed
    int m_wantedHeight = 0;  // height stored in the slot; reused as the CAS 'expected' value on release
};
// Appends one record per entry of `data` to `builder`.
//
// Every record ends with OutIndex followed by a Separator. BlockHeight and
// OffsetInBlock are only written when they differ from the value of the
// previous record, which keeps the message smaller.
void buildAddressSearchReply(Streaming::MessageBuilder &builder, const std::vector<AddressIndexer::TxData> &data)
{
    int lastHeight = -1;
    int lastOffset = -1;
    // Iterate by const reference; the items are only read.
    for (const auto &item : data) {
        if (item.blockHeight != lastHeight) // avoid repeating oneself (makes the message smaller).
            builder.add(Api::Indexer::BlockHeight, item.blockHeight);
        lastHeight = item.blockHeight;
        if (item.offsetInBlock != lastOffset)
            builder.add(Api::Indexer::OffsetInBlock, item.offsetInBlock);
        lastOffset = item.offsetInBlock;
        builder.add(Api::Indexer::OutIndex, item.outputIndex);
        builder.add(Api::Indexer::Separator, true);
    }
}
2019-06-26 21:39:07 +02:00
}
2019-03-30 14:26:00 +01:00
// Sets up the network service, marks all request slots as free, starts the
// polling timer and wires the queued find-address signal to its slot.
Indexer::Indexer(const boost::filesystem::path &basedir)
    : NetworkService(Api::IndexerService),
    m_pool(std::make_shared<Streaming::BufferPool>()),
    m_poolAddressAnswers(std::make_shared<Streaming::BufferPool>()),
    m_basedir(basedir),
    m_network(m_workers.ioContext()),
    m_bestBlockHeight(0)
{
    qRegisterMetaType<Message>("Message");
    m_network.addService(this);

    // init static: -1 marks a slot as unused.
    for (auto &slot : s_requestedHeights)
        slot = -1;

    // checkBlockArrived() is invoked every two minutes.
    const int pollIntervalMs = 2 * 60 * 1000;
    connect(&m_pollingTimer, SIGNAL(timeout()), SLOT(checkBlockArrived()));
    m_pollingTimer.start(pollIntervalMs);

    // Queued, so onFindAddressRequest() does not run inside the emitting call.
    connect(this, SIGNAL(requestFindAddress(Message)), this, SLOT(onFindAddressRequest(Message)), Qt::QueuedConnection);
}
// Stops and destroys the indexer workers.
//
// All workers are first asked to stop, then anything blocked on
// m_waitForBlock is woken, and only after that each worker is waited for
// and deleted.
Indexer::~Indexer()
{
    const auto askToStop = [](auto *worker) {
        if (worker)
            worker->requestInterruption();
    };
    const auto joinAndDelete = [](auto *worker) {
        if (!worker)
            return;
        worker->wait();
        delete worker;
    };

    askToStop(m_txdb);
    askToStop(m_addressdb);
    askToStop(m_spentOutputDb);

    m_waitForBlock.wakeAll();

    joinAndDelete(m_txdb);
    joinAndDelete(m_addressdb);
    joinAndDelete(m_spentOutputDb);
}
void Indexer::tryConnectHub(const EndPoint &ep)
{
2019-09-12 15:21:48 +02:00
m_serverConnection = m_network.connection(ep);
2019-03-30 14:26:00 +01:00
if (!m_serverConnection.isValid())
throw std::runtime_error("Invalid Endpoint, can't create connection");
m_serverConnection.setOnConnected(std::bind(&Indexer::hubConnected, this, std::placeholders::_1));
m_serverConnection.setOnDisconnected(std::bind(&Indexer::hubDisconnected, this));
m_serverConnection.setOnIncomingMessage(std::bind(&Indexer::hubSentMessage, this, std::placeholders::_1));
m_serverConnection.connect();
}
// Starts listening on `endpoint` and records that this indexer acts as a
// server. The flag is only set after the network manager's bind() returned.
void Indexer::bind(const boost::asio::ip::tcp::endpoint &endpoint)
{
    m_network.bind(endpoint);

    m_isServer = true;
}
void Indexer::loadConfig(const QString &filename, const EndPoint &prioHubLocation)
2019-04-06 15:16:37 +02:00
{
using boost::asio::ip::tcp;
EndPoint hub(prioHubLocation);
2019-04-06 15:16:37 +02:00
2019-08-18 16:23:32 +02:00
if (!QFile::exists(filename)) {
if (m_txdb == nullptr && hub.isValid()) {
// lets do SOMETHING by default.
2025-02-08 19:05:26 +01:00
m_txdb = new TxIndexer(m_workers.ioContext(), m_basedir / "txindex", this);
2019-08-18 16:23:32 +02:00
tryConnectHub(hub);
QTimer::singleShot(500, m_txdb, SLOT(start()));
}
return;
}
logCritical() << "Reading config from:" << filename;
2019-08-18 16:23:32 +02:00
QSettings settings(filename, QSettings::IniFormat);
auto datadir = settings.value("datadir");
if (datadir.isValid()) {
QFileInfo dir(datadir.toString());
if (dir.exists() && dir.isDir()) {
m_basedir = dir.absolutePath().toStdString();
}
else {
logFatal() << "Requested basedir can't be found:" << datadir.toString();
return;
}
}
logCritical() << "basedir for data is:" << m_basedir.string();
2019-08-18 16:23:32 +02:00
bool enableTxDB = false, enableAddressDb = false, enableSpentDb = false;
2019-04-06 15:16:37 +02:00
const QStringList groups = settings.childGroups();
2020-12-25 23:51:06 +01:00
for (auto &group : groups) {
if (group == "addressdb") {
2019-06-26 21:39:07 +02:00
enableAddressDb = settings.value("addressdb/enabled", "false").toBool();
}
else if (group == "txdb") {
2019-06-26 21:39:07 +02:00
enableTxDB = settings.value("txdb/enabled", "false").toBool();
2019-06-11 17:28:16 +02:00
}
else if (group == "spentdb") {
2019-06-26 21:39:07 +02:00
enableSpentDb = settings.value("spentdb/enabled", "false").toBool();
}
else if (group == "services") {
2019-08-28 13:40:50 +02:00
if (hub.hostname.empty()) { // only if user didn't override using commandline
QString connectionString = settings.value("services/hub").toString();
2019-08-28 13:40:50 +02:00
hub = EndPoint("", 1235); // clear the IP address-default
SplitHostPort(connectionString.toStdString(), hub.announcePort, hub.hostname);
}
}
else if (settings.value(group + "/ip").isValid()) {
EndPoint ep("", 1234);
auto portVar = settings.value(group + "/port");
if (portVar.isValid()) {
bool ok;
ep.announcePort = portVar.toInt(&ok);
if (!ok) {
logCritical() << "Config file has 'port' value that is not a number.";
continue;
}
}
try {
QString bindAddress = settings.value(group + "/ip").toString();
ep.ipAddress = bindAddress == "localhost"
? boost::asio::ip::address_v4::loopback()
2025-02-11 19:41:22 +01:00
: boost::asio::ip::make_address(bindAddress.toStdString());
2019-09-12 15:21:48 +02:00
} catch (const std::runtime_error &) {
logCritical() << "Config file has invalid IP address value to bind to.";
continue;
}
logCritical().nospace() << "Binding to " << ep.ipAddress.to_string().c_str() << ":" << ep.announcePort;
try {
bind(tcp::endpoint(ep.ipAddress, ep.announcePort));
} catch (std::exception &e) {
logCritical() << " " << e << "skipping";
}
}
else {
logCritical().nospace() << "Config file has unrecognized or empty group. Skipping: "
<< "[" << group << "]";
}
2019-04-06 15:16:37 +02:00
}
if (!m_isServer) // then add localhost
bind(tcp::endpoint(boost::asio::ip::address_v4::loopback(), 1234));
if (!m_isServer) // then add localhost ipv6
bind(tcp::endpoint(boost::asio::ip::address_v6::loopback(), 1234));
2019-06-26 21:39:07 +02:00
// make sure we have the right workers.
if (enableAddressDb && !m_addressdb) {
m_addressdb = new AddressIndexer(m_basedir / "addresses", this);
2019-08-13 22:21:22 +02:00
m_addressdb->loadSetting(settings);
m_addressdb->start();
2019-06-26 21:39:07 +02:00
} else if (!enableAddressDb && m_addressdb) {
m_addressdb->requestInterruption();
m_addressdb->wait();
delete m_addressdb;
m_addressdb = nullptr;
}
if (enableTxDB && !m_txdb) {
2025-02-08 19:05:26 +01:00
m_txdb = new TxIndexer(m_workers.ioContext(), m_basedir / "txindex", this);
m_txdb->start();
2019-06-26 21:39:07 +02:00
} else if (!enableTxDB && m_txdb) {
m_txdb->requestInterruption();
m_txdb->wait();
delete m_txdb;
m_txdb = nullptr;
}
if (enableSpentDb && !m_spentOutputDb) {
2025-02-08 19:05:26 +01:00
m_spentOutputDb = new SpentOutputIndexer(m_workers.ioContext(), m_basedir / "spent", this);
m_spentOutputDb->start();
2019-06-26 21:39:07 +02:00
} else if (!enableSpentDb && m_spentOutputDb) {
m_spentOutputDb->requestInterruption();
m_spentOutputDb->wait();
delete m_spentOutputDb;
m_spentOutputDb = nullptr;
}
// connect to upstream Hub
try {
if (hub.isValid())
tryConnectHub(hub);
} catch (const std::exception &e) {
logFatal() << "Config: Hub connection string invalid." << e;
}
2019-04-06 15:16:37 +02:00
}
2019-04-09 19:34:55 +02:00
// Handles a request that a client sent to the indexer service.
//
// con:     the remote that sent the request; replies go to con->connection.
// message: the request, expected to be for Api::IndexerService.
//
// A request for an indexer that is not enabled, or a request with a
// malformed TxId, gets the connection disconnected. FindAddress requests
// are not answered here but forwarded through the requestFindAddress signal.
void Indexer::onIncomingMessage(NetworkService::Remote *con, const Message &message, const EndPoint &)
{
    assert(message.serviceId() == Api::IndexerService);
    if (message.messageId() == Api::Indexer::GetAvailableIndexers) {
        auto pool = Streaming::pool(10);
        Streaming::MessageBuilder builder(pool);
        if (m_txdb)
            builder.add(Api::Indexer::TxIdIndexer, true);
        if (m_addressdb)
            builder.add(Api::Indexer::AddressIndexer, true);
        if (m_spentOutputDb)
            builder.add(Api::Indexer::SpentOutputIndexer, true);
        con->connection.send(builder.reply(message));
    }
    else if (message.messageId() == Api::Indexer::FindTransaction) {
        if (!m_txdb) {
            con->connection.disconnect();
            return;
        }
        Streaming::MessageParser parser(message.body());
        while (parser.next() == Streaming::FoundTag) {
            if (parser.tag() == Api::Indexer::TxId) {
                if (parser.dataLength() != 32) {
                    con->connection.disconnect();
                    return;
                }
                const uint256 *txid = reinterpret_cast<const uint256*>(parser.bytesDataBuffer().begin());
                auto data = m_txdb->find(*txid);
                auto pool = Streaming::pool(20);
                Streaming::MessageBuilder builder(pool);
                builder.add(Api::Indexer::BlockHeight, data.blockHeight);
                builder.add(Api::Indexer::OffsetInBlock, data.offsetInBlock);
                con->connection.send(builder.reply(message));
                return; // just one item per message
            }
        }
    }
    else if (message.messageId() == Api::Indexer::FindAddress) {
        if (!m_addressdb) {
            con->connection.disconnect();
            return;
        }
        // since the AddressDB is backed by a slow SQL database, move the
        // handling out of this thread in order to keep networkmanager IO going fast.
        emit requestFindAddress(message);
    }
    else if (message.messageId() == Api::Indexer::FindSpentOutput) {
        if (!m_spentOutputDb) {
            con->connection.disconnect();
            return;
        }
        Streaming::MessageParser parser(message.body());
        const uint256 *txid = nullptr;
        int outIndex = 0;
        while (parser.next() == Streaming::FoundTag) {
            if (parser.tag() == Api::Indexer::TxId) {
                if (parser.dataLength() != 32) {
                    con->connection.disconnect();
                    return;
                }
                txid = reinterpret_cast<const uint256*>(parser.bytesDataBuffer().begin());
            }
            else if (parser.tag() == Api::Indexer::OutIndex) {
                outIndex = parser.intData();
            }
        }
        if (txid == nullptr || outIndex < 0) {
            con->connection.disconnect();
            return;
        }
        auto data = m_spentOutputDb->findSpendingTx(*txid, outIndex);
        auto pool = Streaming::pool(20);
        Streaming::MessageBuilder builder(pool);
        builder.add(Api::Indexer::BlockHeight, data.blockHeight);
        builder.add(Api::Indexer::OffsetInBlock, data.offsetInBlock);
        con->connection.send(builder.reply(message));
    }
    else if (message.messageId() == Api::Indexer::GetIndexerLastBlock) {
        // m_nextBlock is replaced by hubSentMessage() while holding
        // m_nextBlockLock, so take a copy under that same lock and parse the
        // copy afterwards.
        Message lastBlock;
        {
            QMutexLocker lock(&m_nextBlockLock);
            lastBlock = m_nextBlock;
        }
        int lastReceivedBlock = -1; // reported when no block height is found
        Streaming::MessageParser parser(lastBlock.body());
        while (parser.next() == Streaming::FoundTag) {
            if (parser.tag() == Api::BlockChain::BlockHeight) {
                lastReceivedBlock = parser.intData();
                break;
            }
        }
        auto pool = Streaming::pool(10);
        Streaming::MessageBuilder builder(pool);
        builder.add(Api::Indexer::BlockHeight, lastReceivedBlock);
        con->connection.send(builder.reply(message));
    }
    else if (message.messageId() == Api::Indexer::Version) {
        auto pool = Streaming::pool(10);
        Streaming::MessageBuilder builder(pool);
        std::ostringstream ss;
        ss << "Flowee Indexer:" << HUB_SERIES << " (" << CLIENT_VERSION_MAJOR << "-";
        // the minor version is zero-padded to two characters
        ss.width(2);
        ss.fill('0');
        ss << CLIENT_VERSION_MINOR << ")";
        builder.add(Api::Indexer::GenericByteData, ss.str());
        con->connection.send(builder.reply(message));
    }
}
2019-12-12 14:49:46 +01:00
// Blocks the calling thread until the block at `height` is available.
//
// height:   the block height the caller wants.
// knownTip: optional out-value; set to 0 on entry and to the best known
//           block height when the block is returned.
// timeout:  passed to the wait on m_waitForBlock; when a wait times out an
//           empty Message is returned.
//
// Also returns an empty Message when the current thread was asked to
// interrupt.
Message Indexer::nextBlock(int height, int *knownTip, unsigned long timeout)
{
    if (knownTip)
        *knownTip = 0;

    QMutexLocker lock(&m_nextBlockLock);
    // store an RAII token to synchronize all threads.
    Token token(height);

    for (;;) {
        if (QThread::currentThread()->isInterruptionRequested())
            break;

        const bool holdsBlockReply = m_nextBlock.serviceId() == Api::BlockChainService
                && m_nextBlock.messageId() == Api::BlockChain::GetBlockReply;
        if (holdsBlockReply) {
            Streaming::MessageParser parser(m_nextBlock.body());
            parser.next();
            const bool isWantedBlock = parser.tag() == Api::BlockChain::BlockHeight
                    && parser.intData() == height;
            if (isWantedBlock) {
                if (knownTip)
                    *knownTip = m_bestBlockHeight.load();
                return m_nextBlock;
            }
        }

        // Only ask the Hub once every enabled indexer has a token outstanding.
        int enabledIndexers = 0;
        if (m_txdb)
            ++enabledIndexers;
        if (m_spentOutputDb)
            ++enabledIndexers;
        if (m_addressdb)
            ++enabledIndexers;
        if (token.allocatedTokens() == enabledIndexers && height <= m_bestBlockHeight.load())
            requestBlock();

        // wait until the network-manager thread actually finds the block-message as sent by the Hub
        const bool woken = m_waitForBlock.wait(&m_nextBlockLock, timeout);
        if (!woken)
            break;
    }
    return Message();
}
2019-04-11 14:59:14 +02:00
void Indexer::checkBlockArrived()
2019-04-10 22:17:13 +02:00
{
2019-04-11 14:59:14 +02:00
if (!m_serverConnection.isConnected())
return;
if (QDateTime::currentMSecsSinceEpoch() - m_timeLastRequest < 20000)
return;
2019-06-26 21:39:07 +02:00
QMutexLocker lock(&m_nextBlockLock);
int lastReceivedBlock = -1;
Streaming::MessageParser parser(m_nextBlock.body());
while (parser.next() == Streaming::FoundTag) {
if (parser.tag() == Api::BlockChain::BlockHeight) {
lastReceivedBlock = parser.intData();
break;
}
}
if (m_lastRequestedBlock != lastReceivedBlock) {
2019-04-11 14:59:14 +02:00
logDebug() << "repeating block request";
// Hub never sent the block to us :(
requestBlock(m_lastRequestedBlock);
2019-04-11 14:59:14 +02:00
}
2019-12-12 16:02:08 +01:00
// also poll the block count, so we can progress if for some reason the notification was not send.
m_serverConnection.send(Message(Api::BlockChainService, Api::BlockChain::GetBlockCount));
2019-04-10 22:17:13 +02:00
}
void Indexer::onFindAddressRequest(const Message &message)
{
NetworkConnection con;
try {
2019-09-12 15:21:48 +02:00
con = m_network.connection(m_network.endPoint(message.remote), NetworkManager::OnlyExisting);
} catch (...) {
// remote no longer connected.
return;
}
if (!con.isConnected())
return;
Streaming::MessageParser parser(message);
while (parser.next() == Streaming::FoundTag) {
if (parser.tag() == Api::Indexer::BitcoinScriptHashed) {
if (parser.dataLength() != 32) {
con.disconnect();
return;
}
const uint256 *a = reinterpret_cast<const uint256*>(parser.bytesDataBuffer().begin());
logDebug() << "FindAddress on hash:" << *a;
auto data = m_addressdb->find(*a);
2023-12-21 15:13:56 +01:00
auto pool = Streaming::pool(data.size() * 30);
Streaming::MessageBuilder builder(pool);
buildAddressSearchReply(builder, data);
con.send(builder.reply(message));
return; // just one request per message
}
if (parser.tag() == Api::Indexer::BitcoinP2PKHAddress) {
if (parser.dataLength() != 20) {
con.disconnect();
return;
}
logDebug() << "FindAddress on address" << parser.bytesDataBuffer();
static const uint8_t prefix[3] = { 0x76, 0xA9, 20}; // OP_DUP OP_HASH160, 20-byte-push
static const uint8_t postfix[2] = { 0x88, 0xAC }; // OP_EQUALVERIFY OP_CHECKSIG
CSHA256 sha;
sha.write(prefix, 3);
sha.write(reinterpret_cast<const uint8_t*>(parser.bytesDataBuffer().begin()), 20);
sha.write(postfix, 2);
uint256 hash;
sha.finalize(reinterpret_cast<unsigned char*>(&hash));
logDebug() << " + on hash:" << hash;
auto data = m_addressdb->find(hash);
2023-12-21 15:13:56 +01:00
auto pool = Streaming::pool(data.size() * 30);
Streaming::MessageBuilder builder(pool);
buildAddressSearchReply(builder, data);
con.send(builder.reply(message));
return; // just one request per message
}
}
}
2019-04-06 18:19:57 +02:00
void Indexer::hubConnected(const EndPoint &ep)
2019-03-30 14:26:00 +01:00
{
2019-06-26 21:39:07 +02:00
int txHeight = m_txdb ? m_txdb->blockheight() : -1;
int adHeight = m_addressdb ? m_addressdb->blockheight() : -1;
int spentHeight = m_spentOutputDb ? m_spentOutputDb->blockheight() : -1;
2019-06-11 17:28:16 +02:00
logCritical() << "Connection to hub established." << ep << "TxDB:" << txHeight
<< "addressDB:" << adHeight
<< "spentOutputDB" << spentHeight;
m_serverConnection.send(Message(Api::APIService, Api::Meta::Version));
2019-12-12 14:49:46 +01:00
m_serverConnection.send(Message(Api::BlockChainService, Api::BlockChain::GetBlockCount));
2019-04-10 22:17:13 +02:00
m_serverConnection.send(Message(Api::BlockNotificationService, Api::BlockNotification::Subscribe));
2019-04-06 18:19:57 +02:00
}
2019-08-13 16:19:31 +02:00
void Indexer::requestBlock(int newBlockHeight)
2019-04-06 18:19:57 +02:00
{
2019-08-28 13:40:50 +02:00
if (!m_serverConnection.isConnected()) {
2019-08-29 22:33:04 +02:00
logCritical() << "Waiting for hub" << m_serverConnection.endPoint();
2019-08-28 13:40:50 +02:00
return;
}
2019-04-10 14:30:30 +02:00
int blockHeight = 9999999;
2019-06-26 21:39:07 +02:00
for (size_t i = 0; i < s_requestedHeights.size(); ++i) {
int h = s_requestedHeights.at(i).load();
if (h != -1)
blockHeight = std::min(h, blockHeight);
}
if (blockHeight == 9999999) {
if (newBlockHeight == m_lastRequestedBlock && m_lastRequestedBlock > 0)
// we restart or timeout and someone requests the m_lastRequested one again.
2019-08-13 16:19:31 +02:00
blockHeight = newBlockHeight;
else // no valid block to get.
return;
2019-08-13 16:19:31 +02:00
}
const auto now = QDateTime::currentMSecsSinceEpoch();
if (m_lastRequestedBlock == blockHeight && now - m_timeLastRequest < 1200)
return;
2020-01-15 12:16:56 +01:00
if (blockHeight > m_bestBlockHeight.load())
return;
2019-04-11 14:59:14 +02:00
m_lastRequestedBlock = blockHeight;
m_timeLastRequest = now;
2023-12-21 15:13:56 +01:00
m_pool->reserve(20);
2019-03-30 14:26:00 +01:00
Streaming::MessageBuilder builder(m_pool);
2019-04-06 18:19:57 +02:00
builder.add(Api::BlockChain::BlockHeight, blockHeight);
2019-06-26 21:39:07 +02:00
if (m_txdb)
builder.add(Api::BlockChain::Include_TxId, true);
2019-06-26 21:39:07 +02:00
if (m_addressdb)
builder.add(Api::BlockChain::Include_OutputScriptHash, true);
2019-06-26 21:39:07 +02:00
if (m_spentOutputDb)
2019-06-11 17:28:16 +02:00
builder.add(Api::BlockChain::Include_Inputs, true);
builder.add(Api::BlockChain::Include_OffsetInBlock, true);
2019-06-26 21:39:07 +02:00
logDebug() << "requesting block" << blockHeight;
2019-03-30 14:26:00 +01:00
m_serverConnection.send(builder.message(Api::BlockChainService, Api::BlockChain::GetBlock));
}
// Called when the connection to the Hub is lost; only logs the event.
void Indexer::hubDisconnected()
{
    logCritical() << "Hub disconnected";
}
void Indexer::hubSentMessage(const Message &message)
{
if (message.serviceId() == Api::BlockChainService) {
if (message.messageId() == Api::BlockChain::GetBlockReply) {
2019-06-26 21:39:07 +02:00
int blockHeight = -1;
Streaming::MessageParser parser(message.body());
while (parser.next() == Streaming::FoundTag) {
if (parser.tag() == Api::BlockChain::BlockHeight) {
blockHeight = parser.intData();
logDebug() << "Hub sent us block" << blockHeight;
2019-12-12 14:49:46 +01:00
if ((blockHeight % 500) == 0 || m_timeLastLogLine + 2000 < QDateTime::currentMSecsSinceEpoch()) {
2019-08-18 20:24:13 +02:00
m_timeLastLogLine = QDateTime::currentMSecsSinceEpoch();
2019-06-26 21:39:07 +02:00
logCritical() << "Processing block" << blockHeight;
2019-08-18 20:24:13 +02:00
}
2019-06-26 21:39:07 +02:00
break;
}
}
QMutexLocker lock(&m_nextBlockLock);
if (m_lastRequestedBlock == blockHeight) {
m_nextBlock = message;
// Clear tokens.
// While the threads do this themselves too, its better to do it
// here in order to make sure **our** internel state is proper and
// we can avoid multiple calls to requestBlock().
for (size_t i = 0; i < s_requestedHeights.size(); ++i) {
int expected = blockHeight;
s_requestedHeights[i].compare_exchange_strong(expected, -1);
}
2019-06-26 22:48:00 +02:00
m_waitForBlock.wakeAll();
2019-04-11 14:59:14 +02:00
}
2019-03-30 14:26:00 +01:00
}
2019-12-12 14:49:46 +01:00
else if (message.messageId() == Api::BlockChain::GetBlockCountReply) {
Streaming::MessageParser parser(message.body());
while (parser.next() == Streaming::FoundTag) {
2019-12-12 16:02:08 +01:00
if (parser.tag() == Api::BlockChain::BlockHeight) {
const int tipHeight = parser.intData();
2020-01-15 12:16:56 +01:00
m_bestBlockHeight.store(tipHeight);
requestBlock(tipHeight);
2019-12-12 16:02:08 +01:00
}
2019-12-12 14:49:46 +01:00
}
}
2019-03-30 14:26:00 +01:00
}
2019-04-10 22:17:13 +02:00
else if (message.serviceId() == Api::APIService) {
if (message.messageId() == Api::Meta::VersionReply) {
Streaming::MessageParser parser(message.body());
while (parser.next() == Streaming::FoundTag) {
2019-06-11 17:28:16 +02:00
if (parser.tag() == Api::Meta::GenericByteData) {
2019-08-16 23:39:40 +02:00
logCritical() << "Server is at version" << parser.stringData();
if (parser.stringData().compare("Flowee:1 (2019-9.1)") < 0) {
logFatal() << " Hub server is too old";
m_network.punishNode(message.remote, 1000); // instant disconnect.
}
2019-06-11 17:28:16 +02:00
}
2019-04-10 22:17:13 +02:00
}
}
2019-04-10 22:17:13 +02:00
else if (message.messageId() == Api::Meta::CommandFailed) {
Streaming::MessageParser parser(message.body());
int serviceId = -1;
int messageId = -1;
std::string failedReason;
2019-04-10 22:17:13 +02:00
while (parser.next() == Streaming::FoundTag) {
if (parser.tag() == Api::Meta::FailedCommandServiceId)
serviceId = parser.intData();
else if (parser.tag() == Api::Meta::FailedCommandId)
messageId = parser.intData();
2019-04-11 14:59:14 +02:00
else if (parser.tag() == Api::Meta::FailedReason)
failedReason = parser.stringData();
2019-04-10 22:17:13 +02:00
}
if (serviceId == Api::BlockChainService && messageId == Api::BlockChain::GetBlock) {
if (m_lastRequestedBlock > m_bestBlockHeight.load())
logInfo().nospace() << "Reached top of chain (" << m_bestBlockHeight.load() << ")";
else
logWarning() << "Failed to get block, hub didn't have it." << m_lastRequestedBlock;
} else {
logWarning() << "Hub reported failure" << serviceId << messageId << failedReason;
2019-04-10 22:17:13 +02:00
}
}
}
else if (message.serviceId() == Api::BlockNotificationService && message.messageId() == Api::BlockNotification::NewBlockOnChain) {
Streaming::MessageParser parser(message.body());
while (parser.next() == Streaming::FoundTag) {
2019-12-12 14:49:46 +01:00
if (parser.tag() == Api::BlockNotification::BlockHeight) {
m_bestBlockHeight.store(parser.intData());
2019-08-13 16:19:31 +02:00
requestBlock(parser.intData());
2019-12-12 14:49:46 +01:00
}
}
}
2019-03-30 14:26:00 +01:00
else {
Streaming::MessageParser::debugMessage(message);
}
}