/*
 * This file is part of the Flowee project
 * Copyright (C) 2019-2021 Tom Zander
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */
#include "Engine.h"
#include "ContextData.h"

// NOTE(review): in this copy of the file the text between angle brackets
// appears to have been stripped. That affects the two include targets
// directly below, the element types of `std::vector` / `std::set` in
// subscribe() and initializeIndexerConnection(), and the target type of the
// two static_cast expressions in connectHub() / connectIndexer(). Those
// tokens are reproduced exactly as found; restore them from version control.
#include
#include

/**
 * Creates an engine bound to \a cd, which must not be null.
 *
 * m_verbosity is seeded with DebugLevel so that the setLogLevel() call in the
 * body is not skipped by its "level unchanged" early return.
 */
Engine::Engine(ContextData *cd)
    : m_verbosity(Log::DebugLevel),
    m_cd(cd)
{
    assert(m_cd);
    setLogLevel(Log::FatalLevel); // by default we are silent.
}

/// Forwards every message received from the Hub to the context data.
void Engine::hubSentMessage(const Message &message)
{
    m_cd->messageFromHub(message);
}

/**
 * Handles a message received from the indexer.
 *
 * Only the GetIndexerLastBlockReply of the IndexerService is looked at: the
 * first BlockHeight tag found in its body is logged. Everything else is
 * ignored.
 */
void Engine::indexerSentMessage(const Message &message)
{
    const bool isLastBlockReply = message.serviceId() == Api::IndexerService
            && message.messageId() == Api::Indexer::GetIndexerLastBlockReply;
    if (!isLastBlockReply)
        return;

    Streaming::MessageParser parser(message.body());
    while (parser.next() == Streaming::FoundTag) {
        if (parser.tag() == Api::BlockHeight) {
            logInfo() << "Indexer is at block-height:" << parser.intData();
            return;
        }
    }
}

/**
 * Removes the first subscription whose script hash equals \a addressHash.
 *
 * When the Hub is connected an Unsubscribe message for that hash is sent to
 * it as well.
 *
 * \return true when a subscription was found and removed, false otherwise.
 */
bool Engine::unsubscribe(Streaming::ConstBuffer addressHash)
{
    std::lock_guard lock(m_subscribeAddressesLock);
    for (auto iter = m_subscribedAddresses.begin(); iter != m_subscribedAddresses.end(); ++iter) {
        if (iter->bitcoinScriptHashed == addressHash) {
            // `iter` is not used after the erase; `addressHash` is our own copy.
            m_subscribedAddresses.erase(iter);
            // so, we have it, then the Hub has it too. Lets tell them to remove it.
            if (isHubConnected()) {
                Streaming::MessageBuilder builder(poolForThread(40));
                builder.add(Api::AddressMonitor::BitcoinScriptHashed, addressHash);
                sendMessage(builder.message(Api::AddressMonitorService,
                                            Api::AddressMonitor::Unsubscribe), Blockchain::TheHub);
            }
            return true;
        }
    }
    return false;
}

/**
 * Remembers \a addresses as subscribed and, when the Hub is currently
 * connected, sends all of them to it in a single Subscribe message.
 *
 * An empty list is a no-op. Addresses remembered while the Hub is not
 * connected are sent by initializeHubConnection().
 */
void Engine::subscribe(const std::vector &addresses)
{
    if (addresses.empty())
        return;

    const bool sendNow = isHubConnected();
    std::lock_guard lock(m_subscribeAddressesLock);
    for (const auto &ad : addresses)
        m_subscribedAddresses.push_back(ad);

    // Only build a message (and reserve pool space) if it will be sent.
    if (!sendNow)
        return;

    Streaming::MessageBuilder builder(poolForThread(40 * addresses.size()));
    for (const auto &ad : addresses)
        builder.add(Api::AddressMonitor::BitcoinScriptHashed, ad.bitcoinScriptHashed);
    sendMessage(builder.message(Api::AddressMonitorService,
                                Api::AddressMonitor::Subscribe), Blockchain::TheHub);
}

/**
 * Looks up the subscription whose script hash equals \a buffer.
 *
 * \return the `origRequest` of the first match, or an empty string when no
 *         subscription matches.
 */
std::string Engine::addressForScriptHash(const Streaming::ConstBuffer &buffer) const
{
    std::lock_guard lock(m_subscribeAddressesLock);
    for (const auto &subscription : m_subscribedAddresses) {
        if (subscription.bitcoinScriptHashed == buffer)
            return subscription.origRequest;
    }
    return std::string();
}

/**
 * Sets up a Hub connection.
 *
 * Sends one Subscribe message per remembered address over \a connection,
 * subscribes to block notifications when those were requested, and reports
 * the connection (with \a hubVersion) to the context data.
 */
