Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
136 changes: 114 additions & 22 deletions src/host/ledger.h
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,9 @@ namespace asynchost
FILE* file = nullptr;
ccf::pal::Mutex file_lock;

static constexpr uint64_t truncation_marker_size =
(1ULL << ccf::kv::SerialisedEntryHeader::BITS_FOR_SIZE) - 1;

size_t start_idx = 1;
size_t total_len = 0; // Points to end of last written entry
std::vector<uint32_t> positions;
Expand All @@ -104,6 +107,91 @@ namespace asynchost
// checked against the existing ones, until a divergence is found.
bool from_existing_file = false;

[[nodiscard]] size_t get_physical_file_size()
{
if (fseeko(file, 0, SEEK_END) != 0)
{
throw std::logic_error(fmt::format(
"Failed to seek to end of ledger file {}: {}",
file_name,
ccf::nonstd::strerror(errno)));
}

const auto physical_size = ftello(file);
if (physical_size < 0)
{
throw std::logic_error(fmt::format(
"Failed to read size of ledger file {}: {}",
file_name,
ccf::nonstd::strerror(errno)));
}

return static_cast<size_t>(physical_size);
}

void truncate_physical_file(size_t size)
{
const auto physical_size = get_physical_file_size();
if (physical_size == size)
{
return;
}

const auto fd = fileno(file);
if (fd == -1)
{
throw std::logic_error(fmt::format(
"Failed to get file descriptor for ledger file {}: {}",
file_name,
ccf::nonstd::strerror(errno)));
}

TimeBoundLogger log_if_slow(
fmt::format("Truncating ledger file - ftruncate({})", file_name));
if (ftruncate(fd, size) != 0)
{
throw std::logic_error(fmt::format(
"Failed to truncate ledger: {}", ccf::nonstd::strerror(errno)));
}
}

void write_truncation_marker(size_t physical_size)
{
// If there is no complete entry header beyond the logical end, recovery
// will already stop at total_len.
if (physical_size < total_len + ccf::kv::serialised_entry_header_size)
{
return;
}

if (fseeko(file, total_len, SEEK_SET) != 0)
{
throw std::logic_error(fmt::format(
"Failed to seek to truncation marker at logical end {} in ledger "
"file {}: {}",
total_len,
file_name,
ccf::nonstd::strerror(errno)));
}

ccf::kv::SerialisedEntryHeader marker;
// Use the largest encodable entry size so recovery sees a complete
// header whose payload cannot fit in the remaining file tail, and stops
// before recovering stale entries.
marker.set_size(truncation_marker_size);

TimeBoundLogger log_if_slow(fmt::format(
"Writing ledger truncation marker - fwrite({})", file_name));
if (fwrite(&marker, sizeof(marker), 1, file) != 1)
{
throw std::logic_error(fmt::format(
"Failed to write {}-byte truncation marker to ledger file {}: {}",
sizeof(marker),
file_name,
ccf::nonstd::strerror(errno)));
}
}

public:
// Used when creating a new (empty) ledger file
LedgerFile(const fs::path& dir, size_t start_idx, bool recovery = false) :
Expand Down Expand Up @@ -564,6 +652,9 @@ namespace asynchost
positions.resize(idx - start_idx + 1);
}

const auto physical_size = get_physical_file_size();
write_truncation_marker(physical_size);

{
TimeBoundLogger log_if_slow(
fmt::format("Flushing truncated ledger - fflush({})", file_name));
Expand All @@ -574,17 +665,13 @@ namespace asynchost
}
}

if (fseeko(file, total_len, SEEK_SET) != 0)
{
TimeBoundLogger log_if_slow(
fmt::format("Truncating ledger file - ftruncate({})", file_name));
if (ftruncate(fileno(file), total_len) != 0)
{
throw std::logic_error(fmt::format(
"Failed to truncate ledger: {}", ccf::nonstd::strerror(errno)));
}
throw std::logic_error(fmt::format(
"Failed to seek to logical end of ledger file {}: {}",
file_name,
ccf::nonstd::strerror(errno)));
}

