/* * This file is part of the Flowee project * Copyright (C) 2019-2023 Tom Zander * * This program is free software: you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by * the Free Software Foundation, either version 3 of the License, or * (at your option) any later version. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License * along with this program. If not, see <http://www.gnu.org/licenses/>. */ // #undef NDEBUG // uncomment to enable asserts even in release builds #include "Pruner_p.h" #include "UTXOInteralError.h" #include "UnspentOutputDatabase.h" #include "UnspentOutputDatabase_p.h" #include #include #include #include #include #include #include #include #include #include // #define DEBUG_UTXO #ifdef DEBUG_UTXO # define DEBUGUTXO logCritical(Log::UTXO) #else # define DEBUGUTXO BCH_NO_DEBUG_MACRO() #endif // numbering in the .info files. constexpr int MAX_INFO_NUM = 20; constexpr int MAX_INFO_FILES = 13; /* * Threading rules; * The connection between m_jumptables and m_buckets is via a unique Id * registered via m_nextBucketIndex (atomic) * We guarantee that 100% of the buckets are stored on disk periodically at which * point the atomic is reset to 1. * This means I can assume that a bucketId I find in the jumptables refers to a * unique bucket, even in a multi-threaded environment.
*/ Limits UODBPrivate::limits = Limits(); static std::uint32_t createShortHash(const uint256 &hash) { auto txid = hash.begin(); return (static_cast(txid[0]) << 12) + (static_cast(txid[1]) << 4) + ((static_cast(txid[2]) & 0xF0) >> 4); } static bool matchesOutput(const Streaming::ConstBuffer &buffer, const uint256 &txid, int index) { bool txidMatched = false, indexMatched = false; Streaming::MessageParser parser(buffer); bool separatorHit = false; while (!(indexMatched && txidMatched) && parser.next() == Streaming::FoundTag) { if (!txidMatched && parser.tag() == UODB::TXID) { if (parser.dataLength() == 32 && txid == parser.uint256Data()) txidMatched = true; else if (parser.dataLength() == 24) txidMatched = memcmp(txid.begin() + 8, parser.bytesDataBuffer().begin(), 24) == 0; else return false; } else if (!indexMatched && !separatorHit && parser.tag() == UODB::OutIndex) { if (index == parser.intData()) indexMatched = true; else return false; } else if (!indexMatched && parser.tag() == UODB::Separator) { indexMatched = 0 == index; separatorHit = true; } if (separatorHit && txidMatched) break; } return indexMatched && txidMatched; } ////////////////////////////////////////////////////////////// UnspentOutput::UnspentOutput(const std::shared_ptr &pool, const uint256 &txid, int outIndex, int blockHeight, int offsetInBlock) : m_outIndex(outIndex), m_offsetInBlock(offsetInBlock), m_blockHeight(blockHeight) { assert(outIndex >= 0); assert(blockHeight > 0); assert(offsetInBlock > 80); pool->reserve(55); Streaming::MessageBuilder builder(pool); builder.add(UODB::BlockHeight, blockHeight); builder.add(UODB::OffsetInBlock, offsetInBlock); builder.add(UODB::TXID, txid); if (outIndex != 0) builder.add(UODB::OutIndex, outIndex); builder.add(UODB::Separator, true); m_data = pool->commit(); } UnspentOutput::UnspentOutput(uint64_t cheapHash, const Streaming::ConstBuffer &buffer) : m_data(buffer), m_outIndex(0), m_offsetInBlock(-1), m_blockHeight(-1), m_cheapHash(cheapHash) { bool 
hitSeparator = false, foundUtxo = false; Streaming::MessageParser parser(m_data); while (parser.next() == Streaming::FoundTag) { if (parser.tag() == UODB::BlockHeight) m_blockHeight = parser.intData(); else if (parser.tag() == UODB::OffsetInBlock) m_offsetInBlock = parser.intData(); else if (!hitSeparator && parser.tag() == UODB::OutIndex) m_outIndex = parser.intData(); else if (parser.tag() == UODB::TXID) foundUtxo = true; else if (parser.tag() == UODB::Separator) hitSeparator = true; if (hitSeparator && foundUtxo) break; } if (parser.next() == Streaming::Error) throw UTXOInternalError("Unparsable UTXO-record"); assert(m_blockHeight > 0 && m_offsetInBlock >= 0); } uint256 UnspentOutput::prevTxId() const { Streaming::MessageParser parser(m_data); while (parser.next() == Streaming::FoundTag) { if (parser.tag() == UODB::TXID) { if (parser.dataLength() == 32) return parser.uint256Data(); else if (parser.dataLength() != 24) throw UTXOInternalError("TXID of wrong length"); else { // pruned style, shorter hash, combine with our m_shortHash. 
char fullHash[32]; WriteLE64(reinterpret_cast(fullHash), m_cheapHash); memcpy(fullHash + 8, parser.bytesData().data(), 24); return uint256(fullHash); } } } throw UTXOInternalError("No txid in UnspentOutput buffer found"); } bool UnspentOutput::isCoinbase() const { return m_offsetInBlock >= 81 && m_offsetInBlock < 90; } ////////////////////////////////////////////////////////////// UnspentOutputDatabase::UnspentOutputDatabase(boost::asio::io_service &service, const boost::filesystem::path &basedir) : d(new UODBPrivate(service, basedir)) { } UnspentOutputDatabase::UnspentOutputDatabase(UODBPrivate *priv) : d(priv) { } UnspentOutputDatabase::~UnspentOutputDatabase() { if (d->memOnly) { for (int i = 0; i < d->dataFiles.size(); ++i) { delete d->dataFiles.at(i); } } else { logCritical() << "Flushing UTXO cashes to disk..."; bool m_changed = false; for (int i = 0; i < d->dataFiles.size() && !m_changed; ++i) { m_changed |= d->dataFiles.at(i)->m_needsSave; } for (int i = 0; i < d->dataFiles.size(); ++i) { auto df = d->dataFiles.at(i); DataFile::LockGuard deleteLock(df); deleteLock.deleteLater(); if (m_changed) { std::lock_guard saveLock(df->m_saveLock); std::lock_guard lock2(df->m_lock); df->rollback(); df->flushAll(); } } } d->dataFiles.clear(); fflush(nullptr); delete d; } UnspentOutputDatabase *UnspentOutputDatabase::createMemOnlyDB(const boost::filesystem::path &basedir) { boost::asio::io_service ioService; auto d = new UODBPrivate(ioService, basedir); d->memOnly = true; return new UnspentOutputDatabase(d); } void UnspentOutputDatabase::setSmallLimits() { UODBPrivate::limits.DBFileSize = 50000000; UODBPrivate::limits.FileFull = 30000000; UODBPrivate::limits.ChangesToSave = 50000; } void UnspentOutputDatabase::setChangeCountCausesStore(int count) { assert(count > 1000); UODBPrivate::limits.ChangesToSave = count; } void UnspentOutputDatabase::insertAll(const UnspentOutputDatabase::BlockData &data) { for (size_t i = 0; i < data.outputs.size(); i += 2000) { auto df = 
d->checkCapacity(); df->insertAll(d, data, i, std::min(data.outputs.size(), i + 2000)); } } void UnspentOutputDatabase::insert(const uint256 &txid, int outIndex, int blockHeight, int offsetInBlock) { auto df = d->checkCapacity(); df->insert(d, txid, outIndex, outIndex, blockHeight, offsetInBlock); } UnspentOutput UnspentOutputDatabase::find(const uint256 &txid, int index) const { DataFileList dataFiles(d->dataFiles); for (int i = dataFiles.size(); i > 0; --i) { auto answer = dataFiles.at(i - 1)->find(txid, index); if (answer.isValid()) { answer.m_privData += (static_cast(i) << 32); return answer; } } return UnspentOutput(); } SpentOutput UnspentOutputDatabase::remove(const uint256 &txid, int index, uint64_t rmHint) { SpentOutput done; const int32_t dbHint = static_cast((rmHint >> 32) & 0xFFFFFF); const uint32_t leafHint = rmHint & 0xFFFFFFFF; if (dbHint == 0) { // we don't know which one holds the data, which means we'll have to try all until we got a hit. DataFileList dataFiles(d->dataFiles); for (int i = dataFiles.size(); i > 0; --i) { done = dataFiles.at(i - 1)->remove(d, txid, index, leafHint); if (done.isValid()) break; } } else { assert(dbHint > 0); DataFileList dataFiles(d->dataFiles); if (dbHint > dataFiles.size()) throw std::runtime_error("dbHint out of range"); if (dbHint == dataFiles.size()) d->checkCapacity(); done = dataFiles.at(dbHint - 1)->remove(d, txid, index, leafHint); } return done; } bool UnspentOutputDatabase::blockFinished(int blockheight, const uint256 &blockId) { DEBUGUTXO << blockheight << blockId; uint32_t totalChanges = 0; for (int i = 0; i < d->dataFiles.size(); ++i) { DataFile* df = d->dataFiles.at(i); std::lock_guard lock(df->m_lock); df->m_lastBlockHash = blockId; df->m_lastBlockHeight = blockheight; df->m_needsSave = true; totalChanges += df->m_changesSinceJumptableWritten; df->commit(d); if (!d->memOnly && !df->m_dbIsTip) { /* * Avoid too great fragmentation by doing a garbage-collection (aka prune of dead records). 
* * The series of databases have 3 types of fragmentation. * The last one will be written until its full, possibly creating large fragmentation. * the one prior to that keeps the buckets close to the leafs. * For this one we can't really talk about fragmentation unless we check the actual buckets. * All DBs prior to that will have their buckets at the end and we can talk about fragmentation. */ if (d->dataFiles.size() - 2 > i) // check all but the last two for fragmentation. d->doPrune = d->doPrune || df ->fragmentationLevel() > 60000000; // greater than 60MB else // use only changesSincePrune d->doPrune = d->doPrune || df->m_changesSincePrune > 800000; } } if (d->memOnly) return false; d->checkCapacity(); if (d->doPrune || totalChanges > UODBPrivate::limits.AutoFlush) { logCritical() << "Sha256 DB writing checkpoints" << d->basedir.string(); std::vector infoFilenames; for (int i = 0; i < d->dataFiles.size(); ++i) { DataFile *df = d->dataFiles.at(i); std::lock_guard saveLock(df->m_saveLock); infoFilenames.push_back(df->flushAll()); df->m_changesSinceJumptableWritten = 0; } if (d->doPrune && d->dataFiles.size() > 1) { // prune the DB files. d->doPrune = false; logCritical() << "Garbage-collecting the sha256-DB" << d->basedir.string(); for (int db = 0; db < d->dataFiles.size() - 1; ++db) { DataFile* df = d->dataFiles.at(db); if (d->dataFiles.size() - 2 > db) { if (df->fragmentationLevel() < 40000000) // not worth pruning, skip continue; } else if (df->m_changesSincePrune < 200000) { continue; // not worth pruning, skip } auto dbFilename = df->m_path; Pruner pruner(dbFilename.string() + ".db", infoFilenames.at(static_cast(db)), (db == d->dataFiles.size() - 2) ? 
Pruner::MostActiveDB : Pruner::OlderDB); logDebug() << "GC-ing file" << dbFilename.string() << infoFilenames.at(db); try { pruner.prune(); DataFileCache cache(dbFilename.string()); for (int i = 0; i < MAX_INFO_NUM; ++i) boost::filesystem::remove(cache.filenameFor(i)); DataFile::LockGuard delLock(d->dataFiles.at(db)); delLock.deleteLater(); pruner.commit(); const auto newDf = new DataFile(dbFilename); newDf->m_initialBucketSize = pruner.bucketsSize(); d->dataFiles[db] = newDf; } catch (const std::runtime_error &failure) { logCritical() << "Skipping GCing of db file" << db << "reason:" << failure; pruner.cleanup(); } } fflush(nullptr); } return true; } return false; } void UnspentOutputDatabase::rollback() { DataFileList dataFiles(d->dataFiles); for (int i = 0; i < dataFiles.size(); ++i) { dataFiles.at(i)->rollback(); } } void UnspentOutputDatabase::saveCaches() { if (d->memOnly) return; auto dfs(d->dataFiles); for (int i = 0; i < dfs.size(); ++i) { DataFile *df = dfs.at(i); bool old = false; if (df->m_flushScheduled.compare_exchange_strong(old, true)) d->ioService.post(std::bind(&DataFile::flushSomeNodesToDisk_callback, df)); } } void UnspentOutputDatabase::setFailedBlockId(const uint256 &blockId) { auto dfs(d->dataFiles); assert(dfs.size() > 0); auto df = dfs.last(); std::lock_guard lock(df->m_lock); const auto size = df->m_rejectedBlocks.size(); df->m_rejectedBlocks.insert(blockId); if (size != df->m_rejectedBlocks.size()) df->m_needsSave = true; } bool UnspentOutputDatabase::blockIdHasFailed(const uint256 &blockId) const { auto dfs(d->dataFiles); assert(dfs.size() > 0); auto df = dfs.last(); std::lock_guard lock(df->m_lock); return df->m_rejectedBlocks.find(blockId) != df->m_rejectedBlocks.end(); } void UnspentOutputDatabase::clearFailedBlockId(const uint256 &blockId) { auto dfs(d->dataFiles); assert(dfs.size() > 0); auto df = dfs.last(); std::lock_guard lock(df->m_lock); auto i = df->m_rejectedBlocks.find(blockId); if (i == df->m_rejectedBlocks.end()) // wasn't 
there, nothing to clear. return; df->m_rejectedBlocks.erase(i); df->m_needsSave = true; } bool UnspentOutputDatabase::loadOlderState() { assert(d); assert(d->dataFiles.size() > 0); auto newD = new UODBPrivate(d->ioService, d->basedir, blockheight()); newD->memOnly = d->memOnly; if (blockheight() == newD->dataFiles.last()->m_lastBlockHeight) { delete newD; return false; } delete d; d = newD; return true; } int UnspentOutputDatabase::blockheight() const { return DataFileList(d->dataFiles).last()->m_lastBlockHeight; } uint256 UnspentOutputDatabase::blockId() const { return DataFileList(d->dataFiles).last()->m_lastBlockHash; } // /////////////////////////////////////////////////////////////////////// #ifdef linux # include # include #endif UODBPrivate::UODBPrivate(boost::asio::io_service &service, const boost::filesystem::path &basedir, int beforeHeight) : ioService(service), basedir(basedir) { boost::system::error_code error; boost::filesystem::create_directories(basedir, error); #ifdef linux // make sure that the dir we open up in has the "NO-CoW" flag set, in case this is // a btrfs filesystem. We are much slower when copy-on-write is enabled. FILE *fp = fopen(basedir.string().c_str(), "r"); if (fp) { int flags; int rc = ioctl(fileno(fp), FS_IOC_GETFLAGS, &flags); if (rc == 0 && (flags & FS_NOCOW_FL) == 0) { flags |= FS_NOCOW_FL; ioctl(fileno(fp), FS_IOC_SETFLAGS, &flags); // ignore result, its Ok to fail. } fclose(fp); } #endif int i = 1; while (true) { auto path = filepathForIndex(i); auto dbFile(path); dbFile.concat(".db"); auto status = boost::filesystem::status(dbFile); if (status.type() != boost::filesystem::regular_file) break; dataFiles.append(new DataFile(path, beforeHeight)); ++i; } if (dataFiles.size() > 1 && dataFiles.last()->m_lastBlockHeight == 0) { dataFiles.removeLast(); } if (dataFiles.isEmpty()) { dataFiles.append(DataFile::createDatafile(filepathForIndex(1), 0, uint256())); } else { // find a checkpoint version all datafiles can agree on. 
bool allEqual = false; int tries = 0; while (!allEqual) { allEqual = true; // we assume they are until they are not. if (++tries > 9) { // can't find a state all databases rely on. This is a fatal problem. throw UTXOInternalError("Can't find a usable UTXO state"); } int lastBlock = -1; uint256 lastBlockId; for (int i2 = 0; i2 < dataFiles.size(); ++i2) { DataFile *df = dataFiles.at(i2); if (lastBlock == -1) { lastBlock = df->m_lastBlockHeight; lastBlockId = df->m_lastBlockHash; } else if (lastBlock >= beforeHeight || lastBlock != df->m_lastBlockHeight || lastBlockId != df->m_lastBlockHash) { allEqual = false; int oldestHeight = std::min(lastBlock, df->m_lastBlockHeight); oldestHeight = std::min(oldestHeight, beforeHeight - 1); logCritical() << "Need to roll back to an older state:" << oldestHeight; logDebug() << "First:" << lastBlock << lastBlockId << "datafile" << i2 << df->m_lastBlockHeight << df->m_lastBlockHash; for (int i3 = 0; i3 < dataFiles.size(); ++i3) { DataFile *dataFile = dataFiles.at(i3); bool ok = dataFile->openInfo(oldestHeight); if (!ok) { // if we can't find something before 'oldestHeight', then we have nothing more to search logWarning() << "finding the wanted block info file (height:" << oldestHeight << ") failed for" << dataFile->m_path.string(); throw UTXOInternalError("Can't find a usable UTXO state"); } } break; } } } } if (dataFiles.size() > 1) { auto lastFull = dataFiles.at(dataFiles.size() - 2); doPrune = lastFull->m_file.size() == limits.DBFileSize; // the original size if (doPrune) lastFull->m_changesSinceJumptableWritten = UODBPrivate::limits.AutoFlush; // prune it sooner } dataFiles.last()->m_dbIsTip = true; } boost::filesystem::path UODBPrivate::filepathForIndex(int fileIndex) { boost::filesystem::path answer = basedir; std::stringstream ss; ss << std::fixed << std::setprecision(2) << "data-" << fileIndex; answer = answer / ss.str(); return answer; } DataFile *UODBPrivate::checkCapacity() { auto df = DataFileList(dataFiles).last(); int 
fullValue = 1; // what the flush() method sets fileFull to const bool isFull = df->m_fileFull.compare_exchange_strong(fullValue, 2); // only true once after it was set to '1' if (!isFull) return df; doPrune = true; DEBUGUTXO << "Creating a new DataFile" << dataFiles.size(); auto newDf = DataFile::createDatafile(filepathForIndex(dataFiles.size() + 1), df->m_lastBlockHeight, df->m_lastBlockHash); newDf->m_rejectedBlocks = df->m_rejectedBlocks; df->m_rejectedBlocks.clear(); df->m_dbIsTip = false; dataFiles.append(newDf); return newDf; } ////////////////////////////////////////////////////////////////////////////////////////// static void nothing(const char *){} DataFile::DataFile(const boost::filesystem::path &filename, int beforeHeight) : m_fileFull(0), m_memBuffers(std::make_shared(100000)), m_nextBucketIndex(1), m_nextLeafIndex(1), m_path(filename), m_changeCountBlock(0), m_changeCount(0), m_fragmentationCalcTimestamp(boost::gregorian::date(1970,1,1)), m_flushScheduled(false), m_usageCount(1) { memset(m_jumptables, 0, sizeof(m_jumptables)); auto dbFile(filename); dbFile.concat(".db"); m_file.open(dbFile, std::ios_base::binary | std::ios_base::in | std::ios_base::out); if (!m_file.is_open()) throw UTXOInternalError("Failed to open UTXO DB file read/write"); m_buffer = std::shared_ptr(const_cast(m_file.const_data()), nothing); m_writeBuffer = std::make_shared(m_buffer, static_cast(m_file.size()), true); DataFileCache cache(m_path); while (!cache.m_validInfoFiles.empty()) { auto iter = cache.m_validInfoFiles.begin(); auto highest = iter; while (iter != cache.m_validInfoFiles.end()) { if (iter->lastBlockHeight > highest->lastBlockHeight && iter->lastBlockHeight < beforeHeight) highest = iter; ++iter; } if (cache.load(*highest, this)) break; // all ok cache.m_validInfoFiles.erase(highest); } } DataFile::DataFile(int startHeight, int endHeight) : m_fileFull(0), m_nextBucketIndex(1), m_nextLeafIndex(1), m_initialBlockHeight(startHeight), m_lastBlockHeight(endHeight), 
m_changeCountBlock(0), m_changeCount(0), m_flushScheduled(false), m_usageCount(1) { // Notice that this constructor is only for unit testing purposes memset(m_jumptables, 1, sizeof(m_jumptables)); } void DataFile::insert(const UODBPrivate *priv, const uint256 &txid, int firstOutput, int lastOutput, int blockHeight, int offsetInBlock) { assert(offsetInBlock > 80); assert(blockHeight > 0); assert(firstOutput >= 0); assert(lastOutput >= firstOutput); assert(!txid.IsNull()); LockGuard delLock(this); const uint32_t shortHash = createShortHash(txid); uint32_t bucketId; { BucketHolder bucket; uint32_t lastCommittedBucketIndex; do { bucket.unlock(); { std::lock_guard lock(m_lock); lastCommittedBucketIndex = m_lastCommittedBucketIndex; bucketId = m_jumptables[shortHash]; if (bucketId == 0) {// doesn't exist yet. Create now. bucketId = static_cast(m_nextBucketIndex.fetch_add(1)); DEBUGUTXO << "Insert leafs" << txid << firstOutput << "-" << lastOutput << "creates new bucket id:" << bucketId; bucket = m_buckets.lock(static_cast(bucketId)); assert(*bucket == nullptr); bucket.insertBucket(static_cast(bucketId), Bucket()); m_jumptables[shortHash] = bucketId + MEMBIT; break; } } if (bucketId < MEMBIT) // not in memory break; bucket = m_buckets.lock(static_cast(bucketId & MEMMASK)); } while (*bucket == nullptr); if (*bucket) { for (int i = firstOutput; i <= lastOutput; ++i) { const std::int32_t leafPos = m_nextLeafIndex.fetch_add(1); DEBUGUTXO << "Insert leaf" << (leafPos & MEMMASK) << "shortHash:" << Log::Hex << shortHash; bucket->unspentOutputs.push_back( OutputRef(txid.GetCheapHash(), static_cast(leafPos) + MEMBIT, new UnspentOutput(m_memBuffers, txid, i, blockHeight, offsetInBlock))); } bucket->saveAttempt = 0; bucket.unlock(); addChange(lastOutput - firstOutput + 1); if (bucketId > MEMMASK && (bucketId & MEMMASK) <= lastCommittedBucketIndex) { std::lock_guard lock(m_lock); m_bucketsToNotSave.insert(bucketId); } return; } if (bucketId >= m_file.size()) // data corruption throw 
UTXOInternalError("Bucket points past end of file."); } // if we are still here that means that the bucket is stored on disk, we need to load it first. Bucket memBucket; assert(bucketId != 0); assert((bucketId & MEMBIT) == 0); // read from disk outside of the mutex, this is an expensive operation (because disk-io) memBucket.fillFromDisk(Streaming::ConstBuffer(m_buffer, m_buffer.get() + bucketId, m_buffer.get() + m_file.size()), static_cast(bucketId)); // after Disk-IO, acquire lock again. const int bucketIndex = m_nextBucketIndex.fetch_add(1); FlexLockGuard lock(m_lock); // re-fetch in case we had an AbA race bucketId = m_jumptables[shortHash]; if ((bucketId & MEMBIT) || bucketId == 0) {// it got loaded into mem in parallel to our attempt lock.unlock(); return insert(priv, txid, firstOutput, lastOutput, blockHeight, offsetInBlock); } m_committedBucketLocations.insert(std::make_pair(shortHash, bucketId)); auto bucket = m_buckets.lock(bucketIndex); bucket.insertBucket(bucketIndex, std::move(memBucket)); m_jumptables[shortHash] = static_cast(bucketIndex) + MEMBIT; lock.unlock(); for (int i = firstOutput; i <= lastOutput; ++i) { const std::int32_t leafPos = m_nextLeafIndex.fetch_add(1); DEBUGUTXO << "Insert leaf" << (leafPos & MEMMASK) << "shortHash:" << Log::Hex << shortHash; DEBUGUTXO << Log::Hex << " + from disk, bucketId:" << bucketIndex; bucket->unspentOutputs.push_back( OutputRef(txid.GetCheapHash(), static_cast(leafPos) + MEMBIT, new UnspentOutput(m_memBuffers, txid, i, blockHeight, offsetInBlock))); } bucket->saveAttempt = 0; bucket.unlock(); addChange(); } void DataFile::insertAll(const UODBPrivate *priv, const UnspentOutputDatabase::BlockData &data, size_t start, size_t end) { for (size_t i = start; i < end; ++i) { assert(data.outputs.size() > i); const auto &o = data.outputs.at(i); insert(priv, o.txid, o.firstOutput, o.lastOutput, data.blockHeight, o.offsetInBlock); } assert(m_writeBuffer.get()); int spaceLeft = UODBPrivate::limits.FileFull - 
m_writeBuffer->offset(); if (m_changeCountBlock.load() * 120 > spaceLeft) { int notFull = 0; // only change if its still the default value. m_fileFull.compare_exchange_strong(notFull, 1); DEBUGUTXO << "insertAll: Marking file full"; } } UnspentOutput DataFile::find(const uint256 &txid, int index) const { LockGuard delLock(this); const uint32_t shortHash = createShortHash(txid); const auto cheapHash = txid.GetCheapHash(); uint32_t bucketId; DEBUGUTXO << txid << index << Log::Hex << shortHash; BucketHolder bucketHolder; /* first get the a bucket from the jumptables if its there and try to lock * the bucket in a lock-free manner, looping to try again if it fails to lock. */ do { bucketHolder.unlock(); { std::lock_guard lock(m_lock); bucketId = m_jumptables[shortHash]; if (bucketId == 0) // not found return UnspentOutput(); } if (bucketId < MEMBIT) // not in memory break; // a successfully locked bucket gives us a BucketHolder, which will ensure we lock again when leaving scope bucketHolder = m_buckets.lock(static_cast(bucketId & MEMMASK)); } while (*bucketHolder == nullptr); Bucket bucket; if (*bucketHolder) { // Bucket found in memory. It is exclusively ours until BucketHolder releases it. const Bucket *bucketRef = *bucketHolder; for (const OutputRef &ref : bucketRef->unspentOutputs) { // the cheapref is 64 bits, making it even more certain we have the right one before needing another disk-io. if ((ref.leafPos & MEMBIT) && ref.cheapHash == cheapHash) { // oh, its in memory already, lets check if the txid AND the index fully match. assert(ref.unspentOutput); // the 'matchesOutput' parses the output while its still encoded. if (matchesOutput(ref.unspentOutput->data(), txid, index)) {// found it! 
UnspentOutput answer = *ref.unspentOutput; answer.setRmHint(ref.leafPos); return answer; } } } bucket.operator=(*bucketRef); // deep copy } else if (bucketId >= m_file.size()) // disk based bucket, data corruption throw UTXOInternalError("Bucket points past end of file."); bucketHolder.unlock(); // If it was locked, we now have a deep copy of the bucket if ((bucketId & MEMBIT) == 0) { // copy from disk // disk is immutable, so this is safe outside of the mutex. bucket.fillFromDisk(Streaming::ConstBuffer(m_buffer, m_buffer.get() + bucketId, m_buffer.get() + m_file.size()), static_cast(bucketId)); // FYI: a bucket coming from disk implies all leafs are not in memory. } // Only on-disk leafs to check, to do this fast we sort by position on disk for mem-locality. std::vector diskRefs; for (auto ref : bucket.unspentOutputs) { if (!(ref.leafPos & MEMBIT) && ref.cheapHash == cheapHash) diskRefs.push_back(ref.leafPos); } std::sort(diskRefs.begin(), diskRefs.end()); for (size_t i = diskRefs.size(); i > 0; --i) { const uint32_t pos = diskRefs.at(i - 1); // we do this all without any locking on a copy of the bucket because we know that stuff written to // m_buffer is immutable. Streaming::ConstBuffer buf(m_buffer, m_buffer.get() + pos, m_buffer.get() + m_file.size()); if (matchesOutput(buf, txid, index)) { // found it! 
UnspentOutput answer(cheapHash, buf); answer.setRmHint(pos); return answer; } } return UnspentOutput(); } SpentOutput DataFile::remove(const UODBPrivate *priv, const uint256 &txid, int index, uint32_t leafHint) { LockGuard delLock(this); SpentOutput answer; const auto cheapHash = txid.GetCheapHash(); const uint32_t shortHash = createShortHash(cheapHash); uint32_t bucketId; BucketHolder bucket; do { bucket.unlock(); { std::lock_guard lock(m_lock); bucketId = m_jumptables[shortHash]; if (bucketId == 0) // not found return answer; } if (bucketId < MEMBIT) // not in memory break; bucket = m_buckets.lock(static_cast(bucketId & MEMMASK)); } while (*bucket== nullptr); Bucket memBucket; if (*bucket) { assert(!bucket->unspentOutputs.empty()); DEBUGUTXO << "remove" << txid << index << "from bucket in memory. shortHash:" <unspentOutputs.begin(); ref != bucket->unspentOutputs.end(); ++ref) { if ((ref->leafPos & MEMBIT) && (ref->leafPos == leafHint || ref->cheapHash == cheapHash)) { UnspentOutput *output = ref->unspentOutput; if (ref->leafPos == leafHint || matchesOutput(output->data(), txid, index)) { // found it! DEBUGUTXO << " +r " << txid << index << "removed, was in-mem leaf" << (ref->leafPos & MEMMASK); answer.blockHeight = output->blockHeight(); answer.offsetInBlock = output->offsetInBlock(); assert(answer.isValid()); bucket->saveAttempt = 0; const uint32_t leafIndex = ref->leafPos & MEMMASK; //copy as the next section frees ref const bool deleteBucket = bucket->unspentOutputs.size() == 1; if (deleteBucket) bucket.deleteBucket(); // remove whole bucket if left empty else bucket->unspentOutputs.erase(ref); bucket.unlock(); addChange(); std::lock_guard lock(m_lock); if (deleteBucket) m_jumptables[shortHash] = 0; if (leafIndex <= m_lastCommittedLeafIndex) { // make backup of a leaf that has been committed but not yet saved m_leafsBackup.push_back(output); } else { delete output; } // Mark bucket to not be saved. 
Bucket IDs with values higher than lastCommited don't get saved either way if ((bucketId & MEMMASK) <= m_lastCommittedBucketIndex) m_bucketsToNotSave.insert(bucketId); return answer; } } } memBucket = **bucket; // maybe the hit is in an on-disk leaf (of this in-memory-bucket) bucket.unlock(); } if (bucketId < MEMBIT) { // we could not find bucket in memory, read it from disk. // disk is immutable, so this is safe outside of the mutex. memBucket.fillFromDisk(Streaming::ConstBuffer(m_buffer, m_buffer.get() + bucketId, m_buffer.get() + m_file.size()), static_cast(bucketId)); // FYI: a bucket coming from disk implies all leafs are also on disk. } // Only on-disk leafs to check, to avoid unneeded disk-IO we sort by position-on-disk for locality. std::vector diskRefs; bool hintFound = false; for (auto ref : memBucket.unspentOutputs) { if (ref.leafPos < MEMBIT && ref.cheapHash == cheapHash) { if (ref.leafPos == leafHint) hintFound = true; else diskRefs.push_back(ref.leafPos); } } std::sort(diskRefs.begin(), diskRefs.end()); if (hintFound) diskRefs.insert(diskRefs.begin(), leafHint); // check the hint first. for (size_t i = diskRefs.size(); i > 0; --i) { const uint32_t pos = diskRefs.at(i - 1); // we do this all without any locking on a copy of the bucket because we know that stuff written to // m_buffer is immutable. Streaming::ConstBuffer buf(m_buffer, m_buffer.get() + pos, m_buffer.get() + m_file.size()); if (matchesOutput(buf, txid, index)) { // found the leaf I want to remove! 
const OutputRef ref(cheapHash, pos); uint32_t newBucketId; do { bucket.unlock(); { std::lock_guard lock(m_lock); newBucketId = m_jumptables[shortHash]; if (newBucketId == 0) // since deleted return answer; } if (newBucketId < MEMBIT) // not in memory break; bucket = m_buckets.lock(static_cast(newBucketId & MEMMASK)); } while (*bucket == nullptr); if (*bucket) { DEBUGUTXO << "remove" << txid << index << "from (now) in-mem bucket, id:" << (newBucketId & MEMMASK) << "leaf disk-pos:" << pos << "shortHash:" << Log::Hex << shortHash; bool found = false; for (auto iter = bucket->unspentOutputs.begin(); iter != bucket->unspentOutputs.end(); ++iter) { if (*iter == ref) { found = true; bucket->unspentOutputs.erase(iter); break; } } if (!found) return answer; if (bucket->unspentOutputs.empty()) { // remove if empty bucket.deleteBucket(); bucket.unlock(); std::lock_guard lock(m_lock); m_jumptables[shortHash] = 0; } else { bucket->saveAttempt = 0; bucket.unlock(); } std::lock_guard lock(m_lock); if ((bucketId & MEMMASK) <= m_lastCommittedBucketIndex) { m_bucketsToNotSave.insert(newBucketId); m_leafIdsBackup.push_back(ref); } } else { // bucket not in memory (now). So it has to come from disk. (this is the very slow path) if (newBucketId != bucketId) { DEBUGUTXO << " +r reload bucket from disk"; // ugh, it got saved and probably changed. Load it again :( memBucket.fillFromDisk(Streaming::ConstBuffer(m_buffer, m_buffer.get() + newBucketId, m_buffer.get() + m_file.size()), static_cast(newBucketId)); } bool found = false; // detect double spend for (auto refIter = memBucket.unspentOutputs.begin(); refIter != memBucket.unspentOutputs.end(); ++refIter) { if (*refIter == ref) { memBucket.unspentOutputs.erase(refIter); found = true; break; } } if (!found) return answer; FlexLockGuard lock(m_lock); if (m_jumptables[shortHash] != newBucketId) { // If this is the case then it got loaded into m_buckets in parallel. // We can recurse and make the top of this method handle the removing then. 
lock.unlock(); return remove(priv, txid, index, leafHint); } m_committedBucketLocations.insert(std::make_pair(shortHash, newBucketId)); // We just loaded it from disk, should we insert the bucket into m_buckets? if (memBucket.unspentOutputs.empty()) { // no, just delete from jumptable DEBUGUTXO << " +r bucket now empty, zero'd jumptable. Shorthash:" << Log::Hex <(bucketIndex) + MEMBIT; } } UnspentOutput uo(cheapHash, buf); answer.blockHeight = uo.blockHeight(); answer.offsetInBlock = uo.offsetInBlock(); assert(answer.isValid()); addChange(); break; } } return answer; } int DataFile::fragmentationLevel() { const auto now = boost::posix_time::second_clock::universal_time(); if ((now - m_fragmentationCalcTimestamp).total_seconds() < 100) return m_fragmentationLevel; // return cached m_fragmentationCalcTimestamp = now; uint32_t lowestOffset = m_file.size(); uint32_t highestOffset = 0; for (int i = 0; i < 0x100000; ++i) { uint32_t bucketId = m_jumptables[i]; if (bucketId < MEMBIT && bucketId > 0) { lowestOffset = std::min(lowestOffset, bucketId); highestOffset = std::max(highestOffset, bucketId); } } if (lowestOffset < highestOffset) { m_fragmentationLevel = highestOffset - lowestOffset; if (m_fragmentationLevel < m_initialBucketSize) m_fragmentationLevel = 0; else m_fragmentationLevel -= m_initialBucketSize; logDebug() << "Datafile" << m_path.string() << "fragmentation check" << m_fragmentationLevel << "aka" << (m_fragmentationLevel / 1000000) << "MB"; } return m_fragmentationLevel; } void DataFile::flushSomeNodesToDisk_callback() { flushSomeNodesToDisk(NormalSave); m_flushScheduled = false; } void DataFile::flushSomeNodesToDisk(ForceBool force) { LockGuard delLock(this); // in the rare case of flushAll() this may cause this method to be called from two // threads simultaniously. The below lock avoids this being an issue. 
std::lock_guard saveLock(m_saveLock); uint32_t lastCommittedBucketIndex; std::set bucketsToNotSave; { std::lock_guard lock(m_lock); lastCommittedBucketIndex = m_lastCommittedBucketIndex; bucketsToNotSave = m_bucketsToNotSave; } const int changeCountAtStart = m_changeCount; int32_t flushedToDiskCount = 0; int32_t leafsFlushedToDisk = 0; std::list savedBuckets; /* * Iterate over m_buckets * if save counter is at 1, flush to disk unsaved leafs and update the bucket and the m_leafs * if save counter is >= 4, save bucket and make a copy of it. Don't delete it from m_buckets. * increase save count */ for (auto iter = m_buckets.begin(); iter != m_buckets.end(); ++iter) { const uint32_t bucketId = static_cast(iter.key()); Bucket *bucket = &iter.value(); assert(bucket); assert(!bucket->unspentOutputs.empty()); bool allLeafsSaved = false; if (force == ForceSave || bucket->saveAttempt >= 1) { assert(!bucket->unspentOutputs.empty()); // save any leafs not yet on disk allLeafsSaved = true; for (auto refIter = bucket->unspentOutputs.begin(); refIter != bucket->unspentOutputs.end(); ++refIter) { if (refIter->leafPos >= MEMBIT) { if ((refIter->leafPos & MEMMASK) <= m_lastCommittedLeafIndex) { assert(refIter->unspentOutput); UnspentOutput *output = refIter->unspentOutput; refIter->leafPos = static_cast(saveLeaf(output)); refIter->unspentOutput = nullptr; delete output; leafsFlushedToDisk++; assert((refIter->leafPos & MEMBIT) == 0); } else { assert(force == NormalSave); allLeafsSaved = false; } } } } if (allLeafsSaved && (force == ForceSave || bucket->saveAttempt >= 4)) { const bool saveBucket = bucketId <= lastCommittedBucketIndex && bucketsToNotSave.find(bucketId + MEMBIT) == bucketsToNotSave.end(); if (saveBucket) { flushedToDiskCount++; assert(!bucket->unspentOutputs.empty()); int offset = bucket->saveToDisk(m_writeBuffer); assert(static_cast(offset) < MEMBIT && offset >= 0); savedBuckets.push_back(SavedBucket(bucket->unspentOutputs, static_cast(offset), bucket->saveAttempt)); 
assert(!bucket->unspentOutputs.empty()); } } ++bucket->saveAttempt; } flushedToDiskCount += leafsFlushedToDisk; if (flushedToDiskCount == 0) return; /* * Iterate over saved buckets and check if they are unchanged in m_buckets, if so then * update the now locked m_jumptable and delete it from m_buckets */ for (const SavedBucket &savedBucket : savedBuckets) { assert(!savedBucket.unspentOutputs.empty()); const auto shortHash = createShortHash(savedBucket.unspentOutputs.begin()->cheapHash); assert(shortHash < 0x100000); uint32_t bucketId; bool saveBucket; BucketHolder bucketHolder; do { bucketHolder.unlock(); // get bucketId of transactions we just saved, notice that this may give a different // bucket than we just saved as the one we saved may have been deleted and a new one // (with a different bucketId) was created. { // to avoid a race condition where the jumptables and the m_buckets are out of sync // we unlock and try multiple times. std::lock_guard lock(m_lock); bucketId = m_jumptables[shortHash]; if (bucketId == 0) // deleted saveBucket = false; else // check if we are still allowed to save saveBucket = force == ForceSave || ((bucketId & MEMMASK) <= m_lastCommittedBucketIndex && m_bucketsToNotSave.find(bucketId) == m_bucketsToNotSave.end()); } if (bucketId < MEMBIT) { // race condition, lets not wait for the data to settle, just keep in mem // to save properly next flush saveBucket = false; break; } if (saveBucket) bucketHolder = m_buckets.lock(static_cast(bucketId & MEMMASK)); } while (saveBucket && *bucketHolder == nullptr); if (!saveBucket) continue; Bucket *bucket = *bucketHolder; assert(bucket); assert(!bucket->unspentOutputs.empty()); // the remove code should have removed empty buckets. if (bucket->unspentOutputs.size() != savedBucket.unspentOutputs.size()) // it got changed.. 
continue; bool identical = true; for (size_t i = 0; identical && i < savedBucket.unspentOutputs.size(); ++i) { const OutputRef &savedItem = savedBucket.unspentOutputs.at(i); const OutputRef &item = bucket->unspentOutputs.at(i); identical = savedItem.leafPos == item.leafPos && savedItem.cheapHash == item.cheapHash; } if (identical) { // save was Ok and successful assert(savedBucket.offsetInFile < MEMBIT); bucketHolder.deleteBucket(); bucketHolder.unlock(); std::lock_guard lock(m_lock); m_jumptables[shortHash] = savedBucket.offsetInFile; } } logInfo() << "Flushed" << flushedToDiskCount << "to disk." << m_path.filename().string() << "Filesize now:" << m_writeBuffer->offset(); m_changeCount.fetch_sub(std::min(changeCountAtStart, flushedToDiskCount * 4)); m_needsSave = true; if (m_writeBuffer->offset() > UODBPrivate::limits.FileFull) { int notFull = 0; // only change if its still the default value. m_fileFull.compare_exchange_strong(notFull, 1); } m_changesSinceJumptableWritten += flushedToDiskCount; m_changesSincePrune += flushedToDiskCount; } std::string DataFile::flushAll() { LockGuard delLock(this); assert(m_bucketsToNotSave.empty()); while (true) { /* * The jumptables and the buckets are updated separately which may * lead to they being out of sync. In all methods we take care to * avoid this causing issues, but the main effect here is that a * ForceSave may actually skip items because the buckets and the * jumptable don't agree at the time of saving. * Trying a second time with a bit of a wait will effectively solve this. 
*/ flushSomeNodesToDisk(ForceSave); if (m_buckets.begin() == m_buckets.end()) // no buckets left to save break; MilliSleep(10); } #ifndef NDEBUG for (int i = 0; i < 0x100000; ++i) { assert(m_jumptables[i] < MEMBIT); } #endif m_nextBucketIndex = 1; m_nextLeafIndex = 1; m_memBuffers->clear(); commit(nullptr); DataFileCache cache(m_path); auto infoFilename = cache.writeInfoFile(this); m_needsSave = false; return infoFilename; } int32_t DataFile::saveLeaf(const UnspentOutput *uo) { const int32_t offset = m_writeBuffer->offset(); assert(uo->data().size() > 0); memcpy(m_writeBuffer->begin(), uo->data().begin(), static_cast(uo->data().size())); m_writeBuffer->commit(uo->data().size()); return offset; } void DataFile::commit(const UODBPrivate *priv) { // mutex already locked by caller. const int nextBucketIndex = m_nextBucketIndex.load(); assert(nextBucketIndex > 0); m_lastCommittedBucketIndex = static_cast(nextBucketIndex) - 1; m_lastCommittedLeafIndex = static_cast(m_nextLeafIndex.load()) - 1; for (UnspentOutput *output : m_leafsBackup) { delete output; } m_leafsBackup.clear(); for (const OutputRef &ref : m_leafIdsBackup) { delete ref.unspentOutput; } m_leafIdsBackup.clear(); m_bucketsToNotSave.clear(); m_committedBucketLocations.clear(); const int move = m_changeCountBlock.load(); m_changeCountBlock.fetch_sub(move); m_changeCount.fetch_add(move); const int cc = m_changeCount.load(); m_needsSave |= cc > 0; if (priv && !priv->memOnly && cc > UODBPrivate::limits.ChangesToSave) { if (m_flushScheduled && cc > UODBPrivate::limits.ChangesToSave * 2 && move < UODBPrivate::limits.ChangesToSave) { // Saving is too slow! We are more than an entire chunk-size behind. // forcefully slow down adding data into memory. logWarning() << m_path.string() << "saving too slow. 
Count:" << cc << "sleeping a little"; boost::this_thread::sleep_for(boost::chrono::microseconds(std::min(cc, 100000))); } bool old = false; if (m_flushScheduled.compare_exchange_strong(old, true)) priv->ioService.post(std::bind(&DataFile::flushSomeNodesToDisk_callback, this)); } } void DataFile::rollback() { LockGuard delLock(this); std::lock_guard mutex_lock(m_lock); DEBUGUTXO << "Rollback" << m_path.string(); // inserted new stuff is mostly irrelevant for rollback, we haven't been saving them, // all we need to do is remove them from memory. for (auto iter = m_buckets.begin(); iter != m_buckets.end();) { #ifndef NDEBUG { const int bucketId = iter.key(); assert(static_cast(bucketId) <= MEMBIT); Bucket *bucket = &iter.value(); assert(!bucket->unspentOutputs.empty()); const auto shortHash = createShortHash(bucket->unspentOutputs.begin()->cheapHash); assert(shortHash < 0x100000); assert(m_jumptables[shortHash] >= MEMBIT); assert(m_jumptables[shortHash] == static_cast(bucketId) + MEMBIT); } #endif if (static_cast(iter.key()) <= m_lastCommittedBucketIndex) { ++iter; continue; } assert (!iter.value().unspentOutputs.empty()); const uint32_t shortHash = createShortHash(iter.value().unspentOutputs.front().cheapHash); #ifndef NDEBUG DEBUGUTXO << "Rolling back adding a bucket" << iter.key() << "shortHash" << Log::Hex < MEMBIT) { assert(outRef.unspentOutput); DEBUGUTXO << " + " << outRef.unspentOutput->prevTxId() << outRef.unspentOutput->outIndex(); } else { DEBUGUTXO << " + saved leaf" << outRef.leafPos; } } #endif // Newly inserted buckets can be totally new, or inserted because they were retrieved // from disk, changed and scheduled to be saved. // In case its new, just set the jumptable to zero. Otherwise fetch the previous // known on-disk position from m_committedJumptable uint32_t newBucketPos = 0; auto jti = m_committedBucketLocations.find(shortHash); if (jti != m_committedBucketLocations.end()) // before commit() it was a fully on-disk bucket. 
newBucketPos = jti->second; assert(newBucketPos < MEMBIT); if (newBucketPos > 0) DEBUGUTXO << " + Restoring old buckets disk pos" << newBucketPos << "shortHash" << Log::Hex <= 0); for (const OutputRef &ref : iter.value().unspentOutputs) { delete ref.unspentOutput; } m_buckets.erase(iter); } #ifndef NDEBUG // validate state of jumptable for (int i = 0; i < 0x100000; ++i) { uint32_t bucketId = m_jumptables[i]; assert((bucketId < MEMBIT) || (bucketId & MEMMASK) <= m_lastCommittedBucketIndex); if (bucketId >= MEMBIT) { auto holder = m_buckets.lock(bucketId & MEMMASK); assert(*holder); } } for (auto iter = m_buckets.begin(); iter != m_buckets.end(); ++iter) { const int bucketId = iter.key(); assert(static_cast(bucketId) <= MEMBIT); Bucket *bucket = &iter.value(); assert(!bucket->unspentOutputs.empty()); const auto shortHash = createShortHash(bucket->unspentOutputs.begin()->cheapHash); assert(shortHash < 0x100000); assert(m_jumptables[shortHash] >= MEMBIT); assert(m_jumptables[shortHash] == static_cast(bucketId) + MEMBIT); } #endif for (auto jti = m_committedBucketLocations.begin(); jti != m_committedBucketLocations.end(); ++jti) { if (m_jumptables[jti->first] == 0) { DEBUGUTXO << "Restoring jumptable to on-disk bucket" << jti->first << jti->second; m_jumptables[jti->first] = jti->second; } } for (auto iter = m_buckets.begin(); iter != m_buckets.end(); ++iter) { Bucket &bucket = iter.value(); uint32_t lastCommittedLeafIndex = m_lastCommittedLeafIndex | MEMBIT; for (auto refIter = bucket.unspentOutputs.begin(); refIter != bucket.unspentOutputs.end();) { if (refIter->leafPos > lastCommittedLeafIndex) { DEBUGUTXO << "Rolling back adding a leaf:" << (refIter->leafPos & MEMMASK) << refIter->unspentOutput->prevTxId() << refIter->unspentOutput->outIndex() << Log::Hex <<"shortHash"; refIter = bucket.unspentOutputs.erase(refIter); } else { ++refIter; } } } for (auto leaf : m_leafsBackup) { // reinsert deleted leafs const uint256 txid = leaf->prevTxId(); const uint32_t shortHash = 
createShortHash(txid); const std::int32_t leafPos = m_nextLeafIndex.fetch_add(1); DEBUGUTXO << "Rolling back removing a leaf:" << txid << leaf->outIndex() << "ShortHash:" << Log::Hex <= MEMBIT) { // add leaf BucketHolder bh = m_buckets.lock(static_cast(bucketId & MEMMASK)); bucket = *bh; assert(bucket); } else { // bucket is not in memory DEBUGUTXO << " + reloading a bucket from disk for this"; Bucket memBucket; memBucket.fillFromDisk(Streaming::ConstBuffer(m_buffer, m_buffer.get() + bucketId, m_buffer.get() + m_file.size()), static_cast(bucketId)); const int bucketIndex = m_nextBucketIndex.fetch_add(1); m_jumptables[shortHash] = static_cast(bucketIndex) + MEMBIT; BucketHolder bh = m_buckets.lock(bucketIndex); assert(*bh == nullptr); bh.insertBucket(bucketIndex, std::move(memBucket)); bucket = *bh; } bucket->unspentOutputs.push_back( OutputRef(txid.GetCheapHash(), static_cast(leafPos) + MEMBIT, leaf)); bucket->saveAttempt = 0; } for (auto outRef : m_leafIdsBackup) { // reinsert deleted leafs-ids (aka pos-on-disk for saved ones) const uint32_t shortHash = createShortHash(outRef.cheapHash); DEBUGUTXO << "Rolling back removing a leaf (from disk). 
pos:" << outRef.leafPos << "ShortHash:" << Log::Hex <= MEMBIT) { // add leaf BucketHolder bh = m_buckets.lock(static_cast(bucketId & MEMMASK)); bucket = *bh; assert(bucket); } else { // bucket is not in memory DEBUGUTXO << " + reloading a bucket from disk for this"; Bucket memBucket; memBucket.fillFromDisk(Streaming::ConstBuffer(m_buffer, m_buffer.get() + bucketId, m_buffer.get() + m_file.size()), static_cast(bucketId)); const int bucketIndex = m_nextBucketIndex.fetch_add(1); m_jumptables[shortHash] = static_cast(bucketIndex) + MEMBIT; BucketHolder bh = m_buckets.lock(bucketIndex); bh.insertBucket(bucketIndex, std::move(memBucket)); bucket = *bh; } bucket->unspentOutputs.push_back(outRef); bucket->saveAttempt = 0; } #ifndef NDEBUG // make sure that the newly inserted leafs are reachable for (auto leaf : m_leafsBackup) { const uint256 txid = leaf->prevTxId(); const uint32_t shortHash = createShortHash(txid); assert(shortHash < 0x100000); assert (m_jumptables[shortHash]); uint32_t bucketId = m_jumptables[shortHash]; assert (bucketId >= MEMBIT); BucketHolder bh = m_buckets.lock(bucketId & MEMMASK); assert(*bh); bool found = false; for (OutputRef rev : bh->unspentOutputs) { if (rev.leafPos > MEMBIT) { assert(rev.unspentOutput); found = rev.unspentOutput == leaf; if (found) break; } } assert (found); } // validate state of jumptable for (int i = 0; i < 0x100000; ++i) { uint32_t bucketId = m_jumptables[i]; assert(bucketId < MEMBIT || static_cast(bucketId & MEMMASK) < m_nextBucketIndex); if (bucketId >= MEMBIT) { auto holder = m_buckets.lock(bucketId & MEMMASK); assert(*holder); } } for (auto iter = m_buckets.begin(); iter != m_buckets.end(); ++iter) { const int bucketId = iter.key(); assert(bucketId >= 0); assert(static_cast(bucketId) <= MEMBIT); Bucket *bucket = &iter.value(); assert(!bucket->unspentOutputs.empty()); const auto shortHash = createShortHash(bucket->unspentOutputs.begin()->cheapHash); assert(shortHash < 0x100000); assert(m_jumptables[shortHash] >= MEMBIT); 
assert(m_jumptables[shortHash] == static_cast(bucketId) + MEMBIT); } #endif m_leafsBackup.clear(); // clear these as the pointers have moved ownership m_leafIdsBackup.clear(); m_changeCountBlock.store(0); commit(nullptr); } void DataFile::addChange(int count) { m_changeCountBlock.fetch_add(count); } bool DataFile::openInfo(int targetHeight) { DataFileCache cache(m_path); DataFileCache::InfoFile candidate; for (auto info : cache.m_validInfoFiles) { if (info.lastBlockHeight <= targetHeight && info.lastBlockHeight > candidate.lastBlockHeight) candidate = info; } if (candidate.lastBlockHeight > 0) return cache.load(candidate, this); return false; } DataFile *DataFile::createDatafile(const boost::filesystem::path &filename, int firstBlockHeight, const uint256 &firstHash) { auto dbFile = filename; dbFile.concat(".db"); auto status = boost::filesystem::status(dbFile); if (status.type() != boost::filesystem::regular_file) { // doens't exist (yet) if (status.type() != boost::filesystem::file_not_found) { // removing non-file in its place. We don't delete directories, though. That sounds too dangerous. bool removed = boost::filesystem::remove(dbFile); if (!removed) { logFatal() << "Failed to create datafile, removing non-file failed"; throw UTXOInternalError("Failed to replace non-file"); } } // now create the file. 
assert(!filename.parent_path().string().empty()); boost::system::error_code error; boost::filesystem::create_directories(filename.parent_path(), error); boost::filesystem::ofstream file(dbFile); file.close(); boost::filesystem::resize_file(dbFile, UODBPrivate::limits.DBFileSize); } DataFile *df = new DataFile(filename); df->m_initialBlockHeight = firstBlockHeight; df->m_lastBlockHeight = firstBlockHeight; df->m_lastBlockHash = firstHash; df->m_dbIsTip = true; return df; } ///////////////////////////////////////////////////////////////////////// DataFileCache::DataFileCache(const boost::filesystem::path &baseFilename) : m_baseFilename(baseFilename) { for (int i = 1; i < MAX_INFO_NUM; ++i) { InfoFile info = parseInfoFile(i); if (info.initialBlockHeight >= 0) { m_validInfoFiles.push_back(info); } } } DataFileCache::InfoFile DataFileCache::parseInfoFile(int index) const { assert(index >= 0); std::stringstream ss; ss << '.' << index << ".info"; std::ifstream in(m_baseFilename.string() + ss.str(), std::ios::binary | std::ios::in); InfoFile answer; std::shared_ptr buf(new char[32], std::default_delete()); answer.index = index; if (in.is_open()) { in.read(buf.get(), 32); Streaming::MessageParser parser(Streaming::ConstBuffer(buf, buf.get(), buf.get() + 32)); while (parser.next() == Streaming::FoundTag) { if (parser.tag() == UODB::LastBlockHeight) answer.lastBlockHeight = parser.intData(); else if (parser.tag() == UODB::FirstBlockHeight) answer.initialBlockHeight = parser.intData(); else break; } } return answer; } std::string DataFileCache::writeInfoFile(DataFile *source) { assert(source->m_writeBuffer.get()); // if number of m_validInfoFiles are more than MAX_INFO_FILES // delete the one with the lowest / oldest 'lastBlockHeight' while (m_validInfoFiles.size() > MAX_INFO_FILES) { auto iter = m_validInfoFiles.begin(); auto lowest = iter++; while (iter != m_validInfoFiles.end()) { if (iter->lastBlockHeight < lowest->lastBlockHeight) lowest = iter; ++iter; } 
boost::filesystem::remove(filenameFor(lowest->index)); m_validInfoFiles.erase(lowest); } int newIndex = 1; // the index we use for the new info file for (auto i : m_validInfoFiles) { newIndex = std::max(newIndex, i.index); } if (++newIndex >= MAX_INFO_NUM) { // ah, we have to loop around and start from 1. newIndex = 1; bool found; do { found = false; // then find the first unused index for (auto i : m_validInfoFiles) { if (i.index == newIndex) { found = true; ++newIndex; break; } } } while (found); } assert(newIndex > 0); assert(newIndex < MAX_INFO_NUM); boost::filesystem::remove(filenameFor(newIndex)); std::string outFile = filenameFor(newIndex).string(); std::ofstream out(outFile, std::ios::binary | std::ios::out | std::ios::trunc); if (!out.is_open()) throw UTXOInternalError("Failed to open UTXO info file for writing"); Streaming::MessageBuilder builder(Streaming::NoHeader, 150 + source->m_rejectedBlocks.size() * 34); builder.add(UODB::FirstBlockHeight, source->m_initialBlockHeight); builder.add(UODB::LastBlockHeight, source->m_lastBlockHeight); builder.add(UODB::LastBlockId, source->m_lastBlockHash); assert(source->m_writeBuffer.get()); builder.add(UODB::PositionInFile, source->m_writeBuffer->offset()); builder.add(UODB::ChangesSincePrune, source->m_changesSincePrune); if (source->m_initialBucketSize > 0) builder.add(UODB::InitialBucketSegmentSize, source->m_initialBucketSize); builder.add(UODB::IsTip, source->m_dbIsTip); if (source->m_dbIsTip) { for (auto blockId : source->m_rejectedBlocks) { builder.add(UODB::InvalidBlockHash, blockId); } } CHash256 ctx; ctx.write(reinterpret_cast(source->m_jumptables), sizeof(source->m_jumptables)); uint256 result; ctx.finalize(reinterpret_cast(&result)); builder.add(UODB::JumpTableHash, result); builder.add(UODB::Separator, true); Streaming::ConstBuffer header = builder.buffer(); out.write(header.constData(), header.size()); out.write(reinterpret_cast(source->m_jumptables), sizeof(source->m_jumptables)); out.flush(); return 
outFile; } bool DataFileCache::load(const DataFileCache::InfoFile &info, DataFile *target) { logInfo() << "Loading" << filenameFor(info.index).string(); assert(info.index >= 0); std::ifstream in(filenameFor(info.index).string(), std::ios::binary | std::ios::in); if (!in.is_open()) return false; int posOfJumptable = 0; uint256 checksum; { std::shared_ptr buf(new char[256], std::default_delete()); in.read(buf.get(), 256); Streaming::MessageParser parser(Streaming::ConstBuffer(buf, buf.get(), buf.get() + 256)); while (parser.next() == Streaming::FoundTag) { if (parser.tag() == UODB::LastBlockHeight) target->m_lastBlockHeight = parser.intData(); else if (parser.tag() == UODB::FirstBlockHeight) target->m_initialBlockHeight = parser.intData(); else if (parser.tag() == UODB::LastBlockId) target->m_lastBlockHash = parser.uint256Data(); else if (parser.tag() == UODB::JumpTableHash) checksum = parser.uint256Data(); else if (parser.tag() == UODB::ChangesSincePrune) target->m_changesSincePrune = parser.intData(); else if (parser.tag() == UODB::InitialBucketSegmentSize) target->m_initialBucketSize = parser.intData(); else if (parser.tag() == UODB::PositionInFile) { target->m_writeBuffer = std::make_shared(target->m_buffer, static_cast(target->m_file.size()), true); target->m_writeBuffer->markUsed(parser.intData()); target->m_writeBuffer->forget(parser.intData()); } else if (parser.tag() == UODB::InvalidBlockHash) { assert(parser.isByteArray() && parser.dataLength() == 32); target->m_rejectedBlocks.insert(parser.uint256Data()); } else if (parser.tag() == UODB::Separator) break; else if (parser.tag() != UODB::IsTip) // isTip is purely for external tools, we don't trust that one. 
logDebug() << "UTXO info file has unrecognized tag" << parser.tag(); } posOfJumptable = parser.consumed(); } in.seekg(posOfJumptable); in.read(reinterpret_cast(target->m_jumptables), sizeof(target->m_jumptables)); logDebug() << "Loaded" << filenameFor(info.index).string(); logDebug() << "Block from" << target->m_initialBlockHeight << "to" << target->m_lastBlockHeight << "changes since prune" << target->m_changesSincePrune; CHash256 ctx; ctx.write(reinterpret_cast(target->m_jumptables), sizeof(target->m_jumptables)); uint256 result; ctx.finalize(reinterpret_cast(&result)); return result == checksum; } boost::filesystem::path DataFileCache::filenameFor(int index) const { std::stringstream ss; ss << '.' << index << ".info"; auto result (m_baseFilename); result.concat(ss.str()); return result; }