Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ class l1_reader_fixture : public seastar_test {
.value();
}

_metastore.add_objects(std::move(meta_builder), term_map).get().value();
_metastore.add_objects(meta_builder, term_map).get().value();
}

model::record_batch_reader make_reader(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -304,7 +304,7 @@ TEST_F(l1_reader_test, missing_object) {
.first_offset = kafka::offset{0},
});

_metastore.add_objects(std::move(builder), term_map).get().value();
_metastore.add_objects(builder, term_map).get().value();

auto reader = make_reader(ntp, tidp);
EXPECT_THROW(read_all(std::move(reader)), std::runtime_error);
Expand Down Expand Up @@ -364,7 +364,7 @@ TEST_F(l1_reader_test, empty_offset_range) {
.term = model::term_id{1},
.first_offset = high_watermark,
});
_metastore.add_objects(std::move(meta_builder), term_map).get().value();
_metastore.add_objects(meta_builder, term_map).get().value();

// Write some objects after the empty object.
auto final_batches
Expand Down
6 changes: 3 additions & 3 deletions src/v/cloud_topics/level_one/metastore/metastore.h
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ class metastore {
// response should be examined to determine if subsequent add_objects()
// requests need to be re-aligned to a different offset.
virtual ss::future<std::expected<add_response, errc>> add_objects(
std::unique_ptr<object_metadata_builder>, const term_offset_map_t&)
const std::unique_ptr<object_metadata_builder>&, const term_offset_map_t&)
= 0;

