/* * This file is part of the Flowee project * Copyright (C) 2019-2020 Tom Zander * * This program is free software: you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by * the Free Software Foundation, either version 3 of the License, or * (at your option) any later version. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License * along with this program. If not, see . */ // #undef NDEBUG // make assert do something #include "ContextData.h" #include "Search.h" #include #include #include namespace { // dummy 'getter' for write-only properties Napi::Value returnValue(const Napi::CallbackInfo &) { return Napi::Value(); } enum ApiOption { Required, NotRequired }; bool arg(const Napi::CallbackInfo &info, size_t index, std::string &newValue, ApiOption required = NotRequired) { if (info.Length() > index) { if (!info[index].IsString()) { Napi::TypeError::New(info.Env(), "Wrong arguments").ThrowAsJavaScriptException(); return false; } newValue = info[index].ToString(); return true; } return required == NotRequired; } bool arg(const Napi::CallbackInfo &info, size_t index, int &newValue, ApiOption required = NotRequired) { if (info.Length() > index) { if (!info[index].IsNumber()) { Napi::TypeError::New(info.Env(), "Wrong arguments").ThrowAsJavaScriptException(); return false; } newValue = info[index].ToNumber().Int32Value(); return true; } return required == NotRequired; } Napi::Value setOnConnectedHub(const Napi::CallbackInfo &info) { assert(info.Length() == 1); if (!info[0].IsFunction()) { Napi::TypeError::New(info.Env(), "Expected function argument").ThrowAsJavaScriptException(); return Napi::Value(); } ContextData *data = reinterpret_cast(info.Data()); assert(data); if (data->m_onHubConnect.present && data->m_onHubConnect.acquired) data->m_onHubConnect.f.Release(); data->m_onHubConnect.f = Napi::ThreadSafeFunction::New(info.Env(), info[0].As(), "", 0 /* Unlimited queue */, 1 /* using threads */); data->m_onHubConnect.present = true; return Napi::Value(); } Napi::Value setOnConnectIndexer(const Napi::CallbackInfo &info) { assert(info.Length() == 1); if (!info[0].IsFunction()) { Napi::TypeError::New(info.Env(), "Expected function argument").ThrowAsJavaScriptException(); return Napi::Value(); } ContextData *data = reinterpret_cast(info.Data()); assert(data); if (data->m_onIndexerConnect.present && data->m_onIndexerConnect.acquired) data->m_onIndexerConnect.f.Release(); data->m_onIndexerConnect.f = Napi::ThreadSafeFunction::New(info.Env(), info[0].As(), "", 0 /* Unlimited queue */, 1 /* using threads */); data->m_onIndexerConnect.present = true; return Napi::Value(); } Napi::Value setOnConnected(const Napi::CallbackInfo &info) { assert(info.Length() == 1); if (!info[0].IsFunction()) { Napi::TypeError::New(info.Env(), "Expected function argument").ThrowAsJavaScriptException(); return Napi::Value(); } ContextData *data = reinterpret_cast(info.Data()); assert(data); if (data->m_onAllConnected.present && data->m_onAllConnected.acquired) data->m_onAllConnected.f.Release(); data->m_onAllConnected.f = Napi::ThreadSafeFunction::New(info.Env(), info[0].As(), "", 0 /* Unlimited queue */, 1 /* using threads */); data->m_onAllConnected.present = true; return Napi::Value(); } Napi::Value setOnAddressChanged(const Napi::CallbackInfo &info) { assert(info.Length() == 1); if (!info[0].IsFunction()) { Napi::TypeError::New(info.Env(), "Expected function argument").ThrowAsJavaScriptException(); return Napi::Value(); } ContextData *data = reinterpret_cast(info.Data()); assert(data); if (data->m_addressMonitorCallback.present && data->m_addressMonitorCallback.acquired) data->m_addressMonitorCallback.f.Release(); data->m_addressMonitorCallback.f = Napi::ThreadSafeFunction::New(info.Env(), info[0].As(), "", 0 /* Unlimited queue */, 1 /* using threads */); data->m_addressMonitorCallback.present = true; return Napi::Value(); } Napi::Value contextData_sendMessage(const Napi::CallbackInfo &info) { ContextData *data = reinterpret_cast(info.Data()); assert(data); return data->sendJsMessage(info); } // user called connect() Napi::Value contextData_connect(const Napi::CallbackInfo &info) { ContextData *data = reinterpret_cast(info.Data()); assert(data); std::string hostname = "api.flowee.org"; if (!arg(info, 0, hostname)) return Napi::Value(); return data->connect(info.Env(), hostname); } // user called connectHub() Napi::Value contextData_connectHub(const Napi::CallbackInfo &info) { ContextData *data = reinterpret_cast(info.Data()); assert(data); std::string hostname = "api.flowee.org"; if (!arg(info, 0, hostname)) return Napi::Value(); int port = 1235; if (!arg(info, 1, port)) return Napi::Value(); return data->connectHub(info.Env(), hostname, port); } struct HubVersion { char *floweeVersion = nullptr; ContextData *data = nullptr; }; void contextData_hubConnectedCallback(Napi::Env env, Napi::Function jsCallback, HubVersion *hubVersion) { const Napi::String version = Napi::String::New(env, hubVersion->floweeVersion); jsCallback.Call({ version }); hubVersion->data->m_hubConnectPromise.resolve(version); if (hubVersion->data->isIndexerConnected()) hubVersion->data->m_fullConnectPromise.resolve(env, "ok"); delete hubVersion; } struct IndexerServices { std::set services; ContextData *data = nullptr; }; void contextData_IndexerConnectedCallback(Napi::Env env, Napi::Function jsCallback, IndexerServices *is) { Napi::Array list = Napi::Array::New(env); for (auto service : is->services) { std::string name; switch (service) { case Blockchain::IndexerTxIdDb: name = "txid-db"; break; case Blockchain::IndexerAddressDb: name = "address-db"; break; case Blockchain::IndexerSpentDb: name = "spent-db"; break; default: assert(false); } list[list.Length()] = Napi::String::New(env, name); } jsCallback.Call({ list }); is->data->m_indexerConnectPromise.resolve(list); if (is->data->isHubConnected()) is->data->m_fullConnectPromise.resolve(env, "ok"); delete is; } void contextData_allConnectedCallback(Napi::Env env, Napi::Function jsCallback, void *) { jsCallback.Call({ Napi::String::New(env, "ok") }); } Napi::Value contextData_connectIndexer(const Napi::CallbackInfo &info) { ContextData *data = reinterpret_cast(info.Data()); assert(data); std::string hostname = "api.flowee.org"; if (!arg(info, 0, hostname)) return Napi::Value(); int port = 1234; if (!arg(info, 1, port)) return Napi::Value(); if (port < 0 || port > 0xFFFF) { Napi::TypeError::New(info.Env(), "Port out of range").ThrowAsJavaScriptException(); return Napi::Value(); } return data->connectIndexer(info.Env(), hostname, port); } Napi::Value contextData_startSearch(const Napi::CallbackInfo &info) { ContextData *data = reinterpret_cast(info.Data()); assert(data); return data->startSearch(info); } Napi::Value contextData_fulfillFullConnectPromise(Napi::Env env, Napi::Function, ContextData *data) { data->m_fullConnectPromise.resolve(env, "ok"); data->m_hubConnectPromise.resolve(env, "ok"); data->m_indexerConnectPromise.resolve(env, "ok"); return Napi::Value(); } Napi::Value contextData_sendTransaction(const Napi::CallbackInfo &info) { ContextData *data = reinterpret_cast(info.Data()); assert(data); return data->sendTransaction(info); } Napi::Value contextData_subscribeToAddress(const Napi::CallbackInfo &info) { ContextData *data = reinterpret_cast(info.Data()); assert(data); return data->updateAddressMonitor(info, ContextData::Subscribe); } Napi::Value contextData_unsubscribeAddress(const Napi::CallbackInfo &info) { ContextData *data = reinterpret_cast(info.Data()); assert(data); return data->updateAddressMonitor(info, ContextData::Unsubscribe); } struct NetMessageData { Message message; ContextData *data; }; Napi::Value contextData_handleNetMessageFinished(Napi::Env env, Napi::Function, NetMessageData *data) { data->data->handleMessageForNetPromise(env, data->message); delete data; return Napi::Value(); } Napi::Value contextData_handleAM(Napi::Env env, Napi::Function jsCallback, NetMessageData *data) { logDebug() << "AddressMonitor handling called"; Message message = data->message; Streaming::MessageParser parser(message.body()); Napi::Object arg = Napi::Object::New(env); std::string reason; if (message.messageId() == Api::AddressMonitor::TransactionFound) reason = "transaction-found"; else reason = "double-spend"; Napi::Array bshs = Napi::Array::New(env); Napi::Array addresses = Napi::Array::New(env); while (parser.next() == Streaming::FoundTag) { if (parser.tag() == Api::AddressMonitor::TxId) { arg.Set("txid", Flowee::hashToString(env, parser.bytesDataBuffer())); } else if (parser.tag() == Api::AddressMonitor::BitcoinScriptHashed) { Napi::Value bsh = Flowee::hashToString(env, parser.bytesDataBuffer()); bshs[bshs.Length()] = bsh; Napi::Value address = Napi::String::From(env, data->data->addressForScriptHash(parser.bytesDataBuffer())); addresses[addresses.Length()] = address; if (bshs.Length() <= 1) { arg.Set("address", address); arg.Set("bitcoinScriptHashed", bsh); } } else if (parser.tag() == Api::AddressMonitor::Amount) { arg.Set("amount", Napi::Number::New(env, parser.longData())); } else if (parser.tag() == Api::AddressMonitor::OffsetInBlock) { arg.Set("offsetInBlock", Napi::Number::New(env, parser.intData())); reason = "transaction-mined"; } else if (parser.tag() == Api::AddressMonitor::TransactionData) { arg.Set("doubleSpentTx", Flowee::wrap(env, parser.bytesDataBuffer())); } else if (parser.tag() == Api::AddressMonitor::DoubleSpendProofData) { arg.Set("doubleSpentProof", Flowee::wrap(env, parser.bytesDataBuffer())); } else if (parser.tag() == Api::AddressMonitor::BlockHeight) { arg.Set("blockHeight", Napi::Number::New(env, parser.intData())); } } arg.Set("reason", reason); arg.Set("bitcoinScriptHashedArray", bshs); arg.Set("addressArray", addresses); delete data; jsCallback.Call({ arg }); return Napi::Value(); } } ContextData::ContextData(Napi::Env env) : m_hubConnectPromise(env), m_indexerConnectPromise(env), m_fullConnectPromise(env) { } void ContextData::setupBindings(Napi::Env env, Napi::Object exports) { auto m1 = Napi::PropertyDescriptor::Function(env, exports, "connect", contextData_connect, napi_default, this); auto m2 = Napi::PropertyDescriptor::Function(env, exports, "connectHub", contextData_connectHub, napi_default, this); auto m3 = Napi::PropertyDescriptor::Function(env, exports, "connectIndexer", contextData_connectIndexer, napi_default, this); auto m4 = Napi::PropertyDescriptor::Function(env, exports, "search", contextData_startSearch, napi_default, this); auto m5 = Napi::PropertyDescriptor::Function(env, exports, "sendTransaction", contextData_sendTransaction, napi_default, this); auto m6 = Napi::PropertyDescriptor::Function(env, exports, "subscribeToAddress", contextData_subscribeToAddress, napi_default, this); auto m7 = Napi::PropertyDescriptor::Function(env, exports, "unsubscribeAddress", contextData_unsubscribeAddress, napi_default, this); auto m8 = Napi::PropertyDescriptor::Function(env, exports, "sendMessage", contextData_sendMessage, napi_default, this); auto p1 = Napi::PropertyDescriptor::Accessor(env, exports, "onConnectHub", returnValue, setOnConnectedHub, napi_writable, this); auto p2 = Napi::PropertyDescriptor::Accessor(env, exports, "onConnectIndexer", returnValue, setOnConnectIndexer, napi_writable, this); auto p3 = Napi::PropertyDescriptor::Accessor(env, exports, "onConnected", returnValue, setOnConnected, napi_writable, this); auto p4 = Napi::PropertyDescriptor::Accessor(env, exports, "onAddressChanged", returnValue, setOnAddressChanged, napi_writable, this); auto enum1 = Napi::PropertyDescriptor::Value("IncludeOffsetInBlock", Napi::Value::From(env, int(Blockchain::IncludeOffsetInBlock))); auto enum2 = Napi::PropertyDescriptor::Value("IncludeInputs", Napi::Value::From(env, int(Blockchain::IncludeInputs))); auto enum3 = Napi::PropertyDescriptor::Value("IncludeTxid", Napi::Value::From(env, int(Blockchain::IncludeTxId))); auto enum4 = Napi::PropertyDescriptor::Value("IncludeFullTxData", Napi::Value::From(env, int(Blockchain::IncludeFullTransactionData))); auto enum5 = Napi::PropertyDescriptor::Value("IncludeOutputs", Napi::Value::From(env, int(Blockchain::IncludeOutputs))); auto enum6 = Napi::PropertyDescriptor::Value("IncludeOutputAmounts", Napi::Value::From(env, int(Blockchain::IncludeOutputAmounts))); auto enum7 = Napi::PropertyDescriptor::Value("IncludeOutputScripts", Napi::Value::From(env, int(Blockchain::IncludeOutputScripts))); auto enum8 = Napi::PropertyDescriptor::Value("IncludeOutputAddresses", Napi::Value::From(env, int(Blockchain::IncludeOutputAddresses))); exports.DefineProperties({m1, m2, m3, m4, m5, m6, m7, m8, p1, p2, p3, p4, enum1, enum2, enum3, enum4, enum5, enum6, enum7, enum8 }); Napi::Object jobEnum = Napi::Object::New(env); auto jobVal1 = Napi::PropertyDescriptor::Value("LookupTxById", Napi::Value::From(env, int(Blockchain::LookupTxById))); auto jobVal2 = Napi::PropertyDescriptor::Value("LookupByAddress", Napi::Value::From(env, int(Blockchain::LookupByAddress))); auto jobVal3 = Napi::PropertyDescriptor::Value("LookupSpentTx", Napi::Value::From(env, int(Blockchain::LookupSpentTx))); auto jobVal4 = Napi::PropertyDescriptor::Value("FetchTx", Napi::Value::From(env, int(Blockchain::FetchTx))); auto jobVal5 = Napi::PropertyDescriptor::Value("FetchBlockHeader", Napi::Value::From(env, int(Blockchain::FetchBlockHeader))); auto jobVal6 = Napi::PropertyDescriptor::Value("FetchBlockOfTx", Napi::Value::From(env, int(Blockchain::FetchBlockOfTx))); auto jobVal7 = Napi::PropertyDescriptor::Value("FetchUTXOUnspent", Napi::Value::From(env, int(Blockchain::FetchUTXOUnspent))); auto jobVal8 = Napi::PropertyDescriptor::Value("FetchUTXODetails", Napi::Value::From(env, int(Blockchain::FetchUTXODetails))); jobEnum.DefineProperties({ jobVal1, jobVal2, jobVal3, jobVal4, jobVal5, jobVal6, jobVal7, jobVal8, }); exports.Set("Job", jobEnum); } Napi::Value ContextData::connect(napi_env env, const std::string &hostname) { startedHubConnection = true; startedIndexerConnection = true; EndPoint ep(hostname, 1235); addHub(ep); ep.peerPort = ep.announcePort = 1234; addIndexer(ep); createPromiseCallback(env); return m_fullConnectPromise.promise(env); } Napi::Value ContextData::connectHub(napi_env env, const std::string &hostname, int port) { if (port < 0 || port > 0xFFFF) { Napi::TypeError::New(env, "Port out of range").ThrowAsJavaScriptException(); return Napi::Value(); } startedHubConnection = true; addHub(EndPoint(hostname, static_cast(port))); createPromiseCallback(env); return m_hubConnectPromise.promise(env); } Napi::Value ContextData::connectIndexer(napi_env env, const std::string &hostname, int port) { if (port < 0 || port > 0xFFFF) { Napi::TypeError::New(env, "Port out of range").ThrowAsJavaScriptException(); return Napi::Value(); } startedIndexerConnection = true; addIndexer(EndPoint(hostname, static_cast(port))); createPromiseCallback(env); return m_indexerConnectPromise.promise(env); } Napi::Value ContextData::startSearch(const Napi::CallbackInfo &info) { if (info.Length() != 1) { Napi::TypeError::New(info.Env(), "One arg (object literal) expected").ThrowAsJavaScriptException(); return Napi::Value(); } auto request = info[0]; if (!request.IsObject()) { Napi::TypeError::New(info.Env(), "Argument should be an object literal").ThrowAsJavaScriptException(); return Napi::Value(); } try { Search *searchObject = Search::create(request.As()); start(searchObject); return searchObject->promise(info.Env()); } catch (const std::runtime_error &e) { Napi::TypeError::New(info.Env(), e.what()).ThrowAsJavaScriptException(); return Napi::Value(); } } Napi::Value ContextData::sendTransaction(const Napi::CallbackInfo &info) { if (info.Length() != 1) { Napi::TypeError::New(info.Env(), "Missing transaction argument").ThrowAsJavaScriptException(); return Napi::Value(); } Napi::Value txval = info[0]; Streaming::MessageBuilder builder(poolForThread()); if (txval.IsTypedArray()) { Napi::TypedArray data = txval.As(); Napi::ArrayBuffer buffer = data.ArrayBuffer(); builder.addByteArray(Api::LiveTransactions::Transaction, static_cast(buffer.Data()) + data.ByteOffset(), data.ByteLength()); } else if (txval.IsString()) { Napi::String string = txval.ToString(); try { // assume hex encoding. std::vector bytes; boost::algorithm::unhex(string.Utf8Value(), back_inserter(bytes)); builder.add(Api::LiveTransactions::Transaction, bytes); } catch (std::exception &e) { Napi::TypeError::New(info.Env(), "Tx decoding failed; not hex.").ThrowAsJavaScriptException(); return Napi::Value(); } // } else if(...) { // TODO support other formats? } else { Napi::TypeError::New(info.Env(), "Argument error, no tx found").ThrowAsJavaScriptException(); return Napi::Value(); } if (!isHubConnected()) { Napi::TypeError::New(info.Env(), "Not connected to a hub").ThrowAsJavaScriptException(); return Napi::Value(); } // we will need that to fulfill the promise createPromiseCallback(info.Env()); const int jobId = m_nextNetworkJobId++; auto iterator = m_networkJobs.insert(std::make_pair(jobId, Flowee::PromiseCallback(info.Env()))).first; logInfo() << "Created sentTx promise with ID:" << jobId; try { auto message = builder.message(Api::LiveTransactionService, Api::LiveTransactions::SendTransaction); message.setHeaderInt(Api::RequestId, jobId); sendMessage(message, Blockchain::TheHub); return iterator->second.promise(info.Env()); } catch (const std::exception &e) { logInfo() << " failed sendTransaction, not connected to a hub"; Napi::TypeError::New(info.Env(), "Not connected to a hub").ThrowAsJavaScriptException(); m_networkJobs.erase(iterator); return Napi::Value(); } } Napi::Value ContextData::updateAddressMonitor(const Napi::CallbackInfo &info, ContextData::UpdateType type) { // we expect a string (or bytearray) for the address. if (info.Length() != 1) { Napi::TypeError::New(info.Env(), "Incorrect argument count").ThrowAsJavaScriptException(); return Napi::Value(); } Napi::Array addresses; if (info[0].IsArray()) { addresses = info[0].As(); } else { addresses = Napi::Array::New(info.Env()); addresses[uint32_t(0)] = info[0]; } for (uint32_t i = 0; i < addresses.Length(); ++i) { Napi::Value addressVal = addresses[i]; std::string orig; Streaming::ConstBuffer addressHash; if (addressVal.IsString()) { auto str = addressVal.ToString(); orig = str.Utf8Value(); addressHash = Flowee::parseAddress(str); } // else if (addressVal.IsTypedArray()) // TODO support other formats if (addressHash.isEmpty()) { Napi::TypeError::New(info.Env(), "could not parse address").ThrowAsJavaScriptException(); return Napi::Value(); } std::lock_guard lock(m_subscribeAddressesLock); if (type == Subscribe) { m_subscribedAddresses.push_back({addressHash, orig}); if (isHubConnected()) { Streaming::MessageBuilder builder(poolForThread()); builder.add(Api::AddressMonitor::BitcoinScriptHashed, addressHash); sendMessage(builder.message(Api::AddressMonitorService, Api::AddressMonitor::Subscribe), Blockchain::TheHub); } return Napi::Boolean::New(info.Env(), true); } else { assert(type == Unsubscribe); // remove from m_subscribedAddresses auto iter = m_subscribedAddresses.begin(); while (iter != m_subscribedAddresses.end()) { if (iter->bitcoinScriptHashed == addressHash) { m_subscribedAddresses.erase(iter); // so, we have it, then the Hub has it too. Lets tell them to remove it. if (isHubConnected()) { Streaming::MessageBuilder builder(poolForThread()); builder.add(Api::AddressMonitor::BitcoinScriptHashed, addressHash); sendMessage(builder.message(Api::AddressMonitorService, Api::AddressMonitor::Unsubscribe), Blockchain::TheHub); } return Napi::Boolean::New(info.Env(), true); } ++iter; } } } return Napi::Boolean::New(info.Env(), false); } void ContextData::createPromiseCallback(Napi::Env env) { if (m_promiseCallback.present) return; // we only need it if no other callback exists. if (!m_onHubConnect.present && !m_onIndexerConnect.present) { m_promiseCallback.f = Napi::ThreadSafeFunction::New(env, Napi::Function(), "", 0, 1); m_promiseCallback.present = true; } } void ContextData::startAllConnectedCallbacks() { /* * In case both the indexer and hub are connected we need to do two things. * 1. call back the function passed to the onAllConnected property. * 2. resolve the allconnected promise. * * The second will be called from the first, but if we don't do the first then * we need to find another callback to handle the promise. */ // then the "allConnected" callback etc need to be called too. m_onAllConnected.acquire(); if (m_onAllConnected.present) m_onAllConnected.f.NonBlockingCall(this, contextData_allConnectedCallback); else promiseCallback(this, contextData_fulfillFullConnectPromise); } void ContextData::hubSentMessage(const Message &message) { const int id = message.headerInt(Api::RequestId); if (id > 0) { // then likely this is the result of one of the network jobs. Those are handled in the NodeJS thread NetMessageData *nmd = new NetMessageData(); nmd->data = this; nmd->message = message; if (!promiseCallback(nmd, contextData_handleNetMessageFinished)) delete nmd; // failed, cleanup after ourselves. return; } if (message.serviceId() == Api::AddressMonitorService) { if (message.messageId() == Api::AddressMonitor::SubscribeReply) { Streaming::MessageParser parser(message.body()); while (parser.next() == Streaming::FoundTag) { if (parser.tag() == Api::AddressMonitor::ErrorMessage) { logCritical() << "AddressMonitor subscribe returned error:" << parser.stringData(); } } } else if (message.messageId() == Api::AddressMonitor::TransactionFound || message.messageId() == Api::AddressMonitor::DoubleSpendFound) { m_addressMonitorCallback.acquire(); if (m_addressMonitorCallback.present) { NetMessageData *m = new NetMessageData(); m->message = message; m->data = this; m_addressMonitorCallback.f.NonBlockingCall(m, contextData_handleAM); } } } else { logInfo() << "Hub reply that is unrecognized"; Streaming::MessageParser::debugMessage(message); } } void ContextData::indexerSentMessage(const Message &message) { if (message.serviceId() == Api::IndexerService && message.messageId() == Api::Indexer::GetIndexerLastBlockReply) { Streaming::MessageParser parser(message.body()); while (parser.next() == Streaming::FoundTag) { if (parser.tag() == Api::BlockHeight) { logCritical() << "Indexer is at block-height:" << parser.intData(); return; } } } } void ContextData::handleMessageForNetPromise(Napi::Env env, const Message &message) { const int id = message.headerInt(Api::RequestId); auto iterator = m_networkJobs.find(id); if (iterator == m_networkJobs.end()) return; Streaming::MessageParser parser(message.body()); try { if (message.serviceId() == Api::LiveTransactionService && message.messageId() == Api::LiveTransactions::SendTransactionReply) { Napi::Value hash = Napi::Boolean::From(env, true); while (parser.next() == Streaming::FoundTag) { if (parser.tag() == Api::LiveTransactions::GenericByteData) { hash = Flowee::hashToString(env, parser.bytesDataBuffer()); break; } } iterator->second.resolve(hash); } else if (message.serviceId() == Api::APIService && message.messageId() == Api::Meta::CommandFailed) { std::string error = "unknown failure"; while (parser.next() == Streaming::FoundTag) { if (parser.tag() == Api::Meta::FailedReason) { error = parser.stringData(); break; } } iterator->second.reject(env, error); } else { // build an object copying the data from the message to be // parsed in JS. const Napi::Value KEY = Napi::String::New(env, "key"); const Napi::Value VALUE = Napi::String::New(env, "value"); const Napi::Value VALUE2 = Napi::String::New(env, "string"); Napi::Object msgJs = Napi::Object::New(env); Napi::Object headers = Napi::Object::New(env); Napi::Array body = Napi::Array::New(env); for (auto iter = message.headerData().begin(); iter != message.headerData().end(); ++iter) { if (iter->first == Network::ServiceId) msgJs.Set("serviceId", Napi::Number::New(env, iter->second)); else if (iter->first == Network::MessageId) msgJs.Set("messageId", Napi::Number::New(env, iter->second)); else if (iter->first > 11) { headers.Set(Napi::String::New(env, std::to_string(iter->first)), Napi::Number::New(env, iter->second)); } } msgJs.Set("header", headers); int arrayIndex = 0; while (parser.next() == Streaming::FoundTag) { Napi::Value key = Napi::String::New(env, std::to_string(parser.tag())); Napi::Value value; if (parser.isLong()) value = Napi::Number::New(env, parser.longData()); else if (parser.isString()) value = Napi::String::New(env, parser.stringData()); else if (parser.isBool()) value = Napi::Boolean::New(env, parser.boolData()); else if (parser.isByteArray()) value = Flowee::wrap(env, parser.bytesDataBuffer()); else if (parser.isDouble()) value = Napi::Number::New(env, parser.doubleData()); if (!value.IsEmpty()) { if (parser.tag() != 0) // not the separator msgJs.Set(key, value); Napi::Object pair = Napi::Object::New(env); pair.Set(KEY, key); pair.Set(VALUE, value); body[arrayIndex++] = pair; if (parser.isByteArray() && parser.dataLength() == 32) { value = Flowee::hashToString(env, parser.bytesDataBuffer()); pair.Set(VALUE2, value); key = Napi::String::New(env, std::to_string(parser.tag()) + "str"); msgJs.Set(key, value); } } } msgJs.Set("body", body); iterator->second.resolve(msgJs); } } catch (const std::exception &e) { logCritical() << "internal error:" << e; } // a second resolve has no effect, so lets just do it to be sure our user doesn't hang. iterator->second.resolve(env, "result lost"); m_networkJobs.erase(iterator); } std::string ContextData::addressForScriptHash(const Streaming::ConstBuffer &buffer) const { std::lock_guard lock(m_subscribeAddressesLock); auto iter = m_subscribedAddresses.begin(); while (iter != m_subscribedAddresses.end()) { if (iter->bitcoinScriptHashed == buffer) { return iter->origRequest; } ++iter; } return std::string(); } void ContextData::initializeHubConnection(NetworkConnection connection, const std::string &hubVersion) { // first re-subscribe the addresses to the AddressMonitorService std::lock_guard lock(m_subscribeAddressesLock); for (auto i = m_subscribedAddresses.begin(); i != m_subscribedAddresses.end(); ++i) { Streaming::MessageBuilder builder(poolForThread()); builder.add(Api::AddressMonitor::BitcoinScriptHashed, i->bitcoinScriptHashed); connection.send(builder.message(Api::AddressMonitorService, Api::AddressMonitor::Subscribe)); } /* * On connection complete we; * 1. call back the function passed to the onConnectHub property. * 2. potentially call back the function passed to the onAllConnected property. * 3. resolve the hub promise. * 4. potentially resolve the allconnected promise. * * 3 and 4 are done in the callback C++ methods. */ bool sent = false; // allow for the usecase where only a hub is connected to without callback m_onHubConnect.acquire(); if (m_onHubConnect.present) { HubVersion *hv = new HubVersion(); hv->floweeVersion = static_cast(malloc(hubVersion.size())); strcpy(hv->floweeVersion, hubVersion.c_str()); hv->data = this; m_onHubConnect.f.NonBlockingCall(hv, contextData_hubConnectedCallback); sent = true; } if (startedIndexerConnection && isIndexerConnected() || !sent && !startedIndexerConnection) startAllConnectedCallbacks(); } void ContextData::initializeIndexerConnection(NetworkConnection connection, const std::set &services) { Message message(Api::IndexerService, Api::Indexer::GetIndexerLastBlock); connection.send(message); /* * On connection complete we; * 1. call back the function passed to the onConnectIndexer property. * 2. potentially call back the function passed to the onAllConnected property. * 3. resolve the indexer promise. * 4. potentially resolve the allconnected promise. * * 3 and 4 are done in the callback C++ methods. */ m_onIndexerConnect.acquire(); bool sent = false; if (m_onIndexerConnect.present) { IndexerServices *is = new IndexerServices; is->services = services; is->data = this; m_onIndexerConnect.f.NonBlockingCall(is, contextData_IndexerConnectedCallback); sent = true; } if (startedHubConnection && isHubConnected() || !sent && !startedHubConnection) startAllConnectedCallbacks(); } void ContextData::hubDisconnected() { // TODO maybe notify JS? } void ContextData::indexerDisconnected() { // TODO maybe notify JS? } Napi::Value ContextData::sendJsMessage(const Napi::CallbackInfo &info) { logDebug() << "sendjsmessage called"; if (info.Length() < 0) { Napi::TypeError::New(info.Env(), "Missing message arg").ThrowAsJavaScriptException(); return Napi::Value(); } auto request = info[0]; if (!request.IsObject()) { Napi::TypeError::New(info.Env(), "Argument should be an object literal").ThrowAsJavaScriptException(); return Napi::Value(); } auto jsMessage = request.As(); if (!jsMessage.Get("header").IsObject() || !jsMessage.Get("body").IsObject()) { Napi::TypeError::New(info.Env(), "Bad message").ThrowAsJavaScriptException(); return Napi::Value(); } auto header = jsMessage.Get("header").As(); Napi::Value serviceIdVal = header.Get("serviceId"); Napi::Value messageIdVal = header.Get("messageId"); if (!serviceIdVal.IsNumber() || !messageIdVal.IsNumber()) { Napi::TypeError::New(info.Env(), "Missing 'serviceId' or 'messageId' in header").ThrowAsJavaScriptException(); return Napi::Value(); } Streaming::MessageBuilder builder(poolForThread()); auto body = jsMessage.Get("body").As(); auto keys = body.GetPropertyNames(); for (uint32_t i = 0; i < keys.Length(); ++i) { Napi::Value prop = keys[i]; assert(prop.IsString()); std::string propStr = prop.ToString().Utf8Value(); try { int key_ = std::stoi(propStr); // throws if not a number if (key_ < 0) // negative numbers not allowed for key. continue; const uint32_t key = static_cast(key_); Napi::Value value = body[propStr]; if (value.IsNumber()) { int64_t num = value.ToNumber().Int64Value(); if (num < 0) builder.add(key, static_cast(num)); else builder.add(key, static_cast(num)); } else if (value.IsString()) { std::string str = value.ToString().Utf8Value(); builder.add(key, str); } else if (value.IsBoolean()) { builder.add(key, value.ToBoolean().Value()); } else if (value.IsBuffer()) { Napi::Buffer buf = value.As>(); builder.addByteArray(key, buf.Data(), buf.Length()); } else logCritical() << "Failed to process message-key" << propStr; } catch (const std::exception &) {} // silently ignore invalid body items } const int serviceId = serviceIdVal.As().Int32Value(); const int messageId = messageIdVal.As().Int32Value(); auto message = builder.message(serviceId, messageId); // find if there are additional items in the header to add. keys = header.GetPropertyNames(); for (uint32_t i = 0; i < keys.Length(); ++i) { Napi::Value prop = keys[i]; assert(prop.IsString()); std::string key = prop.ToString().Utf8Value(); try { int headerKey = std::stoi(key); if (headerKey > 11) { // everything lower is not for users Napi::Value value = header[key]; if (value.IsNumber()) message.setHeaderInt(headerKey, value.As().Int32Value()); else logCritical() << "Header" << key << "has value which is not a number"; } } catch (const std::exception &) {} // silently ignore invalid header items } // we will need that initialized to fulfill the promise createPromiseCallback(info.Env()); const int jobId = m_nextNetworkJobId++; message.setHeaderInt(Api::RequestId, jobId); auto iterator = m_networkJobs.insert(std::make_pair(jobId, Flowee::PromiseCallback(info.Env()))).first; logInfo() << "Created sentTx promise with ID:" << jobId; // Streaming::MessageParser::debugMessage(message); Blockchain::Service s = Blockchain::TheHub; if (serviceId == Api::IndexerService) { switch (messageId) { case Api::Indexer::FindAddress: s = Blockchain::IndexerAddressDb; break; case Api::Indexer::FindSpentOutput: s = Blockchain::IndexerSpentDb; break; default: s = Blockchain::IndexerTxIdDb; break; } } sendMessage(message, s); return iterator->second.promise(info.Env()); }