/*
 * This file is part of the Flowee project
 * Copyright (C) 2019-2020 Tom Zander
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
 */
#include "Blockchain_p.h"

// NOTE(review): the names of the eight headers below are missing from this
// copy of the file (everything between '<' and '>' was lost). They are kept
// as-is rather than guessed; restore them from version control.
#include
#include
#include
#include
#include
#include
#include
#include

namespace {

// Size in bytes of a transaction id (the hash produced by Tx::createHash()).
constexpr int TxIdSize = 32;

/*
 * Builds a Blockchain::Transaction from the tags delivered by \a parser.
 *
 * The result is seeded from \a job (block height, offset in block and txid)
 * and \a jobId, after which every tag up to the next Api::Separator, or the
 * end of the message, overrides or extends those values.
 * Should no txid be known afterwards while the full transaction data is
 * available, the txid is calculated from that data.
 */
Blockchain::Transaction fillTx(Streaming::MessageParser &parser, const Blockchain::Job &job, int jobId)
{
    Blockchain::Transaction tx;
    tx.jobId = jobId;
    tx.blockHeight = job.intData;
    tx.offsetInBlock = job.intData2;
    tx.txid = job.data;

    while (parser.next() == Streaming::FoundTag) {
        const auto tag = parser.tag();
        if (tag == Api::TxId) {
            tx.txid = parser.bytesDataBuffer();
        }
        else if (tag == Api::BlockHeight) {
            tx.blockHeight = parser.intData();
        }
        else if (tag == Api::OffsetInBlock) {
            tx.offsetInBlock = parser.intData();
        }
        else if (tag == Api::GenericByteData) {
            tx.fullTxData = parser.bytesDataBuffer();
        }
        else if (tag == Api::BlockChain::Tx_IN_TxId) {
            // every previous-txid tag starts a new input
            tx.inputs.resize(tx.inputs.size() + 1);
            tx.inputs.back().prevTxId = parser.bytesDataBuffer();
        }
        else if (tag == Api::BlockChain::Tx_InputScript) {
            if (tx.inputs.empty()) // needed because for coinbase there is no Tx_IN_TxId
                tx.inputs.resize(1);
            tx.inputs.back().inputScript = parser.bytesDataBuffer();
        }
        else if (tag == Api::BlockChain::Tx_IN_OutIndex) {
            assert(!tx.inputs.empty());
            // the assert is compiled out with NDEBUG; never call back() on an empty vector
            if (!tx.inputs.empty())
                tx.inputs.back().outIndex = parser.intData();
        }
        else if (tag == Api::BlockChain::Tx_Out_Index) {
            // every output-index tag starts a new output
            tx.outputs.resize(tx.outputs.size() + 1);
            const auto outIndex = parser.intData();
            assert(outIndex < 0xFFFF);
            // NOTE(review): the target type of this cast was missing from the file,
            // the type of the member that is assigned to is used instead.
            tx.outputs.back().index = static_cast<decltype(tx.outputs.back().index)>(outIndex);
        }
        else if (tag == Api::BlockChain::Tx_Out_Amount) {
            assert(!tx.outputs.empty());
            if (!tx.outputs.empty())
                tx.outputs.back().amount = parser.longData();
        }
        else if (tag == Api::BlockChain::Tx_OutputScript) {
            assert(!tx.outputs.empty());
            if (!tx.outputs.empty()) {
                tx.outputs.back().outScript = parser.bytesDataBuffer();
                tx.outputs.back().type = Blockchain::Output::FullScript;
            }
        }
        else if (tag == Api::BlockChain::Tx_Out_Address) {
            assert(!tx.outputs.empty());
            if (!tx.outputs.empty()) {
                tx.outputs.back().outScript = parser.bytesDataBuffer();
                tx.outputs.back().type = Blockchain::Output::OnlyAddress;
            }
        }
        else if (tag == Api::BlockChain::GenericByteData) {
            tx.fullTxData = parser.bytesDataBuffer();
        }
        else if (tag == Api::Separator) {
            // end of this transaction, more may follow in the same message.
            break;
        }
    }

    if (tx.txid.isEmpty() && !tx.fullTxData.isEmpty()) {
        // then lets fill it.
        Streaming::BufferPool pool(TxIdSize);
        Tx fullTx(tx.fullTxData);
        auto hash = fullTx.createHash();
        memcpy(pool.begin(), hash.begin(), TxIdSize);
        tx.txid = pool.commit(TxIdSize);
    }
    return tx;
}

/*
 * Appends the "include" options selected in the \a transactionFilters bit-field
 * to \a builder.
 *
 * Include_TxId and FullTransactionData are always added, carrying true or false.
 * The remaining options are only added (as true) when their bit is set.
 */
void addIncludeRequests(Streaming::MessageBuilder &builder, uint32_t transactionFilters)
{
    if (transactionFilters & Blockchain::IncludeInputs)
        builder.add(Api::BlockChain::Include_Inputs, true);
    builder.add(Api::BlockChain::Include_TxId,
                (transactionFilters & Blockchain::IncludeTxId) != 0);
    builder.add(Api::BlockChain::FullTransactionData,
                (transactionFilters & Blockchain::IncludeFullTransactionData) != 0);
    if (transactionFilters & Blockchain::IncludeOutputs)
        builder.add(Api::BlockChain::Include_Outputs, true);
    if (transactionFilters & Blockchain::IncludeOutputAmounts)
        builder.add(Api::BlockChain::Include_OutputAmounts, true);
    if (transactionFilters & Blockchain::IncludeOutputScripts)
        builder.add(Api::BlockChain::Include_OutputScripts, true);
    if (transactionFilters & Blockchain::IncludeOutputAddresses)
        builder.add(Api::BlockChain::Include_OutputAddresses, true);
}

}
Blockchain::ServiceUnavailableException::ServiceUnavailableException(const char *error, Blockchain::Service service) : std::runtime_error(error), m_service(service) { } Blockchain::Search::~Search() { if (policy) policy->searchFinished(this); } Blockchain::SearchEngine::SearchEngine() : d(new SearchEnginePrivate(this)) { } Blockchain::SearchEngine::~SearchEngine() { delete d; } void Blockchain::SearchEngine::start(Blockchain::Search *request) { request->policy = d->txPolicy; { std::lock_guard lock(d->lock); request->requestId = d->nextRequestId++; d->searchers.insert(std::make_pair(request->requestId, request)); } request->policy->processRequests(request); } void Blockchain::SearchEngine::addIndexer(const EndPoint &ep) { auto connection = d->network.connection(ep); if (!connection.isValid()) throw std::runtime_error("Invalid Endpoint, can't create Indexer connection"); connection.setOnConnected(std::bind(&SearchEnginePrivate::indexerConnected, d, std::placeholders::_1)); connection.setOnDisconnected(std::bind(&SearchEnginePrivate::indexerDisconnected, d, std::placeholders::_1)); connection.setOnIncomingMessage(std::bind(&SearchEnginePrivate::indexerSentMessage, d, std::placeholders::_1)); d->connections.resize(d->connections.size() + 1); SearchEnginePrivate::Connection &c = d->connections.back(); c.con = std::move(connection); c.con.connect(); } void Blockchain::SearchEngine::addHub(const EndPoint &ep) { auto connection = d->network.connection(ep); if (!connection.isValid()) throw std::runtime_error("Invalid Endpoint, can't create Hub connection"); connection.setOnConnected(std::bind(&SearchEnginePrivate::hubConnected, d, std::placeholders::_1)); connection.setOnDisconnected(std::bind(&SearchEnginePrivate::hubDisconnected, d, std::placeholders::_1)); connection.setOnIncomingMessage(std::bind(&SearchEnginePrivate::hubSentMessage, d, std::placeholders::_1)); d->connections.resize(d->connections.size() + 1); SearchEnginePrivate::Connection &c = d->connections.back(); 
c.con = std::move(connection); c.con.setMessageQueueSizes(5000, 1); c.con.connect(); } void Blockchain::SearchEngine::setConfigFile(const std::string &configFile) { d->configFile = configFile; reparseConfig(); } void Blockchain::SearchEngine::parseConfig(const std::string &confFile) { // intentionally left empty } void Blockchain::SearchEngine::reparseConfig() { d->findServices(); parseConfig(d->configFile); } Streaming::BufferPool &Blockchain::SearchEngine::poolForThread() { return *d->pool(); } void Blockchain::SearchEngine::sendMessage(const Message &message, Blockchain::Service service) { d->sendMessage(message, service); } bool Blockchain::SearchEngine::isHubConnected() const { for (auto iter = d->connections.begin(); iter != d->connections.end(); ++iter) { if (iter->services.find(TheHub) != iter->services.end()) return true; } return false; } bool Blockchain::SearchEngine::isIndexerConnected() const { for (auto iter = d->connections.begin(); iter != d->connections.end(); ++iter) { if (iter->services.find(IndexerTxIdDb) != iter->services.end()) return true; } return false; } Blockchain::SearchEnginePrivate::SearchEnginePrivate(SearchEngine *q) : network(workers.ioService()), nextRequestId(1), q(q) { txPolicy = new SearchPolicy(this); } void Blockchain::SearchEnginePrivate::findServices() { logInfo(Log::SearchEngine) << "parsing config" << configFile; boost::filesystem::ifstream streamConfig(configFile); if (!streamConfig.good()) return; // No conf file is OK std::set setOptions; setOptions.insert("*"); using namespace boost::program_options; EndPoint ep; for (detail::config_file_iterator it(streamConfig, setOptions), end; it != end; ++it) { if (it->string_key == "services.indexer" && !it->value.empty()) { std::vector indexers; boost::split(indexers, it->value[0], boost::is_any_of(" \t;,")); for (auto indexer : indexers) { try { ep.announcePort = 1234; SplitHostPort(indexer, ep.announcePort, ep.hostname); if (!network.connection(ep, 
NetworkManager::OnlyExisting).isValid()) q->addIndexer(ep); } catch (const std::exception &e) { logCritical(Log::SearchEngine) << "Connecting to" << indexer << ep.announcePort << "failed with:" << e; } } } else if (it->string_key == "services.hub" && !it->value.empty()) { std::vector hubs; boost::split(hubs, it->value[0], boost::is_any_of(" \t;,")); for (auto hub : hubs) { try { ep.announcePort = 1235; SplitHostPort(hub, ep.announcePort, ep.hostname); if (!network.connection(ep, NetworkManager::OnlyExisting).isValid()) q->addHub(ep); } catch (const std::exception &e) { logCritical(Log::SearchEngine) << "Connecting to" << hub << ep.announcePort << "failed with:" << e; } } } } } void Blockchain::SearchEnginePrivate::hubConnected(const EndPoint &ep) { logDebug(Log::SearchEngine); auto con = network.connection(ep); con.send(Message(Api::APIService, Api::Meta::Version)); } void Blockchain::SearchEnginePrivate::hubDisconnected(const EndPoint &ep) { // TODO unset flag in connections logDebug(Log::SearchEngine); q->hubDisconnected(); } void Blockchain::SearchEnginePrivate::hubSentMessage(const Message &message) { const int id = message.headerInt(SearchRequestId); if (id > 0) { logDebug(Log::SearchEngine) << "Received hub message for search:" << id; std::lock_guard lock_(lock); auto searcher = searchers.find(id); if (searcher != searchers.end()) { assert(searcher->second->policy); searcher->second->dataAdded(message); searcher->second->policy->parseMessageFromHub(searcher->second, message); } else logDebug() << "No searcher matching the job"; return; } if (message.serviceId() == Api::APIService && message.messageId() == Api::Meta::VersionReply) { Streaming::MessageParser parser(message); std::string hubId; while (parser.next() == Streaming::FoundTag) { if (parser.tag() == Api::GenericByteData) { hubId = parser.stringData(); logCritical(Log::SearchEngine) << " Upstream hub connected" << hubId; if (parser.stringData().compare("Flowee:1 (2019-9.1)") < 0) { logFatal() << " Hub 
is too old, not using"; return; } break; } } // find connection in connections and set flag that this is a known hub for (auto iter = connections.begin(); iter != connections.end(); ++iter) { if (iter->con.connectionId() == message.remote) { iter->services.insert(TheHub); break; } } // then as last thing, let our subclasses know; q->initializeHubConnection(network.connection(network.endPoint(message.remote)), hubId); return; } q->hubSentMessage(message); } void Blockchain::SearchEnginePrivate::indexerConnected(const EndPoint &ep) { logDebug(Log::SearchEngine); auto con = network.connection(ep); con.send(Message(Api::IndexerService, Api::Indexer::GetAvailableIndexers)); } void Blockchain::SearchEnginePrivate::indexerDisconnected(const EndPoint &) { // TODO unset flag in connections logDebug(Log::SearchEngine); q->indexerDisconnected(); } void Blockchain::SearchEnginePrivate::indexerSentMessage(const Message &message) { logDebug(Log::SearchEngine); const int id = message.headerInt(SearchRequestId); if (id > 0) { std::lock_guard lock_(lock); auto searcher = searchers.find(id); if (searcher != searchers.end()) { searcher->second->dataAdded(message); searcher->second->policy->parseMessageFromIndexer(searcher->second, message); } return; } // parse message on which indexers the indexer has. 
if (message.serviceId() == Api::IndexerService && message.messageId() == Api::Indexer::GetAvailableIndexersReply) { bool hasTxId = false, hasSpent = false, hasAddress = false; Streaming::MessageParser p(message); while (p.next() == Streaming::FoundTag) { if (p.tag() == Api::Indexer::AddressIndexer) { hasAddress = true; logInfo(Log::SearchEngine) << "Indexer 'address' available:" << p.boolData(); } else if (p.tag() == Api::Indexer::TxIdIndexer) { hasTxId = true; logInfo(Log::SearchEngine) << "Indexer 'TxID' available:" << p.boolData(); } else if (p.tag() == Api::Indexer::SpentOutputIndexer){ hasSpent = true; logInfo(Log::SearchEngine) << "Indexer 'Spent' available:" << p.boolData(); } } for (auto iter = connections.begin(); iter != connections.end(); ++iter) { if (iter->con.connectionId() == message.remote) { if (hasAddress) iter->services.insert(IndexerAddressDb); if (hasTxId) iter->services.insert(IndexerTxIdDb); if (hasSpent) iter->services.insert(IndexerSpentDb); break; } } std::set services; if (hasAddress) services.insert(IndexerAddressDb); if (hasTxId) services.insert(IndexerTxIdDb); if (hasSpent) services.insert(IndexerSpentDb); q->initializeIndexerConnection(network.connection(network.endPoint(message.remote)), services); return; } q->indexerSentMessage(message); } void Blockchain::SearchEnginePrivate::sendMessage(const Message &message, Blockchain::Service service) { auto iter = connections.begin(); while (iter != connections.end()) { if (iter->services.find(service) != iter->services.end()) { iter->con.send(message); return; } ++iter; } throw std::runtime_error("Backing service not connected"); } void Blockchain::SearchEnginePrivate::searchFinished(Blockchain::Search *searcher) { std::lock_guard lock_(lock); auto iter = searchers.find(searcher->requestId); if (iter != searchers.end()) searchers.erase(iter); } Streaming::BufferPool *Blockchain::SearchEnginePrivate::pool() { if (!pools.get()) pools.reset(new Streaming::BufferPool(1E6)); return pools.get(); 
} void Blockchain::SearchPolicy::parseMessageFromHub(Search *request, const Message &message) { const int jobId = message.headerInt(JobRequestId); logDebug(Log::SearchEngine) << " " << jobId; Streaming::MessageParser parser(message); { // jobsLock scope std::lock_guard lock(request->jobsLock); if (jobId < 0 || request->jobs.size() <= jobId) { logDebug(Log::SearchEngine) << "Hub message refers to non existing job Id"; return; } Job &job = request->jobs[jobId]; job.finished = true; if (message.serviceId() == Api::BlockChainService) { if (message.messageId() == Api::BlockChain::GetTransactionReply) { request->answer.push_back(fillTx(parser, job, jobId)); if (!request->answer.back().txid.isEmpty()) request->transactionMap.insert(std::make_pair(uint256(request->answer.back().txid.begin()), request->answer.size() - 1)); request->transactionAdded(request->answer.back()); } else if (message.messageId() == Api::BlockChain::GetBlockHeaderReply) { BlockHeader header; while (parser.next() == Streaming::FoundTag) { if (parser.tag() == Api::BlockChain::BlockHash) header.hash = parser.bytesDataBuffer(); else if (parser.tag() == Api::BlockChain::Confirmations) header.confirmations = parser.intData(); else if (parser.tag() == Api::BlockChain::BlockHeight) header.height = parser.intData(); else if (parser.tag() == Api::BlockChain::Version) header.version = static_cast(parser.longData()); else if (parser.tag() == Api::BlockChain::MerkleRoot) header.merkleRoot = parser.bytesDataBuffer(); else if (parser.tag() == Api::BlockChain::Time) header.time = static_cast(parser.longData()); else if (parser.tag() == Api::BlockChain::MedianTime) header.medianTime = static_cast(parser.longData()); else if (parser.tag() == Api::BlockChain::Nonce) header.nonce = static_cast(parser.longData()); else if (parser.tag() == Api::BlockChain::Bits) header.bits = static_cast(parser.longData()); else if (parser.tag() == Api::BlockChain::Difficulty) header.difficulty = parser.doubleData(); } if (header.height > 
0) request->blockHeaders.insert(std::make_pair(header.height, header)); } else if (message.messageId() == Api::BlockChain::GetBlockReply) { while (true) { bool more; parser.peekNext(&more); if (!more) break; request->answer.push_back(fillTx(parser, job, jobId)); request->transactionAdded(request->answer.back()); } } else { logDebug(Log::SearchEngine) << "Unknown message from Hub" << message.serviceId() << message.messageId(); } } } // jobsLock scope if (message.serviceId() == Api::LiveTransactionService) { // TODO also implement error reporting from the API module. // things like "OffsetInBlock larger than block" get reported there. if (message.messageId() == Api::LiveTransactions::IsUnspentReply) { int blockHeight = -1; int offsetInBlock = -1; int outIndex = -1; int64_t amount = -1; Streaming::ConstBuffer outputScript; bool unspent = false; while (parser.next() == Streaming::FoundTag) { switch (parser.tag()) { case Api::LiveTransactions::BlockHeight: blockHeight = parser.intData(); break; case Api::LiveTransactions::OffsetInBlock: offsetInBlock = parser.intData(); break; case Api::LiveTransactions::UnspentState: unspent = parser.boolData(); break; case Api::LiveTransactions::OutIndex: outIndex = parser.intData(); break; case Api::LiveTransactions::Amount: amount = int64_t(parser.longData()); break; case Api::LiveTransactions::OutputScript: outputScript = parser.bytesDataBuffer(); break; } } request->utxoLookup(jobId, blockHeight, offsetInBlock, outIndex, unspent, amount, outputScript); } } else if (message.serviceId() != Api::BlockChainService) { logDebug(Log::SearchEngine) << "Unknown message from Hub" << message.serviceId() << message.messageId(); } processRequests(request); } void Blockchain::SearchPolicy::parseMessageFromIndexer(Search *request, const Message &message) { const int jobId = message.headerInt(JobRequestId); logDebug(Log::SearchEngine) << " " << jobId; { // jobslock scope std::lock_guard lock(request->jobsLock); if (jobId < 0 || 
request->jobs.size() <= jobId) { logDebug(Log::SearchEngine) << "Indexer message refers to non existing job Id"; return; } Job &job = request->jobs[jobId]; job.finished = true; Streaming::MessageParser parser(message); if (message.messageId() == Api::Indexer::FindTransactionReply || message.messageId() == Api::Indexer::FindSpentOutputReply) { int height = 0; int offsetInBlock = 0; while (parser.next() == Streaming::FoundTag) { if (parser.tag() == Api::BlockHeight) height = parser.intData(); else if (parser.tag() == Api::OffsetInBlock) offsetInBlock = parser.intData(); } if (height != -1) { // only update jobs when we actually found the thing we were looking for. updateJob(job.nextJobId, request, job.data, height, offsetInBlock); updateJob(job.nextJobId2, request, job.data, height, offsetInBlock); } if (message.messageId() == Api::Indexer::FindTransactionReply) request->txIdResolved(jobId, height, offsetInBlock); else request->spentOutputResolved(jobId, height, offsetInBlock); } else if (message.messageId() == Api::Indexer::FindAddressReply) { int blockHeight = -1, offsetInBlock = 0, outIndex = -1; while (parser.next() == Streaming::FoundTag) { if (parser.tag() == Api::Indexer::BlockHeight) blockHeight = parser.intData(); else if (parser.tag() == Api::Indexer::OffsetInBlock) offsetInBlock = parser.intData(); else if (parser.tag() == Api::Indexer::OutIndex) { outIndex = parser.intData(); // TODO process. 
request->addressUsedInOutput(blockHeight, offsetInBlock, outIndex); } } } else { logDebug(Log::SearchEngine) << "Unknown message from Indexer"; } } // jobslock scope processRequests(request); } void Blockchain::SearchPolicy::processRequests(Blockchain::Search *request) { int jobsInFlight = 0; int jobsWaiting = 0; Streaming::BufferPool *pool = m_owner->pool(); { // jobsLock scope std::lock_guard lock(request->jobsLock); for (size_t i = 0; i < request->jobs.size(); ++i) { Job &job = request->jobs.at(i); if (job.finished) continue; if (job.started) { jobsInFlight++; continue; } try { switch (job.type) { case Blockchain::Unset: throw std::runtime_error("Invalid job definition"); case Blockchain::FetchUTXOUnspent: case Blockchain::FetchUTXODetails: { if (job.data.size() != 32 && (job.intData <= 0 || job.intData2 <= 0)) throw std::runtime_error("Invalid job definition"); pool->reserve(60); Streaming::MessageBuilder builder(*pool, Streaming::HeaderAndBody); builder.add(Network::ServiceId, Api::LiveTransactionService); builder.add(Network::MessageId, (job.type == Blockchain::FetchUTXODetails) ? 
Api::LiveTransactions::GetUnspentOutput : Api::LiveTransactions::IsUnspent); builder.add(SearchRequestId, request->requestId); builder.add(JobRequestId, i); builder.add(Network::HeaderEnd, true); // now decide if I send blockheight/offset or txid if (job.data.size() == 32) { builder.add(Api::TxId, job.data); builder.add(Api::LiveTransactions::OutIndex, job.intData); } else { builder.add(Api::BlockHeight, job.intData); builder.add(Api::OffsetInBlock, job.intData2); builder.add(Api::LiveTransactions::OutIndex, job.intData3); } job.started = true; sendMessage(request, builder.message(), TheHub); break; } case Blockchain::LookupTxById: { if (job.data.size() != 32) throw std::runtime_error("Invalid job definition"); logDebug(Log::SearchEngine) << "starting lookup (txid)" << i; pool->reserve(50); Streaming::MessageBuilder builder(*pool, Streaming::HeaderAndBody); builder.add(Network::ServiceId, Api::IndexerService); builder.add(Network::MessageId, Api::Indexer::FindTransaction); builder.add(SearchRequestId, request->requestId); builder.add(JobRequestId, i); builder.add(Network::HeaderEnd, true); builder.add(Api::Indexer::TxId, job.data); job.started = true; sendMessage(request, builder.message(), IndexerTxIdDb); break; } case Blockchain::LookupByAddress: { if (job.data.size() != 32) // expect a sha256 hash of the outputscript here throw std::runtime_error("Invalid job definition"); logDebug(Log::SearchEngine) << "starting lookup (address)" << i; pool->reserve(40); Streaming::MessageBuilder builder(*pool, Streaming::HeaderAndBody); builder.add(Network::ServiceId, Api::IndexerService); builder.add(Network::MessageId, Api::Indexer::FindAddress); builder.add(SearchRequestId, request->requestId); builder.add(JobRequestId, i); builder.add(Network::HeaderEnd, true); builder.add(Api::Indexer::BitcoinScriptHashed, job.data); job.started = true; sendMessage(request, builder.message(), IndexerAddressDb); break; } case Blockchain::LookupSpentTx: { if (job.data.size() != 32 || 
job.intData == -1) // expect a sha256 & outIndex here throw std::runtime_error("Invalid job definition"); logDebug(Log::SearchEngine) << "starting lookup (spentTx)" << i; pool->reserve(40); Streaming::MessageBuilder builder(*pool, Streaming::HeaderAndBody); builder.add(Network::ServiceId, Api::IndexerService); builder.add(Network::MessageId, Api::Indexer::FindSpentOutput); builder.add(SearchRequestId, request->requestId); builder.add(JobRequestId, i); builder.add(Network::HeaderEnd, true); builder.add(Api::Indexer::TxId, job.data); builder.add(Api::Indexer::OutIndex, job.intData); job.started = true; sendMessage(request, builder.message(), IndexerSpentDb); break; } case Blockchain::FetchTx: if (job.intData && job.intData2) { job.started = true; logDebug(Log::SearchEngine) << "starting fetch TX" << i; // simple, we just send the message. pool->reserve(40); Streaming::MessageBuilder builder(*pool, Streaming::HeaderAndBody); builder.add(Network::ServiceId, Api::BlockChainService); builder.add(Network::MessageId, Api::BlockChain::GetTransaction); builder.add(SearchRequestId, request->requestId); builder.add(JobRequestId, i); builder.add(Network::HeaderEnd, true); builder.add(Api::BlockChain::BlockHeight, job.intData); builder.add(Api::BlockChain::Tx_OffsetInBlock, job.intData2); addIncludeRequests(builder, job.transactionFilters); sendMessage(request, builder.message(), TheHub); } else if (job.data.size() == 32) { logDebug(Log::SearchEngine) << "Creating two new jobs to lookup and then fetch a TX"; job.finished = job.started = true; // first need to do a lookupByTxId Job lookupJob; lookupJob.type = LookupTxById; lookupJob.data = job.data; lookupJob.nextJobId = job.nextJobId; lookupJob.nextJobId2 = static_cast(request->jobs.size() + 1); request->jobs.push_back(lookupJob); Job fetchTxJob; fetchTxJob.type = Blockchain::FetchTx; fetchTxJob.transactionFilters = job.transactionFilters; request->jobs.push_back(fetchTxJob); } else jobsWaiting++; // Waiting for data break; case 
Blockchain::FetchBlockHeader: { if (job.data.size() != 32 && !job.intData) { // Waiting for data jobsWaiting++; continue; } job.started = true; logDebug(Log::SearchEngine) << "starting fetching of block header" << i; pool->reserve(60); Streaming::MessageBuilder builder(*pool, Streaming::HeaderAndBody); builder.add(Network::ServiceId, Api::BlockChainService); builder.add(Network::MessageId, Api::BlockChain::GetBlockHeader); builder.add(SearchRequestId, request->requestId); builder.add(JobRequestId, i); builder.add(Network::HeaderEnd, true); if (job.intData) builder.add(Api::BlockChain::BlockHeight, job.intData); else builder.add(Api::BlockChain::BlockHash, job.data); sendMessage(request, builder.message(), TheHub); break; } case Blockchain::FetchBlockOfTx: if (job.data.size() != 32 && !job.intData) { // Waiting for data jobsWaiting++; continue; } job.started = true; logDebug(Log::SearchEngine) << "starting fetching of block" << i; pool->reserve(60); Streaming::MessageBuilder builder(*pool, Streaming::HeaderAndBody); builder.add(Network::ServiceId, Api::BlockChainService); builder.add(Network::MessageId, Api::BlockChain::GetBlock); builder.add(SearchRequestId, request->requestId); builder.add(JobRequestId, i); builder.add(Network::HeaderEnd, true); if (job.intData) builder.add(Api::BlockChain::BlockHeight, job.intData); else builder.add(Api::BlockChain::BlockHash, job.data); addIncludeRequests(builder, job.transactionFilters); sendMessage(request, builder.message(), TheHub); break; } } catch (std::exception &e) { logCritical(Log::SearchEngine) << "Job processing failed due to" << e; job.started = true; job.finished = true; } if (job.started) jobsInFlight++; } } // jobsLock scope if (jobsInFlight == 0) request->finished(jobsWaiting); } void Blockchain::SearchPolicy::searchFinished(Blockchain::Search *request) { m_owner->searchFinished(request); } void Blockchain::SearchPolicy::sendMessage(Blockchain::Search *request, Message message, Blockchain::Service service) { if 
(!message.hasHeader()) message.setHeaderInt(SearchRequestId, request->requestId); m_owner->sendMessage(message, service); } void Blockchain::SearchPolicy::updateJob(int jobIndex, Search *request, const Streaming::ConstBuffer &data, int intData1, int intData2) { // assumes the jobsLock is locked by caller if (jobIndex == -1) return; assert(jobIndex >= 0); const size_t index = static_cast(jobIndex); assert(index < request->jobs.size()); Job &ref = request->jobs[index]; ref.intData = intData1; ref.intData2 = intData2; ref.data = data; }