fseeko(file, total_len, SEEK_SET);
LOG_TRACE_FMT("Truncated ledger file {} at seqno {}", file_name, idx);
return false;
}
Expand All @@ -595,21 +682,24 @@ namespace asynchost
{
return;
}
// It may happen (e.g. during recovery) that the incomplete ledger gets
// truncated on the primary, so we have to make sure that whenever we
// complete the file it doesn't contain anything past the last_idx, which
// can happen on the follower unless explicitly truncated before
// completion. This is only necessary when the file was recovered from an
// existing file on disk (from_existing_file is true). For fresh files,
// total_len always matches the physical file size, so avoid a potentially
// expensive truncate.
if (from_existing_file)
if (fseeko(file, total_len, SEEK_SET) != 0)
{
throw std::logic_error(fmt::format(
"Failed to seek to positions table offset in ledger file {}: {}",
file_name,
ccf::nonstd::strerror(errno)));
}
const auto raw_table_offset = ftello(file);
if (raw_table_offset < 0)
{
truncate(get_last_idx(), /* remove_file_if_empty = */ false);
throw std::logic_error(fmt::format(
"Failed to read positions table offset in ledger file {}: {}",
file_name,
ccf::nonstd::strerror(errno)));
}

fseeko(file, total_len, SEEK_SET);
size_t table_offset = ftello(file);
const auto table_offset = static_cast<size_t>(raw_table_offset);
const auto completed_file_size =
table_offset + positions.size() * sizeof(positions.at(0));

{
TimeBoundLogger log_if_slow(fmt::format(
Expand Down Expand Up @@ -653,6 +743,8 @@ namespace asynchost
}
}

truncate_physical_file(completed_file_size);

LOG_TRACE_FMT("Completed ledger file {}", file_name);

completed = true;
Expand Down
53 changes: 53 additions & 0 deletions src/host/test/ledger.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,27 @@ size_t number_of_recovery_files_in_ledger_dir()
return recovery_file_count;
}

fs::path require_single_ledger_file_path()
{
fs::path ledger_file;
size_t file_count = 0;
for (auto const& f : fs::directory_iterator(ledger_dir))
{
if (file_count == 0)
{
ledger_file = f.path();
}
file_count++;
if (file_count > 1)
{
break;
}
}

REQUIRE(file_count == 1);
return ledger_file;
}

void verify_framed_entries_range(
const asynchost::LedgerReadResult& read_result, size_t from, size_t to)
{
Expand Down Expand Up @@ -899,6 +920,38 @@ TEST_CASE("Truncation")
}
}

TEST_CASE("Truncation defers physical file shrink")
{
auto dir = AutoDeleteFolder(ledger_dir);

Ledger ledger(ledger_dir, wf);
TestEntrySubmitter entry_submitter(ledger, 1024);

for (size_t i = 0; i < 5; ++i)
{
entry_submitter.write(true);
}

const auto ledger_file_path = require_single_ledger_file_path();
const auto original_file_size = fs::file_size(ledger_file_path);

entry_submitter.truncate(2);
REQUIRE(fs::file_size(ledger_file_path) == original_file_size);

{
Ledger restored_ledger(ledger_dir, wf);
read_entries_range_from_ledger(restored_ledger, 1, 2);
REQUIRE(restored_ledger.get_last_idx() == 2);
}

TestEntrySubmitter post_truncation_submitter(ledger, 1024, 2);
post_truncation_submitter.write(true, ccf::kv::FORCE_LEDGER_CHUNK_AFTER);
ledger.commit(3);

REQUIRE(
fs::file_size(require_single_ledger_file_path()) < original_file_size);
}

TEST_CASE("Commit")
{
auto dir = AutoDeleteFolder(ledger_dir);
Expand Down
Loading