void Engine::initializeHubConnection(NetworkConnection connection, const std::string &hubVersion)
{
    {
        // first re-subscribe the addresses to the AddressMonitorService
        // The mutex only guards m_subscribedAddresses, so it is released
        // before calling into the context data below.
        std::lock_guard lock(m_subscribeAddressesLock);
        for (const auto &subscription : m_subscribedAddresses) {
            Streaming::MessageBuilder builder(poolForThread(40));
            builder.add(Api::AddressMonitor::BitcoinScriptHashed, subscription.bitcoinScriptHashed);
            connection.send(builder.message(Api::AddressMonitorService,
                                            Api::AddressMonitor::Subscribe));
        }
    }

    if (m_listenToBlockUpdates) {
        sendMessage(Message(Api::BlockNotificationService,
                            Api::BlockNotification::Subscribe), Blockchain::TheHub);
    }

    // NOTE(review): the meaning of hubConnected()'s return value is not
    // visible here; the name `sent` is kept from the original code.
    const bool sent = m_cd->hubConnected(hubVersion);

    // Start the callbacks when an indexer connection was started and is up,
    // or when no indexer connection was started and `sent` is false.
    if ((m_startedIndexerConnection && isIndexerConnected())
            || (!sent && !m_startedIndexerConnection))
        m_cd->startAllConnectedCallbacks();
}

/**
 * Sets up an indexer connection.
 *
 * Sends GetIndexerLastBlock over \a connection and reports the available
 * \a services to the context data.
 */
void Engine::initializeIndexerConnection(NetworkConnection connection, const std::set &services)
{
    Message message(Api::IndexerService, Api::Indexer::GetIndexerLastBlock);
    connection.send(message);

    // NOTE(review): the meaning of indexerConnected()'s return value is not
    // visible here; the name `sent` is kept from the original code.
    const bool sent = m_cd->indexerConnected(services);

    // Start the callbacks when a hub connection was started and is up,
    // or when no hub connection was started and `sent` is false.
    if ((m_startedHubConnection && isHubConnected())
            || (!sent && !m_startedHubConnection))
        m_cd->startAllConnectedCallbacks();
}

/// Currently does nothing.
void Engine::hubDisconnected()
{
    // TODO maybe notify JS?
}

/// Currently does nothing.
void Engine::indexerDisconnected()
{
    // TODO maybe notify JS?
}

/**
 * Changes the log verbosity.
 *
 * Does nothing when \a verbosity equals the current level; otherwise stores
 * it and passes it to Log::Manager::clearLogLevels().
 */
void Engine::setLogLevel(Log::Verbosity verbosity)
{
    if (verbosity == m_verbosity)
        return;
    m_verbosity = verbosity;
    Log::Manager::instance()->clearLogLevels(verbosity);
}

/// Returns the verbosity last stored by setLogLevel().
Log::Verbosity Engine::logLevel() const
{
    return m_verbosity;
}

/**
 * Starts connections to a Hub and an indexer on \a hostname.
 *
 * The ports depend on \a net: Hub 1235 and indexer 1234 for Mainnet, Hub
 * 21235 and indexer 21234 for anything else. A connection kind that was
 * already started earlier is left alone.
 */
void Engine::connect(const std::string &hostname, Net net)
{
    const bool mainnet = net == Mainnet;
    EndPoint ep(hostname, mainnet ? 1235 : 21235);
    if (!m_startedHubConnection) {
        m_startedHubConnection = true;
        addHub(ep);
    }
    if (!m_startedIndexerConnection) {
        // Reuse the endpoint, only the port differs for the indexer.
        ep.peerPort = ep.announcePort = mainnet ? 1234 : 21234;
        addIndexer(ep);
        m_startedIndexerConnection = true;
    }
}

/**
 * Starts the Hub connection to \a hostname : \a port, unless one was started
 * before. \a port must fit in 16 bits (asserted).
 */
void Engine::connectHub(const std::string &hostname, int port)
{
    assert(port >= 0 && port <= 0xFFFF);
    if (m_startedHubConnection)
        return;
    // NOTE(review): cast target type is missing in this copy — see top of file.
    addHub(EndPoint(hostname, static_cast(port)));
    m_startedHubConnection = true;
}

/**
 * Starts the indexer connection to \a hostname : \a port, unless one was
 * started before. \a port must fit in 16 bits (asserted).
 */
void Engine::connectIndexer(const std::string &hostname, int port)
{
    assert(port >= 0 && port <= 0xFFFF);
    if (m_startedIndexerConnection)
        return;
    // NOTE(review): cast target type is missing in this copy — see top of file.
    addIndexer(EndPoint(hostname, static_cast(port)));
    m_startedIndexerConnection = true;
}

/**
 * Turns block-notification updates on or off.
 *
 * The new value is stored and, when the Hub is connected, the matching
 * Subscribe or Unsubscribe message is sent right away. When the Hub is not
 * connected, initializeHubConnection() subscribes if the flag is set.
 * Calling this with the current value is a no-op.
 */
void Engine::setListenToBlockUpdates(bool on)
{
    if (on == m_listenToBlockUpdates)
        return;
    m_listenToBlockUpdates = on;

    if (!isHubConnected())
        return;

    // subscribe if we need
    const auto messageId = m_listenToBlockUpdates ? Api::BlockNotification::Subscribe
                                                  : Api::BlockNotification::Unsubscribe;
    sendMessage(Message(Api::BlockNotificationService, messageId), Blockchain::TheHub);
}