// Adds the given objects to the metastore, expecting that the new extents
Expand All @@ -183,7 +183,7 @@ class metastore {
// correctness, these simplify accounting and makes it easier to validate
// that we haven't lost data.
virtual ss::future<std::expected<void, errc>>
replace_objects(std::unique_ptr<object_metadata_builder>) = 0;
replace_objects(const std::unique_ptr<object_metadata_builder>&) = 0;

// Moves the start offset of the given partition's log to the given offset.
virtual ss::future<std::expected<void, errc>>
Expand Down Expand Up @@ -278,7 +278,7 @@ class metastore {
// compaction metadata. See get_compaction_offsets() for more details on
// expected usage.
virtual ss::future<std::expected<void, errc>> compact_objects(
std::unique_ptr<object_metadata_builder>, const compaction_map_t&)
const std::unique_ptr<object_metadata_builder>&, const compaction_map_t&)
= 0;

// Returns metadata required to determine what to compact for the given
Expand Down
119 changes: 46 additions & 73 deletions src/v/cloud_topics/level_one/metastore/replicated_metastore.cc
Original file line number Diff line number Diff line change
Expand Up @@ -102,12 +102,6 @@ class replicated_object_builder : public metastore::object_metadata_builder {

private:
friend class cloud_topics::l1::replicated_metastore;
std::expected<
chunked_hash_map<
model::partition_id,
chunked_vector<metastore::object_metadata>>,
error>
release();

struct partitioned_objects {
chunked_hash_map<
Expand Down Expand Up @@ -196,33 +190,6 @@ replicated_object_builder::finish(
return {};
}

std::expected<
chunked_hash_map<
model::partition_id,
chunked_vector<metastore::object_metadata>>,
replicated_object_builder::error>
replicated_object_builder::release() {
for (const auto& [partition_id, partition_objects] : partitions_) {
if (!partition_objects.pending_objects_.empty()) {
return std::unexpected(error{"Unfinished objects remain"});
}
}

chunked_hash_map<
model::partition_id,
chunked_vector<metastore::object_metadata>>
result;
for (auto& [partition_id, partition_objects] : partitions_) {
chunked_vector<metastore::object_metadata> metas;
for (auto& obj : partition_objects.finished_objects_) {
metas.push_back(std::move(obj));
}
result.emplace(partition_id, std::move(metas));
}

partitions_.clear();
return result;
}
} // anonymous namespace

replicated_metastore::replicated_metastore(frontend& fe)
Expand Down Expand Up @@ -258,18 +225,19 @@ replicated_metastore::get_offsets(const model::topic_id_partition& tidp) {

ss::future<std::expected<metastore::add_response, metastore::errc>>
replicated_metastore::add_objects(
std::unique_ptr<metastore::object_metadata_builder> builder,
const std::unique_ptr<metastore::object_metadata_builder>& builder,
const metastore::term_offset_map_t& terms) {
auto replicated_builder = std::unique_ptr<replicated_object_builder>(
static_cast<replicated_object_builder*>(builder.release()));

auto objects_result = replicated_builder->release();
if (!objects_result.has_value()) {
vlog(
cd_log.error,
"Error while sending request: {}",
objects_result.error());
co_return std::unexpected(metastore::errc::invalid_request);
auto* replicated_builder = static_cast<replicated_object_builder*>(
builder.get());

for (const auto& [partition_id, partition_objects] :
replicated_builder->partitions_) {
if (!partition_objects.pending_objects_.empty()) {
vlog(
cd_log.error,
"Error while sending request: unfinished objects remain");
co_return std::unexpected(metastore::errc::invalid_request);
}
}
chunked_hash_map<model::partition_id, term_state_update_t>
partitioned_terms;
Expand All @@ -286,7 +254,8 @@ replicated_metastore::add_objects(
}
}
add_response resp;
for (auto& [partition_id, partition_objects] : objects_result.value()) {
for (auto& [partition_id, partition_objects] :
replicated_builder->partitions_) {
auto terms_it = partitioned_terms.find(partition_id);
if (terms_it == partitioned_terms.end()) {
// TODO: consider making this less strict, down to the STM layer?
Expand All @@ -300,7 +269,7 @@ replicated_metastore::add_objects(
req.metastore_partition = partition_id;
req.new_terms = std::move(terms_it->second);
chunked_vector<new_object> new_objects;
for (auto& obj : partition_objects) {
for (auto& obj : partition_objects.finished_objects_) {
new_objects.emplace_back(meta_to_rpc_obj(obj));
}
req.new_objects = std::move(new_objects);
Expand All @@ -325,23 +294,25 @@ replicated_metastore::add_objects(

ss::future<std::expected<void, metastore::errc>>
replicated_metastore::replace_objects(
std::unique_ptr<metastore::object_metadata_builder> builder) {
auto replicated_builder = std::unique_ptr<replicated_object_builder>(
static_cast<replicated_object_builder*>(builder.release()));

auto objects_result = replicated_builder->release();
if (!objects_result) {
vlog(
cd_log.error,
"Error while sending request: {}",
objects_result.error());
co_return std::unexpected(metastore::errc::invalid_request);
const std::unique_ptr<metastore::object_metadata_builder>& builder) {
auto* replicated_builder = static_cast<replicated_object_builder*>(
builder.get());

for (const auto& [partition_id, partition_objects] :
replicated_builder->partitions_) {
if (!partition_objects.pending_objects_.empty()) {
vlog(
cd_log.error,
"Error while sending request: unfinished objects remain");
co_return std::unexpected(metastore::errc::invalid_request);
}
}
for (auto& [partition_id, partition_objects] : objects_result.value()) {
for (auto& [partition_id, partition_objects] :
replicated_builder->partitions_) {
rpc::replace_objects_request req;
req.metastore_partition = partition_id;
chunked_vector<new_object> new_objects;
for (auto& obj : partition_objects) {
for (auto& obj : partition_objects.finished_objects_) {
new_objects.emplace_back(meta_to_rpc_obj(obj));
}
req.new_objects = std::move(new_objects);
Expand Down Expand Up @@ -514,18 +485,19 @@ replicated_metastore::get_end_offset_for_term(

ss::future<std::expected<void, metastore::errc>>
replicated_metastore::compact_objects(
std::unique_ptr<metastore::object_metadata_builder> builder,
const std::unique_ptr<metastore::object_metadata_builder>& builder,
const metastore::compaction_map_t& compaction_updates) {
auto replicated_builder = std::unique_ptr<replicated_object_builder>(
static_cast<replicated_object_builder*>(builder.release()));

auto objects_result = replicated_builder->release();
if (!objects_result) {
vlog(
cd_log.error,
"Error while sending request: {}",
objects_result.error());
co_return std::unexpected(metastore::errc::invalid_request);
auto* replicated_builder = static_cast<replicated_object_builder*>(
builder.get());

for (const auto& [partition_id, partition_objects] :
replicated_builder->partitions_) {
if (!partition_objects.pending_objects_.empty()) {
vlog(
cd_log.error,
"Error while sending request: unfinished objects remain");
co_return std::unexpected(metastore::errc::invalid_request);
}
}
chunked_hash_map<
model::partition_id,
Expand All @@ -537,7 +509,7 @@ replicated_metastore::compact_objects(
vlog(cd_log.warn, "Unable to get metastore partition for {}", tp);
co_return std::unexpected(errc::transport_error);
}
if (!objects_result->contains(*metastore_partition)) {
if (!replicated_builder->partitions_.contains(*metastore_partition)) {
vlog(
cd_log.error,
"Expected objects for partition {}",
Expand All @@ -547,11 +519,12 @@ replicated_metastore::compact_objects(
compaction_updates_by_partition[*metastore_partition].emplace(
tp, meta_to_rpc_compact_update(update));
}
for (auto& [partition_id, partition_objects] : objects_result.value()) {
for (auto& [partition_id, partition_objects] :
replicated_builder->partitions_) {
rpc::replace_objects_request req;
req.metastore_partition = partition_id;
chunked_vector<new_object> new_objects;
for (auto& obj : partition_objects) {
for (auto& obj : partition_objects.finished_objects_) {
new_objects.emplace_back(meta_to_rpc_obj(obj));
}
req.new_objects = std::move(new_objects);
Expand Down
6 changes: 3 additions & 3 deletions src/v/cloud_topics/level_one/metastore/replicated_metastore.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,11 @@ class replicated_metastore : public metastore {
get_offsets(const model::topic_id_partition&) override;

ss::future<std::expected<add_response, errc>> add_objects(
std::unique_ptr<object_metadata_builder>,
const std::unique_ptr<object_metadata_builder>&,
const term_offset_map_t&) override;

ss::future<std::expected<void, errc>>
replace_objects(std::unique_ptr<object_metadata_builder>) override;
replace_objects(const std::unique_ptr<object_metadata_builder>&) override;

ss::future<std::expected<void, errc>>
set_start_offset(const model::topic_id_partition&, kafka::offset) override;
Expand All @@ -53,7 +53,7 @@ class replicated_metastore : public metastore {
const model::topic_id_partition&, kafka::offset) override;

ss::future<std::expected<void, errc>> compact_objects(
std::unique_ptr<object_metadata_builder>,
const std::unique_ptr<object_metadata_builder>&,
const compaction_map_t&) override;
ss::future<std::expected<void, errc>> compact_objects(
const chunked_vector<object_metadata>&, const compaction_map_t&);
Expand Down
40 changes: 24 additions & 16 deletions src/v/cloud_topics/level_one/metastore/simple_metastore.cc
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,8 @@ simple_object_builder::release() {
if (!pending_objects_.empty()) {
return std::unexpected(
error{fmt::format(
"Builder still has {} pending object", pending_objects_.size())});
"Builder still has {} pending object(s)",
pending_objects_.size())});
}
return std::exchange(finished_objects_, {});
}
Expand Down Expand Up @@ -148,15 +149,17 @@ simple_metastore::get_offsets(

ss::future<std::expected<metastore::add_response, metastore::errc>>
simple_metastore::add_objects(
std::unique_ptr<metastore::object_metadata_builder> builder,
const std::unique_ptr<metastore::object_metadata_builder>& builder,
const term_offset_map_t& terms) {
auto* simple_builder = dynamic_cast<simple_object_builder*>(builder.get());
auto objects_res = simple_builder->release();
if (!objects_res.has_value()) {
vlog(cd_log.error, "Failed to add: {}", objects_res.error());
if (!simple_builder->pending_objects_.empty()) {
vlog(
cd_log.error,
"Failed to add: builder still has {} pending object(s)",
simple_builder->pending_objects_.size());
co_return std::unexpected(metastore::errc::invalid_request);
}
co_return co_await add_objects(objects_res.value(), terms);
co_return co_await add_objects(simple_builder->finished_objects_, terms);
}

ss::future<std::expected<metastore::add_response, metastore::errc>>
Expand Down Expand Up @@ -188,14 +191,16 @@ simple_metastore::add_objects(

ss::future<std::expected<void, metastore::errc>>
simple_metastore::replace_objects(
std::unique_ptr<metastore::object_metadata_builder> builder) {
const std::unique_ptr<metastore::object_metadata_builder>& builder) {
auto* simple_builder = dynamic_cast<simple_object_builder*>(builder.get());
auto objects_res = simple_builder->release();
if (!objects_res.has_value()) {
vlog(cd_log.error, "Failed to replace: {}", objects_res.error());
if (!simple_builder->pending_objects_.empty()) {
vlog(
cd_log.error,
"Failed to replace: builder still has {} pending object(s)",
simple_builder->pending_objects_.size());
co_return std::unexpected(metastore::errc::invalid_request);
}
co_return co_await replace_objects(objects_res.value());
co_return co_await replace_objects(simple_builder->finished_objects_);
}

ss::future<std::expected<void, metastore::errc>>
Expand Down Expand Up @@ -455,15 +460,18 @@ simple_metastore::compact_objects(

ss::future<std::expected<void, metastore::errc>>
simple_metastore::compact_objects(
std::unique_ptr<metastore::object_metadata_builder> builder,
const std::unique_ptr<metastore::object_metadata_builder>& builder,
const compaction_map_t& compaction_metas) {
auto* simple_builder = dynamic_cast<simple_object_builder*>(builder.get());
auto objects_res = simple_builder->release();
if (!objects_res.has_value()) {
vlog(cd_log.error, "Failed to compact: {}", objects_res.error());
if (!simple_builder->pending_objects_.empty()) {
vlog(
cd_log.error,
"Failed to compact: builder still has {} pending object(s)",
simple_builder->pending_objects_.size());
co_return std::unexpected(metastore::errc::invalid_request);
}
co_return co_await compact_objects(objects_res.value(), compaction_metas);
co_return co_await compact_objects(
simple_builder->finished_objects_, compaction_metas);
}

ss::future<
Expand Down
6 changes: 3 additions & 3 deletions src/v/cloud_topics/level_one/metastore/simple_metastore.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,13 +57,13 @@ class simple_metastore : public metastore {
get_offsets(const model::topic_id_partition&) override;

ss::future<std::expected<add_response, errc>> add_objects(
std::unique_ptr<object_metadata_builder>,
const std::unique_ptr<object_metadata_builder>&,
const term_offset_map_t&) override;
ss::future<std::expected<add_response, errc>> add_objects(
const chunked_vector<object_metadata>&, const term_offset_map_t&);

ss::future<std::expected<void, errc>>
replace_objects(std::unique_ptr<object_metadata_builder>) override;
replace_objects(const std::unique_ptr<object_metadata_builder>&) override;
ss::future<std::expected<void, errc>>
replace_objects(const chunked_vector<object_metadata>&);

Expand All @@ -86,7 +86,7 @@ class simple_metastore : public metastore {
const model::topic_id_partition&, kafka::offset) override;

ss::future<std::expected<void, errc>> compact_objects(
std::unique_ptr<object_metadata_builder>,
const std::unique_ptr<object_metadata_builder>&,
const compaction_map_t&) override;
ss::future<std::expected<void, errc>> compact_objects(
const chunked_vector<object_metadata>&, const compaction_map_t&);
Expand Down
Loading