Files

377 lines
15 KiB
C++
Raw Permalink Normal View History

2018-09-16 20:02:16 +02:00
/*
 * This file is part of the Flowee project
 * Copyright (C) 2018-2020 Tom Zander <tom@flowee.org>
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */
#include "Pruner_p.h"
#include "UnspentOutputDatabase_p.h"
2020-03-02 14:59:59 +01:00
#include <utils/streaming/MessageBuilder.h>
#include <utils/streaming/MessageParser.h>
#include <utils/hash.h>
2019-08-24 15:01:22 +02:00
#include <utils/utiltime.h>
2020-03-02 14:59:59 +01:00
#include <boost/iostreams/device/mapped_file.hpp>
#include <boost/filesystem.hpp>
#include <boost/filesystem/fstream.hpp>
2019-08-24 15:01:22 +02:00
#include <stdlib.h>
2020-03-02 14:59:59 +01:00
#include <fstream>
#include <deque>
2018-09-16 20:02:16 +02:00
namespace {
// No-op deleter. Passed to the std::shared_ptr<char> instances that wrap the
// data pointer of a boost mapped_file, so that dropping the shared_ptr does
// not attempt to free memory it does not own.
static void nothing(const char *){}
// Plain value holder describing a single leaf.
// NOTE(review): not referenced by any other code visible in this file.
struct Leaf {
    int blockHeight{-1};   // defaults to -1
    int offsetInBlock{-1}; // defaults to -1
    int outIndex{0};       // defaults to 0
    uint256 txid;
};
struct LeafRef
2018-09-16 20:02:16 +02:00
{
uint32_t diskPosition;
uint256 txid;
int output;
static bool compare(const LeafRef &one, const LeafRef &two) {
if (one.txid == two.txid)
return one.output < two.output;
return one.txid < two.txid;
}
};
// Selects which parts of a bucket are meant.
// NOTE(review): no code visible in this file references this enum.
enum Type {
    OnlyLeafs,
    OnlyBucket,
    // Historic misspelling, kept as an alias so existing users keep compiling.
    OnlYBucket = OnlyBucket,
    BucketAndLeafs
};
std::vector<LeafRef> readLeafRefs(const Bucket &bucket, const std::shared_ptr<char> &inputBuf, size_t bufSize)
{
std::vector<LeafRef> leafRefs;
leafRefs.reserve(bucket.unspentOutputs.size());
2018-09-16 20:02:16 +02:00
for (auto ref : bucket.unspentOutputs) {
const uint32_t pos = ref.leafPos;
// read from old
Streaming::ConstBuffer buf(inputBuf, inputBuf.get() + pos, inputBuf.get() + bufSize);
UnspentOutput leaf(ref.cheapHash, buf);
2018-09-16 20:02:16 +02:00
if (leaf.blockHeight() < 1 || leaf.offsetInBlock() <= 0 || leaf.outIndex() < 0) {
logCritical() << "Error found while copying a bucket, the leaf at pos-in-file" << pos
<< "didn't have the required minimum info";
throw std::runtime_error("Error found, failed to parse leaf");
}
leafRefs.push_back({pos, leaf.prevTxId(), leaf.outIndex()});
}
return leafRefs;
}
2023-12-21 15:13:56 +01:00
/**
 * Re-serialise every leaf in \a leafRefs from the input buffer into the
 * output via \a builder.
 *
 * On entry each LeafRef::diskPosition is the leaf's position in \a inputBuf;
 * on return it has been replaced by outBuf->offset() as sampled just before
 * that leaf was written.
 *
 * \param inputBuf start of the buffer the leafs are read from.
 * \param bufSize  number of valid bytes in \a inputBuf.
 * \param outBuf   pool whose offset() is recorded as the new leaf position.
 * \param builder  receives the serialised leaf fields.
 * \param leafRefs leafs to copy; diskPosition is updated in place.
 */
void copyLeafs(const std::shared_ptr<char> &inputBuf, size_t bufSize, const std::shared_ptr<Streaming::BufferPool> &outBuf, Streaming::MessageBuilder &builder, std::vector<LeafRef> &leafRefs)
{
    // Scratch buffer reused for every leaf: bytes 8..31 of the txid.
    // NOTE(review): presumably the first 8 bytes are omitted because they are
    // recoverable from the cheap hash stored with the bucket - confirm.
    std::vector<char> shortTxId(24);

    for (LeafRef &ref : leafRefs) { // write leafs
        // read from old
        Streaming::ConstBuffer buf(inputBuf, inputBuf.get() + ref.diskPosition, inputBuf.get() + bufSize);
        UnspentOutput leaf(ref.txid.GetCheapHash(), buf);
        const uint256 txid = leaf.prevTxId();

        ref.diskPosition = outBuf->offset();

        // These three fields used to sit behind
        //   leafRefs.size() > i || leafRefs.at(i + 1).txid != txid
        // whose first operand is always true inside the loop, so they were
        // written for every leaf. That behaviour is kept on purpose:
        // readLeafRefs() rejects any leaf that lacks a block height or
        // offset-in-block.
        builder.add(UODB::BlockHeight, leaf.blockHeight());
        builder.add(UODB::OffsetInBlock, leaf.offsetInBlock());
        memcpy(shortTxId.data(), txid.begin() + 8, 24);
        builder.add(UODB::TXID, shortTxId);

        // An output index of zero is implied by leaving the field out.
        if (leaf.outIndex() != 0)
            builder.add(UODB::OutIndex, leaf.outIndex());
        builder.add(UODB::Separator, true);
    }
}
2023-12-21 15:13:56 +01:00
/**
 * Serialise the bucket record that points at the already written leafs.
 *
 * Commits \a outBuf, remembers its offset as the bucket position, then emits
 * for each leaf the distance from the bucket back to the leaf. A CheapHash
 * tag is emitted only when the value differs from the previous leaf's
 * (starting from 0). The record ends with a Separator and a second commit.
 *
 * \return the position of the bucket, i.e. outBuf->offset() after the first commit.
 */
uint32_t writeBucketData(const std::shared_ptr<Streaming::BufferPool> &outBuf, Streaming::MessageBuilder &builder, const std::vector<LeafRef> &leafRefs)
{
    outBuf->commit();
    const uint32_t posOfBucket = outBuf->offset();

    uint64_t lastCheapHash = 0;
    for (const LeafRef &ref : leafRefs) {
        const uint64_t cheapHash = ref.txid.GetCheapHash();
        if (cheapHash != lastCheapHash) {
            builder.add(UODB::CheapHash, cheapHash);
            lastCheapHash = cheapHash;
        }

        // Leafs must have been written before the bucket that refers to them.
        assert(posOfBucket > ref.diskPosition);
        const int distance = static_cast<int>(posOfBucket - ref.diskPosition);
        assert(distance > 0);
        builder.add(UODB::LeafPosRelToBucket, distance);
    }

    builder.add(UODB::Separator, true);
    outBuf->commit();
    return posOfBucket;
}
// copies the entire bucket, keeping the leafs and the bucket data as close together as possible
2023-12-21 15:13:56 +01:00
uint32_t copyBucket(const Bucket &bucket, const std::shared_ptr<char> &inputBuf, size_t bufSize, const std::shared_ptr<Streaming::BufferPool> &outBuf, Streaming::MessageBuilder &builder)
{
std::vector<LeafRef> leafRefs = readLeafRefs(bucket, inputBuf, bufSize);
if (leafRefs.empty())
return 0;
std::sort(leafRefs.begin(), leafRefs.end(), &LeafRef::compare);
copyLeafs(inputBuf, bufSize, outBuf, builder, leafRefs);
return writeBucketData(outBuf, builder, leafRefs);
2018-09-16 20:02:16 +02:00
}
}
/**
 * Set up a pruner for the given database / info file pair.
 *
 * Picks a random ".new<number>" suffix; the replacement files are created
 * under the original names with that suffix appended until commit() renames
 * them into place.
 *
 * \param dbType must be OlderDB or MostActiveDB (asserted).
 */
