Files
js/Search.cpp
T
TomZ 79bf994962 Fixlets in Search
This fixes a type on a method name,
this also fixes unexpected results in some cases when the user did not
implement onFinished()
2020-07-27 20:52:38 +02:00

563 lines
19 KiB
C++

/*
* This file is part of the Flowee project
* Copyright (C) 2019 Tom Zander <tomz@freedommail.ch>
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
// #undef NDEBUG // make assert do something
#include "Search.h"
#include <Blockchain_p.h>
#include "Flowee.h"
#include <cashaddr.h>
#include <base58.h>
#include <streaming/BufferPool.h>
namespace {
void setFunction(Flowee::Callback &callback, const Napi::Object &source, const char *name)
{
callback.present = false;
Napi::Value val = source[name];
if (val.IsUndefined())
return;
if (val.IsFunction()) {
logDebug() << " Successfully copied function:" << name;
callback.f = Napi::ThreadSafeFunction::New(source.Env(), val.As<Napi::Function>(), "", 0, 1);
callback.present = true;
return;
}
// logCritical() << "Search request found with" << name << "which is not a function";
throw std::runtime_error("Not a function");
}
Blockchain::Job parseJob(Napi::Value job_)
{
if (!job_.IsObject())
throw std::runtime_error("Invalid Job description");
Napi::Object job = job_.As<Napi::Object>();
Napi::Value jobType = job["type"];
if (!jobType.IsNumber())
throw std::runtime_error("Invalid Job description");
Blockchain::Job searchJob;
searchJob.type = static_cast<Blockchain::JobType>(jobType.ToNumber().Int32Value());
if (searchJob.type <= Blockchain::Unset || searchJob.type > Blockchain::FetchUTXODetails)
throw std::runtime_error("Invalid Job description");
Napi::Value value = job["value"];
if (value.IsNumber())
searchJob.intData = value.ToNumber().Int32Value();
else if (value.IsString()) {
bool isHash;
searchJob.data = Flowee::parseAddress(value.ToString(), &isHash);
if (!isHash && searchJob.type != Blockchain::LookupByAddress)
throw std::runtime_error("Job error: Value-type of 'address' but not LookupByAddress job-type");
if (searchJob.data.isEmpty())
throw std::runtime_error("Job: unparsable 'value' property");
}
// logDebug() << "Job intData" << searchJob.intData << "bytearray:" << searchJob.data.size();
value = job["value2"];
if (value.IsNumber())
searchJob.intData2 = value.ToNumber().Int32Value();
value = job["value3"];
if (value.IsNumber())
searchJob.intData3 = value.ToNumber().Int32Value();
// txFilter
Napi::Value txFilter = job["txFilter"];
if (txFilter.IsArray()) {
Napi::Array filter = txFilter.As<Napi::Array>();
for (uint32_t i = 0; i < filter.Length(); ++i) {
Napi::Value item = filter[i];
if (!item.IsNumber())
throw std::runtime_error("Job: unparsable item in 'txFilter' array");
searchJob.transactionFilters |= item.ToNumber().Uint32Value();
}
} else if (txFilter.IsNumber()) {
searchJob.transactionFilters |= txFilter.ToNumber().Uint32Value();
}
else if (!txFilter.IsUndefined()) {
throw std::runtime_error("Job: unparsable 'txFilter' property");
}
// TODO more properties?
return searchJob;
}
std::deque<Blockchain::Job> parseJobs(Napi::Array jobs)
{
std::deque<Blockchain::Job> answer;
for (uint32_t index = 0; index < jobs.Length(); ++index) {
auto job = parseJob(jobs[index]);
if (index > 0) {
// do some magic here.
switch (job.type) {
case Blockchain::Unset:
throw std::runtime_error("Missing job-type");
case Blockchain::FetchBlockOfTx:
case Blockchain::FetchBlockHeader:
// if a job was passed of the above type, but missing value. Then assume that the
// value comes from the previous job.
if (job.intData == 0)
answer[index - 1].nextJobId = int(index);
break;
default:
break;
}
}
answer.push_back(job);
}
return answer;
}
}
Napi::Value searchAddJob(const Napi::CallbackInfo &info)
{
Search *search = reinterpret_cast<Search*>(info.Data());
assert(search);
if (info.Length() != 1 || !info[0].IsObject()) {
Napi::TypeError::New(info.Env(), "Expected Job as argument").ThrowAsJavaScriptException();
return Napi::Value();
}
try {
std::lock_guard<std::mutex> lock(search->jobsLock);
search->jobs.push_back(parseJob(info[0]));
assert(info[0].IsObject()); // the previous line would have thrown if job was malformed in any way
search->addJsJobObject(info[0].ToObject());
return Napi::Number::From(info.Env(), search->jobs.size());
} catch ( const std::exception &e) {
Napi::TypeError::New(info.Env(), e.what()).ThrowAsJavaScriptException();
return Napi::Value();
}
}
struct SearchWith3Ints {
Search *search;
int a, b, c;
};
// reuse the same 3 int one for lazyness sake
void callForwardWithIntArg(Napi::Env env, Napi::Function jsCallback, SearchWith3Ints *swi)
{
if (swi->b == -42) // magic value... (oh, come-on, everything about NodeJS is hacky)
swi->search->copyResults(env);
jsCallback.Call(swi->search->mainObject(), { Napi::Number::From(env, swi->a) });
swi->search->jsCallbackFinished(env);
delete swi;
}
void callForward3Ints(Napi::Env env, Napi::Function jsCallback, SearchWith3Ints *swi)
{
jsCallback.Call(swi->search->mainObject(), {
Napi::Number::From(env, swi->a),
Napi::Number::From(env, swi->b),
Napi::Number::From(env, swi->c)
});
swi->search->jsCallbackFinished(env);
delete swi;
}
struct SearchWithTx {
Search *search;
const Blockchain::Transaction *tx;
};
void callForwardWithTransaction(Napi::Env env, Napi::Function jsCallback, SearchWithTx *data)
{
Napi::Object tx = Flowee::populateTransaction(env, data->tx, data->search->useBinaryHashes());
Napi::Value txList = data->search->mainObject().Get("transactions");
assert(!txList.IsUndefined());
assert(txList.IsArray());
Napi::Array txArray = txList.As<Napi::Array>();
txArray[txArray.Length()] = tx;
jsCallback.Call(data->search->mainObject(), {tx});
data->search->jsCallbackFinished(env);
delete data;
}
void finishPromise(Napi::Env env, Napi::Function, Search *s)
{
s->copyResults(env);
// calls the promise based callback.
s->finishupPromise();
delete s;
}
struct UtxoLookupResults {
uint32_t jobId : 31; // take one bit, to save 8 bytes
uint32_t unspent : 1;
int blockHeight;
int offsetInBlock;
int outIndex;
Search *search;
int64_t amount;
Streaming::ConstBuffer outputScript;
};
void callUtxoForward(Napi::Env env, Napi::Function jsCallback, UtxoLookupResults *data)
{
Napi::Object utxo = Napi::Object::New(env);
utxo.Set("blockHeight", Napi::Number::From(env, data->blockHeight));
utxo.Set("offsetInBlock", Napi::Number::From(env, data->offsetInBlock));
utxo.Set("outIndex", Napi::Number::From(env, data->outIndex));
utxo.Set("jobId", Napi::Number::From(env, data->jobId));
utxo.Set("unspent", Napi::Boolean::From(env, data->unspent == 1));
if (data->unspent && !data->outputScript.isEmpty()) {
utxo.Set("amount", Napi::Number::From(env, data->amount));
utxo.Set("outputScript", Flowee::wrap(env, data->outputScript));
}
jsCallback.Call(data->search->mainObject(), {utxo});
data->search->jsCallbackFinished(env);
delete data;
}
Search *Search::create(Napi::Object requestObject)
{
Napi::Value rc;
/*
* First determine if the user used form 1 or form 2
*
* Form 1:
*
* [{}, {}]
*
* Form 2:
*
* {
* jobs: [
* { }
* ],
*
* onAddressUsedInOutput: function(blockHeight, offsetInBlock, outIndex) {
* },
*
* onFinished: function(unFinishedJobCount) { }
* }
*/
assert(!requestObject.IsUndefined());
Napi::Value jobs = requestObject["jobs"];
std::unique_ptr<Search> searchObject;
searchObject.reset(new Search(requestObject.Env()));
if (jobs.IsUndefined() && requestObject.IsArray()) {
Napi::Array jobsArray = requestObject.As<Napi::Array>();
searchObject->jobs = parseJobs(jobsArray);
searchObject->setJobs(jobsArray);
}
else if (jobs.IsArray()) { // Form 2
// copy those methods that were defined. Or throw if user error is found
setFunction(searchObject->m_onTxAdded, requestObject, "onTxAdded");
setFunction(searchObject->m_onFinished, requestObject, "onFinished");
setFunction(searchObject->m_onTxIdResolved, requestObject, "onTxIdResolved");
setFunction(searchObject->m_onSpentOutputResolved, requestObject, "onSpentOutputResolved");
setFunction(searchObject->m_onAddressUsedInOutput, requestObject, "onAddressUsedInOutput");
setFunction(searchObject->m_onUtxoLookup, requestObject, "onUtxoLookup");
Napi::Array jobsArray = jobs.As<Napi::Array>();
searchObject->jobs = parseJobs(jobsArray);
searchObject->setJobs(jobsArray);
}
else {
throw std::runtime_error("Arguments invalid");
}
Napi::Value binaryHashes = requestObject["binaryHashes"];
if (binaryHashes.IsBoolean()) {
Napi::Boolean bh = binaryHashes.ToBoolean();
if (bh.Value())
searchObject->setUseBinaryHashes(true);
}
Napi::Array properyNames = requestObject.GetPropertyNames();
for (uint32_t i = 0; i < properyNames.Length(); ++i) {
Napi::Value prop = properyNames[i];
assert(prop.IsString());
std::string key = prop.ToString().Utf8Value();
if (key.size() > 2 && key[0] == 'o' && key[1] == 'n')
continue;
if (key == "jobs" || key == "binaryHashes")
continue;
// copy properties from the request object to our 'main object'. Which will be the 'this' object for all callbacks.
searchObject->mainObject().Set(prop, requestObject.Get(prop));
}
return searchObject.release();
}
void Search::finished(int unfinishedJobs)
{
/*
* this callback can come in multiple times because we break the threading of the super object.
* Instead of doing everything 'direct' in javascript we use a 2nd thread to do the callbacks
* to user code. And user code can add more jobs.
*
* The result is that we might get a false finished call while the JS thread is still calling
* callbacks.
*
* Only when m_calls drops to zero (no pending JS callbacks) do we really mark the job as finished.
*/
if (m_calls.load() != 0 || m_done)
return;
if (m_onTxAdded.present && m_onTxAdded.acquired)
m_onTxAdded.f.Release();
if (m_onTxIdResolved.present && m_onTxIdResolved.acquired)
m_onTxIdResolved.f.Release();
if (m_onSpentOutputResolved.present && m_onSpentOutputResolved.acquired)
m_onSpentOutputResolved.f.Release();
if (m_onUtxoLookup.present && m_onUtxoLookup.acquired)
m_onUtxoLookup.f.Release();
if (m_onAddressUsedInOutput.present && m_onAddressUsedInOutput.acquired)
m_onAddressUsedInOutput.f.Release();
m_done = true;
if (!m_onFinished.present) {
// we need to resolve the promise in the main thread, so we use a dummy
// ThreadSafeFunction for this.
m_promiseFinished.acquire();
if (m_promiseFinished.present) {
auto status = m_promiseFinished.f.NonBlockingCall(this, finishPromise);
if (status == napi_ok) {
m_promiseFinished.f.Release();
m_promiseFinished.present = false;
}
else
delete this;
} else {
logCritical() << "Due to old (< 12.6) version of nodejs the Flowee search promise will not be called. Consider using onFinished() instead";
delete this;
}
return;
}
m_onFinished.acquire();
SearchWith3Ints *copy = new SearchWith3Ints();
copy->a = unfinishedJobs;
copy->b = -42; // magic value to call copyResults()
copy->search = this;
auto status = m_onFinished.f.NonBlockingCall(copy, callForwardWithIntArg);
if (status == napi_ok)
m_onFinished.f.Release();
m_onFinished.present = false;
}
void Search::transactionAdded(const Blockchain::Transaction &transaction)
{
if (!m_onTxAdded.present)
return;
m_onTxAdded.acquire();
addJsCallbackScheduled();
SearchWithTx *data = new SearchWithTx();
data->search = this;
data->tx = &transaction;
auto status = m_onTxAdded.f.NonBlockingCall(data, callForwardWithTransaction);
if (status != napi_ok)
m_onTxAdded.present = false;
}
void Search::txIdResolved(int jobId, int blockHeight, int offsetInBlock)
{
if (!m_onTxIdResolved.present)
return;
m_onTxIdResolved.acquire();
addJsCallbackScheduled();
SearchWith3Ints *copy = new SearchWith3Ints();
copy->a = jobId;
copy->b = blockHeight;
copy->c = offsetInBlock;
copy->search = this;
auto status = m_onTxIdResolved.f.NonBlockingCall(copy, callForward3Ints);
if (status != napi_ok)
m_onTxIdResolved.present = false;
}
void Search::spentOutputResolved(int jobId, int blockHeight, int offsetInBlock)
{
if (!m_onSpentOutputResolved.present)
return;
m_onSpentOutputResolved.acquire();
addJsCallbackScheduled();
SearchWith3Ints *copy = new SearchWith3Ints();
copy->a = jobId;
copy->b = blockHeight;
copy->c = offsetInBlock;
copy->search = this;
auto status = m_onSpentOutputResolved.f.NonBlockingCall(copy, callForward3Ints);
if (status != napi_ok)
m_onSpentOutputResolved.present = false;
}
void Search::addressUsedInOutput(int blockHeight, int offsetInBlock, int outIndex)
{
if (!m_onAddressUsedInOutput.present)
return;
m_onAddressUsedInOutput.acquire();
addJsCallbackScheduled();
SearchWith3Ints *copy = new SearchWith3Ints();
copy->a = blockHeight;
copy->b = offsetInBlock;
copy->c = outIndex;
copy->search = this;
auto status = m_onAddressUsedInOutput.f.NonBlockingCall(copy, callForward3Ints);
if (status != napi_ok)
m_onAddressUsedInOutput.present = false;
}
void Search::utxoLookup(int jobId, int blockHeight, int offsetInBlock, int outIndex, bool unspent, int64_t amount, Streaming::ConstBuffer outputScript)
{
if (!m_onUtxoLookup.present)
return;
m_onUtxoLookup.acquire();
addJsCallbackScheduled();
UtxoLookupResults *utxo = new UtxoLookupResults();
assert(jobId >= 0);
utxo->jobId = uint32_t(jobId);
utxo->blockHeight = blockHeight;
utxo->offsetInBlock = offsetInBlock;
utxo->outIndex = outIndex;
utxo->unspent = unspent;
utxo->amount = amount;
utxo->outputScript = outputScript;
utxo->search = this;
auto status = m_onUtxoLookup.f.NonBlockingCall(utxo, callUtxoForward);
if (status != napi_ok)
m_onUtxoLookup.present = false;
}
// notice that this should only ever be called in the javascript thread
void Search::setJobs(Napi::Array jobsArray)
{
m_me.Set("jobs", jobsArray);
}
// notice that this should only ever be called in the javascript thread
void Search::addJsJobObject(Napi::Object job)
{
Napi::Array a = m_me.Get("jobs").As<Napi::Array>();
a[a.Length()] = job;
}
void Search::addJsCallbackScheduled()
{
m_calls.fetch_add(1);
}
void Search::jsCallbackFinished(Napi::Env env)
{
const int newCount = m_calls.fetch_sub(1) - 1;
if (newCount != -1) {
// if processRequests appended jobs, we copy them to our jobs array.
Napi::Array array = m_me.Get("jobs").As<Napi::Array>();
std::lock_guard<std::mutex> lock(jobsLock);
for (auto i = array.Length(); i < jobs.size(); ++i) {
Napi::Object job = Napi::Object::New(env);
const auto orig = jobs[i];
job.Set("type", Napi::Number::New(env, orig.type));
job.Set("value", Napi::Number::New(env, orig.intData));
job.Set("value2", Napi::Number::New(env, orig.intData2));
// TODO copy more values?
array[i] = job;
}
}
if (newCount == 0) {
policy->processRequests(this);
}
else if (newCount == -1) {
copyResults(env);
m_promise.resolve(mainObject());
// all data is available to, and managed by, the JS side. We can safely delete this search object.
delete this;
}
}
void Search::copyResults(Napi::Env env)
{
Napi::Array txList = m_me.Get("transactions").As<Napi::Array>();
if (txList.Length() != answer.size()) {
// then we just overwrite it.
txList = Napi::Array::New(env, answer.size());
int i = 0;
auto iter = answer.begin();
while (iter != answer.end()) {
Napi::Object tx = Flowee::populateTransaction(env, &*iter, m_useBinaryHashes);
txList[i++] = tx;
++iter;
}
m_me.Set("transactions", txList);
}
auto iter = blockHeaders.begin();
while (iter != blockHeaders.end()) {
Napi::Object header = Napi::Object::New(env);
header.Set("height", Napi::Number::From(env, iter->second.height));
header.Set("confirmations", Napi::Number::From(env, iter->second.confirmations));
header.Set("version", Napi::Number::From(env, iter->second.version));
header.Set("time", Napi::Number::From(env, iter->second.time));
header.Set("mediantime", Napi::Number::From(env, iter->second.medianTime));
header.Set("nonce", Napi::Number::From(env, iter->second.nonce));
header.Set("bits", Napi::Number::From(env, iter->second.bits));
header.Set("difficulty", Napi::Number::From(env, iter->second.difficulty));
if (m_useBinaryHashes) {
header.Set("hash", Flowee::wrap(env, iter->second.hash));
header.Set("merkleroot", Flowee::wrap(env, iter->second.merkleRoot));
} else {
header.Set("hash", Flowee::hashToString(env, iter->second.hash));
header.Set("merkleroot", Flowee::hashToString(env, iter->second.merkleRoot));
}
char name[15];
snprintf(name, 15, "block%d", iter->first);
m_me.Set(Napi::String::New(env, name), header);
++iter;
}
}
Search::Search(Napi::Env env)
: m_promise(env),
m_me(Napi::ObjectReference::New(Napi::Object::New(env), 1)),
m_calls(0)
{
// allow our callbacks access to these properties on 'this'
m_me.Set("addJob", Napi::Function::New(env, searchAddJob, "addJob", this));
m_me.Set("transactions", Napi::Array::New(env, 0));
m_me.Set("jobs", Napi::Array::New(env, 0));
m_promiseFinished.present = true;
m_promiseFinished.f = Napi::ThreadSafeFunction::New(env, Napi::Function(), "", 1, 1);
}
bool Search::useBinaryHashes() const
{
return m_useBinaryHashes;
}
void Search::setUseBinaryHashes(bool useBinaryHashes)
{
m_useBinaryHashes = useBinaryHashes;
}