377 lines
15 KiB
C++
377 lines
15 KiB
C++
/*
|
|
* This file is part of the Flowee project
|
|
* Copyright (C) 2018-2020 Tom Zander <tom@flowee.org>
|
|
*
|
|
* This program is free software: you can redistribute it and/or modify
|
|
* it under the terms of the GNU General Public License as published by
|
|
* the Free Software Foundation, either version 3 of the License, or
|
|
* (at your option) any later version.
|
|
*
|
|
* This program is distributed in the hope that it will be useful,
|
|
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
* GNU General Public License for more details.
|
|
*
|
|
* You should have received a copy of the GNU General Public License
|
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
*/
|
|
#include "Pruner_p.h"
|
|
|
|
#include "UnspentOutputDatabase_p.h"
|
|
#include <utils/streaming/MessageBuilder.h>
|
|
#include <utils/streaming/MessageParser.h>
|
|
#include <utils/hash.h>
|
|
#include <utils/utiltime.h>
|
|
|
|
#include <boost/iostreams/device/mapped_file.hpp>
|
|
#include <boost/filesystem.hpp>
|
|
#include <boost/filesystem/fstream.hpp>
|
|
|
|
#include <stdlib.h>
|
|
#include <fstream>
|
|
#include <deque>
|
|
|
|
namespace {
|
|
static void nothing(const char *){}
|
|
|
|
struct Leaf {
|
|
int blockHeight = -1;
|
|
int offsetInBlock = -1;
|
|
int outIndex = 0;
|
|
uint256 txid;
|
|
};
|
|
|
|
struct LeafRef
|
|
{
|
|
uint32_t diskPosition;
|
|
uint256 txid;
|
|
int output;
|
|
|
|
static bool compare(const LeafRef &one, const LeafRef &two) {
|
|
if (one.txid == two.txid)
|
|
return one.output < two.output;
|
|
return one.txid < two.txid;
|
|
}
|
|
};
|
|
|
|
enum Type {
|
|
OnlyLeafs,
|
|
OnlYBucket,
|
|
BucketAndLeafs
|
|
};
|
|
|
|
std::vector<LeafRef> readLeafRefs(const Bucket &bucket, const std::shared_ptr<char> &inputBuf, size_t bufSize)
|
|
{
|
|
std::vector<LeafRef> leafRefs;
|
|
leafRefs.reserve(bucket.unspentOutputs.size());
|
|
for (auto ref : bucket.unspentOutputs) {
|
|
const uint32_t pos = ref.leafPos;
|
|
// read from old
|
|
Streaming::ConstBuffer buf(inputBuf, inputBuf.get() + pos, inputBuf.get() + bufSize);
|
|
UnspentOutput leaf(ref.cheapHash, buf);
|
|
if (leaf.blockHeight() < 1 || leaf.offsetInBlock() <= 0 || leaf.outIndex() < 0) {
|
|
logCritical() << "Error found while copying a bucket, the leaf at pos-in-file" << pos
|
|
<< "didn't have the required minimum info";
|
|
throw std::runtime_error("Error found, failed to parse leaf");
|
|
}
|
|
leafRefs.push_back({pos, leaf.prevTxId(), leaf.outIndex()});
|
|
}
|
|
return leafRefs;
|
|
}
|
|
|
|
void copyLeafs(const std::shared_ptr<char> &inputBuf, size_t bufSize, Streaming::BufferPool &outBuf, Streaming::MessageBuilder &builder, std::vector<LeafRef> &leafRefs)
|
|
{
|
|
for (size_t i = 0; i < leafRefs.size(); ++i) { // write leafs
|
|
LeafRef &ref = leafRefs[i];
|
|
const uint32_t pos = ref.diskPosition;
|
|
// read from old
|
|
Streaming::ConstBuffer buf(inputBuf, inputBuf.get() + pos, inputBuf.get() + bufSize);
|
|
UnspentOutput leaf(ref.txid.GetCheapHash(), buf);
|
|
const uint256 txid = leaf.prevTxId();
|
|
ref.diskPosition = outBuf.offset();
|
|
if (leafRefs.size() > i || leafRefs.at(i + 1).txid != txid) {
|
|
builder.add(UODB::BlockHeight, leaf.blockHeight());
|
|
builder.add(UODB::OffsetInBlock, leaf.offsetInBlock());
|
|
std::vector<char> shortTxId;
|
|
shortTxId.resize(24);
|
|
memcpy(shortTxId.data(), txid.begin() + 8, 24);
|
|
builder.add(UODB::TXID, shortTxId);
|
|
}
|
|
if (leaf.outIndex() != 0)
|
|
builder.add(UODB::OutIndex, leaf.outIndex());
|
|
builder.add(UODB::Separator, true);
|
|
}
|
|
}
|
|
|
|
uint32_t writeBucketData(Streaming::BufferPool &outBuf, Streaming::MessageBuilder &builder, const std::vector<LeafRef> &leafRefs)
|
|
{
|
|
outBuf.commit();
|
|
const uint32_t posOfBucket = outBuf.offset();
|
|
uint64_t prevCH = 0;
|
|
for (size_t i = 0; i < leafRefs.size(); ++i) {
|
|
const uint64_t cheapHash = leafRefs.at(i).txid.GetCheapHash();
|
|
if (prevCH != cheapHash) {
|
|
builder.add(UODB::CheapHash, cheapHash);
|
|
prevCH = cheapHash;
|
|
}
|
|
assert(posOfBucket > leafRefs.at(i).diskPosition);
|
|
int offset = posOfBucket - leafRefs.at(i).diskPosition;
|
|
assert(offset > 0);
|
|
builder.add(UODB::LeafPosRelToBucket, static_cast<int>(posOfBucket - leafRefs.at(i).diskPosition));
|
|
}
|
|
builder.add(UODB::Separator, true);
|
|
outBuf.commit();
|
|
return posOfBucket;
|
|
}
|
|
|
|
// copies the entire bucket, keeping the leafs and the bucket data as close together as possible
|
|
uint32_t copyBucket(const Bucket &bucket, const std::shared_ptr<char> &inputBuf, size_t bufSize, Streaming::BufferPool &outBuf, Streaming::MessageBuilder &builder)
|
|
{
|
|
std::vector<LeafRef> leafRefs = readLeafRefs(bucket, inputBuf, bufSize);
|
|
if (leafRefs.empty())
|
|
return 0;
|
|
std::sort(leafRefs.begin(), leafRefs.end(), &LeafRef::compare);
|
|
copyLeafs(inputBuf, bufSize, outBuf, builder, leafRefs);
|
|
return writeBucketData(outBuf, builder, leafRefs);
|
|
}
|
|
}
|
|
|
|
Pruner::Pruner(const std::string &dbFile, const std::string &infoFile, DBType dbType)
|
|
: m_dbFile(dbFile),
|
|
m_infoFile(infoFile),
|
|
m_dbType(dbType)
|
|
{
|
|
srandom(static_cast<uint32_t>(GetTimeMillis()));
|
|
assert(m_dbType == OlderDB || m_dbType == MostActiveDB);
|
|
char buf[20];
|
|
snprintf(buf, 20, ".new%d", int(random()));
|
|
m_tmpExtension = std::string(buf);
|
|
}
|
|
|
|
void Pruner::commit()
|
|
{
|
|
boost::filesystem::rename(m_dbFile + m_tmpExtension, m_dbFile);
|
|
boost::filesystem::rename(m_infoFile + m_tmpExtension, m_infoFile);
|
|
}
|
|
|
|
void Pruner::cleanup()
|
|
{
|
|
boost::filesystem::remove(m_dbFile + m_tmpExtension);
|
|
boost::filesystem::remove(m_infoFile + m_tmpExtension);
|
|
}
|
|
|
|
int Pruner::bucketsSize() const
|
|
{
|
|
return m_bucketsSize;
|
|
}
|
|
|
|
void Pruner::prune()
|
|
{
|
|
logCritical() << "Garbage Collecting" << m_dbFile;
|
|
logInfo() << "Starting gc. Counting buckets...";
|
|
std::ifstream in(m_infoFile, std::ios::binary | std::ios::in);
|
|
if (!in.is_open())
|
|
throw std::runtime_error("Failed to open info file");
|
|
|
|
int initialBlockHeight = -1;
|
|
int lastBlockHeight = 1;
|
|
uint256 lastBlockHash;
|
|
int posInFile = 0;
|
|
bool isTip = false;
|
|
std::deque<uint256> invalidBlocks;
|
|
|
|
int posOfJumptable = 0;
|
|
uint256 checksum;
|
|
{
|
|
std::shared_ptr<char> buf(new char[256], std::default_delete<char[]>());
|
|
in.read(buf.get(), 256);
|
|
Streaming::MessageParser parser(Streaming::ConstBuffer(buf, buf.get(), buf.get() + 256));
|
|
while (parser.next() == Streaming::FoundTag) {
|
|
if (parser.tag() == UODB::LastBlockHeight)
|
|
lastBlockHeight = parser.intData();
|
|
else if (parser.tag() == UODB::FirstBlockHeight)
|
|
initialBlockHeight = parser.intData();
|
|
else if (parser.tag() == UODB::LastBlockId)
|
|
lastBlockHash = parser.uint256Data();
|
|
else if (parser.tag() == UODB::JumpTableHash)
|
|
checksum = parser.uint256Data();
|
|
else if (parser.tag() == UODB::PositionInFile)
|
|
posInFile = parser.intData();
|
|
else if (parser.tag() == UODB::IsTip)
|
|
isTip = parser.boolData();
|
|
else if (parser.tag() == UODB::InvalidBlockHash)
|
|
invalidBlocks.push_back(parser.uint256Data());
|
|
else if (parser.tag() == UODB::Separator)
|
|
break;
|
|
}
|
|
posOfJumptable = parser.consumed();
|
|
}
|
|
in.seekg(posOfJumptable);
|
|
uint32_t jumptable[0x100000];
|
|
in.read(reinterpret_cast<char*>(jumptable), sizeof(jumptable));
|
|
|
|
{
|
|
CHash256 ctx;
|
|
ctx.write(reinterpret_cast<const unsigned char*>(jumptable), sizeof(jumptable));
|
|
uint256 result;
|
|
ctx.finalize(reinterpret_cast<unsigned char*>(&result));
|
|
if (result != checksum)
|
|
throw std::runtime_error("info file is mangled, checksum failed");
|
|
}
|
|
|
|
boost::iostreams::mapped_file file;
|
|
file.open(m_dbFile, std::ios_base::binary | std::ios_base::in);
|
|
if (!file.is_open())
|
|
throw std::runtime_error("Failed to open db file");
|
|
|
|
std::shared_ptr<char> buffer = std::shared_ptr<char>(const_cast<char*>(file.const_data()), nothing);
|
|
std::vector<Bucket> buckets;
|
|
buckets.reserve(100000);
|
|
// Find all buckets
|
|
for (int i = 0; i < 0x100000; ++i) {
|
|
if (jumptable[i] == 0)
|
|
continue;
|
|
if (jumptable[i] > 0x7FFFFFFF)
|
|
throw std::runtime_error("Info file jumps to pos > 2GB. Needs to be repaired first.");
|
|
const uint32_t bucketOffsetInFile = jumptable[i];
|
|
assert(bucketOffsetInFile < 0x80000000);
|
|
if (size_t(bucketOffsetInFile) > file.size())
|
|
throw std::runtime_error("Info file links to pos greater than DB file.");
|
|
|
|
Bucket bucket;
|
|
bucket.fillFromDisk(Streaming::ConstBuffer(buffer, buffer.get() + bucketOffsetInFile, buffer.get() + file.size()),
|
|
static_cast<int>(bucketOffsetInFile));
|
|
// bucket.shorthash = i;
|
|
if (!bucket.unspentOutputs.empty())
|
|
buckets.push_back(bucket);
|
|
}
|
|
logInfo() << "Pruner found" << buckets.size() << "buckets";
|
|
|
|
memset(jumptable, 0, sizeof(jumptable));
|
|
const std::string outFilename(m_dbFile + m_tmpExtension);
|
|
{
|
|
boost::filesystem::remove(outFilename);
|
|
boost::filesystem::ofstream outFle(outFilename);
|
|
outFle.close();
|
|
uint32_t newFileSize = 0;
|
|
if (isTip) {
|
|
newFileSize = 2147483600;
|
|
} else {
|
|
// new file size is all leafs (55 bytes each)
|
|
// then the max 30 bytes to link to it from a bucket, times the amount of leafs in a bucket.
|
|
// since we can expect a bucket to be re-written that (=amount of leafs) amount of times.
|
|
for (auto bucket : buckets) {
|
|
newFileSize += static_cast<int>(bucket.unspentOutputs.size()) * (55 + 30 + /* add some for security */ 20);
|
|
}
|
|
}
|
|
boost::filesystem::resize_file(outFilename, std::min((uint32_t) 0x7FFFFFFE, newFileSize));
|
|
}
|
|
int outFileSize = 0;
|
|
{
|
|
/*
|
|
* If the DB is older we sort the buckets assuming that an access to the bucket will
|
|
* lead to an access to the leaf.
|
|
* This, in practice, means we keep write all almost-empty buckets at the front where
|
|
* we first write the leafs immediately followed by the bucket.
|
|
* Then we follow with the rest, following the assumption that larger buckets have greater
|
|
* chance of being written out again at the end.
|
|
*
|
|
* For more recent database files we optimize to keep all the buckets together and thus keep
|
|
* that part of the file in memory as much as possible.
|
|
* This is more valuable here as many of the searches that are not hits still need to read
|
|
* the bucket before they move to the next DB.
|
|
*
|
|
* For those more recent DBs, we write all the leafs first and append the buckets at the end.
|
|
*/
|
|
|
|
boost::iostreams::mapped_file outFile;
|
|
outFile.open(outFilename, std::ios_base::binary | std::ios_base::out);
|
|
if (!outFile.is_open())
|
|
throw std::runtime_error("Failed to open replacement db file for writing");
|
|
logInfo() << "GC is now copying leafs and buckets";
|
|
|
|
std::shared_ptr<char> outStream = std::shared_ptr<char>(const_cast<char*>(outFile.const_data()), nothing);
|
|
Streaming::BufferPool outBuf = Streaming::BufferPool(outStream, static_cast<int>(outFile.size()), true);
|
|
Streaming::MessageBuilder builder(outBuf);
|
|
|
|
if (m_dbType == MostActiveDB || isTip) {
|
|
for (const Bucket &bucket : buckets) {
|
|
if (bucket.unspentOutputs.size() > 2)
|
|
continue;
|
|
// copy buckets
|
|
uint32_t newPos = copyBucket(bucket, buffer, file.size(), outBuf, builder);
|
|
jumptable[createShortHash(bucket.unspentOutputs.front().cheapHash)] = newPos;
|
|
}
|
|
for (const Bucket &bucket : buckets) {
|
|
if (bucket.unspentOutputs.size() <= 2)
|
|
continue;
|
|
uint32_t newPos = copyBucket(bucket, buffer, file.size(), outBuf, builder);
|
|
jumptable[createShortHash(bucket.unspentOutputs.front().cheapHash)] = newPos;
|
|
}
|
|
} else {
|
|
// leafs first, buckets at end of file
|
|
assert(m_dbType == OlderDB);
|
|
// first we only copy the leafs.
|
|
for (size_t index = 0; index < buckets.size(); ++index) {
|
|
Bucket &bucket = buckets[index];
|
|
assert(!bucket.unspentOutputs.empty());
|
|
std::vector<LeafRef> leafRefs = readLeafRefs(bucket, buffer, file.size());
|
|
assert(leafRefs.size() == bucket.unspentOutputs.size()); // we either copy everything or nothing
|
|
copyLeafs(buffer, file.size(), outBuf, builder, leafRefs);
|
|
size_t i = 0;
|
|
for (auto iter = bucket.unspentOutputs.begin(); iter != bucket.unspentOutputs.end(); ++iter, ++i) {
|
|
assert(iter->cheapHash == leafRefs[i].txid.GetCheapHash());
|
|
iter->leafPos = leafRefs.at(i).diskPosition;
|
|
}
|
|
}
|
|
const uint32_t startJumptables = outBuf.offset();
|
|
// next we write the bucket
|
|
for (size_t index = 0; index < buckets.size(); ++index) {
|
|
const Bucket &bucket = buckets[index];
|
|
assert(!bucket.unspentOutputs.empty());
|
|
int32_t newPos = bucket.saveToDisk(outBuf);
|
|
assert(newPos >= 0);
|
|
jumptable[createShortHash(bucket.unspentOutputs.front().cheapHash)] = static_cast<uint32_t>(newPos);
|
|
}
|
|
m_bucketsSize = int(outBuf.offset() - startJumptables); // remember the size of the jumptables
|
|
assert(m_bucketsSize >= 0);
|
|
}
|
|
outFileSize = outBuf.offset();
|
|
outFile.close();
|
|
}
|
|
file.close();
|
|
logInfo() << outFileSize << "bytes written.";
|
|
|
|
// write new info file
|
|
boost::filesystem::remove(m_infoFile + m_tmpExtension);
|
|
std::ofstream outInfo(m_infoFile + m_tmpExtension, std::ios::binary | std::ios::out | std::ios::trunc);
|
|
if (!outInfo.is_open())
|
|
throw std::runtime_error("Failed to open new index file");
|
|
|
|
Streaming::MessageBuilder builder(Streaming::NoHeader, 256);
|
|
builder.add(UODB::FirstBlockHeight, initialBlockHeight);
|
|
builder.add(UODB::LastBlockHeight, lastBlockHeight);
|
|
builder.add(UODB::LastBlockId, lastBlockHash);
|
|
builder.add(UODB::PositionInFile, outFileSize);
|
|
if (isTip) {
|
|
builder.add(UODB::IsTip, true);
|
|
for (auto hash : invalidBlocks) {
|
|
builder.add(UODB::InvalidBlockHash, hash);
|
|
}
|
|
}
|
|
|
|
{
|
|
CHash256 ctx;
|
|
ctx.write(reinterpret_cast<const unsigned char*>(jumptable), sizeof(jumptable));
|
|
uint256 result;
|
|
ctx.finalize(reinterpret_cast<unsigned char*>(&result));
|
|
builder.add(UODB::JumpTableHash, result);
|
|
}
|
|
builder.add(UODB::Separator, true);
|
|
Streaming::ConstBuffer header = builder.buffer();
|
|
outInfo.write(header.constData(), header.size());
|
|
outInfo.write(reinterpret_cast<const char*>(jumptable), sizeof(jumptable));
|
|
outInfo.flush();
|
|
outInfo.close();
|
|
}
|