Pruner::Pruner(const std::string &dbFile, const std::string &infoFile, DBType dbType)
    : m_dbFile(dbFile),
      m_infoFile(infoFile),
      m_dbType(dbType)
{
    // Seed first so the suffix differs between runs.
    srandom(static_cast<uint32_t>(GetTimeMillis()));
    assert(m_dbType == OlderDB || m_dbType == MostActiveDB);

    m_tmpExtension = ".new" + std::to_string(int(random()));
}
/**
 * Move the freshly written files over the originals: first the database
 * file, then the info file.
 */
void Pruner::commit()
{
    const std::string newDbFile = m_dbFile + m_tmpExtension;
    boost::filesystem::rename(newDbFile, m_dbFile);

    const std::string newInfoFile = m_infoFile + m_tmpExtension;
    boost::filesystem::rename(newInfoFile, m_infoFile);
}
2018-09-16 21:14:42 +02:00
void Pruner::cleanup()
{
boost::filesystem::remove(m_dbFile + m_tmpExtension);
boost::filesystem::remove(m_infoFile + m_tmpExtension);
}
// Returns the number of bytes the bucket records occupy at the end of the
// new database file. prune() assigns this only on its "leafs first, buckets
// at end of file" code path; on the other path the member is left untouched.
int Pruner::bucketsSize() const
{
return m_bucketsSize;
}
2018-09-16 20:02:16 +02:00
void Pruner::prune()
{
2019-06-22 10:34:58 +02:00
logCritical() << "Garbage Collecting" << m_dbFile;
logInfo() << "Starting gc. Counting buckets...";
2018-09-16 20:02:16 +02:00
std::ifstream in(m_infoFile, std::ios::binary | std::ios::in);
if (!in.is_open())
throw std::runtime_error("Failed to open info file");
int initialBlockHeight = -1;
int lastBlockHeight = 1;
uint256 lastBlockHash;
int posInFile = 0;
2020-03-02 14:59:59 +01:00
bool isTip = false;
std::deque<uint256> invalidBlocks;
2018-09-16 20:02:16 +02:00
int posOfJumptable = 0;
uint256 checksum;
{
std::shared_ptr<char> buf(new char[256], std::default_delete<char[]>());
in.read(buf.get(), 256);
Streaming::MessageParser parser(Streaming::ConstBuffer(buf, buf.get(), buf.get() + 256));
while (parser.next() == Streaming::FoundTag) {
if (parser.tag() == UODB::LastBlockHeight)
lastBlockHeight = parser.intData();
else if (parser.tag() == UODB::FirstBlockHeight)
initialBlockHeight = parser.intData();
else if (parser.tag() == UODB::LastBlockId)
lastBlockHash = parser.uint256Data();
else if (parser.tag() == UODB::JumpTableHash)
checksum = parser.uint256Data();
else if (parser.tag() == UODB::PositionInFile)
posInFile = parser.intData();
2020-03-02 14:59:59 +01:00
else if (parser.tag() == UODB::IsTip)
isTip = parser.boolData();
else if (parser.tag() == UODB::InvalidBlockHash)
invalidBlocks.push_back(parser.uint256Data());
2018-09-16 20:02:16 +02:00
else if (parser.tag() == UODB::Separator)
break;
}
posOfJumptable = parser.consumed();
}
in.seekg(posOfJumptable);
uint32_t jumptable[0x100000];
in.read(reinterpret_cast<char*>(jumptable), sizeof(jumptable));
{
CHash256 ctx;
ctx.write(reinterpret_cast<const unsigned char*>(jumptable), sizeof(jumptable));
2018-09-16 20:02:16 +02:00
uint256 result;
ctx.finalize(reinterpret_cast<unsigned char*>(&result));
2018-09-16 20:02:16 +02:00
if (result != checksum)
throw std::runtime_error("info file is mangled, checksum failed");
}
boost::iostreams::mapped_file file;
file.open(m_dbFile, std::ios_base::binary | std::ios_base::in);
if (!file.is_open())
throw std::runtime_error("Failed to open db file");
std::shared_ptr<char> buffer = std::shared_ptr<char>(const_cast<char*>(file.const_data()), nothing);
std::vector<Bucket> buckets;
buckets.reserve(100000);
// Find all buckets
for (int i = 0; i < 0x100000; ++i) {
if (jumptable[i] == 0)
continue;
if (jumptable[i] > 0x7FFFFFFF)
throw std::runtime_error("Info file jumps to pos > 2GB. Needs to be repaired first.");
2020-02-27 14:43:30 +01:00
const uint32_t bucketOffsetInFile = jumptable[i];
assert(bucketOffsetInFile < 0x80000000);
if (size_t(bucketOffsetInFile) > file.size())
2018-09-16 20:02:16 +02:00
throw std::runtime_error("Info file links to pos greater than DB file.");
Bucket bucket;
bucket.fillFromDisk(Streaming::ConstBuffer(buffer, buffer.get() + bucketOffsetInFile, buffer.get() + file.size()),
static_cast<int>(bucketOffsetInFile));
// bucket.shorthash = i;
if (!bucket.unspentOutputs.empty())
buckets.push_back(bucket);
}
logInfo() << "Pruner found" << buckets.size() << "buckets";
memset(jumptable, 0, sizeof(jumptable));
const std::string outFilename(m_dbFile + m_tmpExtension);
{
boost::filesystem::remove(outFilename);
boost::filesystem::ofstream outFle(outFilename);
outFle.close();
2019-06-19 16:53:22 +02:00
uint32_t newFileSize = 0;
2020-03-02 14:59:59 +01:00
if (isTip) {
newFileSize = 2147483600;
} else {
// new file size is all leafs (55 bytes each)
// then the max 30 bytes to link to it from a bucket, times the amount of leafs in a bucket.
// since we can expect a bucket to be re-written that (=amount of leafs) amount of times.
for (auto bucket : buckets) {
newFileSize += static_cast<int>(bucket.unspentOutputs.size()) * (55 + 30 + /* add some for security */ 20);
}
2018-09-16 20:02:16 +02:00
}
2019-06-19 16:53:22 +02:00
boost::filesystem::resize_file(outFilename, std::min((uint32_t) 0x7FFFFFFE, newFileSize));
2018-09-16 20:02:16 +02:00
}
int outFileSize = 0;
{
/*
* If the DB is older we sort the buckets assuming that an access to the bucket will
* lead to an access to the leaf.
* This, in practice, means we keep write all almost-empty buckets at the front where
* we first write the leafs immediately followed by the bucket.
* Then we follow with the rest, following the assumption that larger buckets have greater
* chance of being written out again at the end.
*
* For more recent database files we optimize to keep all the buckets together and thus keep
* that part of the file in memory as much as possible.
* This is more valuable here as many of the searches that are not hits still need to read
* the bucket before they move to the next DB.
*
* For those more recent DBs, we write all the leafs first and append the buckets at the end.
*/
2018-09-16 20:02:16 +02:00
boost::iostreams::mapped_file outFile;
outFile.open(outFilename, std::ios_base::binary | std::ios_base::out);
if (!outFile.is_open())
throw std::runtime_error("Failed to open replacement db file for writing");
2019-06-22 10:34:58 +02:00
logInfo() << "GC is now copying leafs and buckets";
2018-09-16 20:02:16 +02:00
std::shared_ptr<char> outStream = std::shared_ptr<char>(const_cast<char*>(outFile.const_data()), nothing);
2023-12-21 15:13:56 +01:00
auto outBuf = std::make_shared<Streaming::BufferPool>(outStream, static_cast<int>(outFile.size()), true);
2018-09-16 20:02:16 +02:00
Streaming::MessageBuilder builder(outBuf);
2020-03-02 14:59:59 +01:00
if (m_dbType == MostActiveDB || isTip) {
for (const Bucket &bucket : buckets) {
if (bucket.unspentOutputs.size() > 2)
continue;
// copy buckets
uint32_t newPos = copyBucket(bucket, buffer, file.size(), outBuf, builder);
2018-09-16 20:02:16 +02:00
jumptable[createShortHash(bucket.unspentOutputs.front().cheapHash)] = newPos;
}
for (const Bucket &bucket : buckets) {
if (bucket.unspentOutputs.size() <= 2)
continue;
uint32_t newPos = copyBucket(bucket, buffer, file.size(), outBuf, builder);
2018-09-16 20:02:16 +02:00
jumptable[createShortHash(bucket.unspentOutputs.front().cheapHash)] = newPos;
}
} else {
// leafs first, buckets at end of file
assert(m_dbType == OlderDB);
// first we only copy the leafs.
for (size_t index = 0; index < buckets.size(); ++index) {
Bucket &bucket = buckets[index];
assert(!bucket.unspentOutputs.empty());
std::vector<LeafRef> leafRefs = readLeafRefs(bucket, buffer, file.size());
assert(leafRefs.size() == bucket.unspentOutputs.size()); // we either copy everything or nothing
copyLeafs(buffer, file.size(), outBuf, builder, leafRefs);
size_t i = 0;
for (auto iter = bucket.unspentOutputs.begin(); iter != bucket.unspentOutputs.end(); ++iter, ++i) {
assert(iter->cheapHash == leafRefs[i].txid.GetCheapHash());
iter->leafPos = leafRefs.at(i).diskPosition;
}
}
2023-12-21 15:13:56 +01:00
const uint32_t startJumptables = outBuf->offset();
// next we write the bucket
for (size_t index = 0; index < buckets.size(); ++index) {
const Bucket &bucket = buckets[index];
assert(!bucket.unspentOutputs.empty());
int32_t newPos = bucket.saveToDisk(outBuf);
assert(newPos >= 0);
2018-09-24 23:42:00 +02:00
jumptable[createShortHash(bucket.unspentOutputs.front().cheapHash)] = static_cast<uint32_t>(newPos);
}
2023-12-21 15:13:56 +01:00
m_bucketsSize = int(outBuf->offset() - startJumptables); // remember the size of the jumptables
assert(m_bucketsSize >= 0);
2018-09-16 20:02:16 +02:00
}
2023-12-21 15:13:56 +01:00
outFileSize = outBuf->offset();
2018-09-16 20:02:16 +02:00
outFile.close();
}
file.close();
logInfo() << outFileSize << "bytes written.";
// write new info file
boost::filesystem::remove(m_infoFile + m_tmpExtension);
std::ofstream outInfo(m_infoFile + m_tmpExtension, std::ios::binary | std::ios::out | std::ios::trunc);
if (!outInfo.is_open())
throw std::runtime_error("Failed to open new index file");
Streaming::MessageBuilder builder(Streaming::NoHeader, 256);
builder.add(UODB::FirstBlockHeight, initialBlockHeight);
builder.add(UODB::LastBlockHeight, lastBlockHeight);
builder.add(UODB::LastBlockId, lastBlockHash);
builder.add(UODB::PositionInFile, outFileSize);
2020-03-02 14:59:59 +01:00
if (isTip) {
builder.add(UODB::IsTip, true);
for (auto hash : invalidBlocks) {
builder.add(UODB::InvalidBlockHash, hash);
}
}
2018-09-16 20:02:16 +02:00
{
CHash256 ctx;
ctx.write(reinterpret_cast<const unsigned char*>(jumptable), sizeof(jumptable));
2018-09-16 20:02:16 +02:00
uint256 result;
ctx.finalize(reinterpret_cast<unsigned char*>(&result));
2018-09-16 20:02:16 +02:00
builder.add(UODB::JumpTableHash, result);
}
builder.add(UODB::Separator, true);
Streaming::ConstBuffer header = builder.buffer();
outInfo.write(header.constData(), header.size());
outInfo.write(reinterpret_cast<const char*>(jumptable), sizeof(jumptable));
outInfo.flush();
outInfo.close();
}