diff --git a/src/v/cloud_topics/level_one/frontend_reader/tests/l1_reader_fixture.h b/src/v/cloud_topics/level_one/frontend_reader/tests/l1_reader_fixture.h index b6c4fc2300bb4..c8351cabdc4b9 100644 --- a/src/v/cloud_topics/level_one/frontend_reader/tests/l1_reader_fixture.h +++ b/src/v/cloud_topics/level_one/frontend_reader/tests/l1_reader_fixture.h @@ -125,7 +125,7 @@ class l1_reader_fixture : public seastar_test { .value(); } - _metastore.add_objects(std::move(meta_builder), term_map).get().value(); + _metastore.add_objects(meta_builder, term_map).get().value(); } model::record_batch_reader make_reader( diff --git a/src/v/cloud_topics/level_one/frontend_reader/tests/reader_test.cc b/src/v/cloud_topics/level_one/frontend_reader/tests/reader_test.cc index 65caf0a0870a2..71da53a0ce119 100644 --- a/src/v/cloud_topics/level_one/frontend_reader/tests/reader_test.cc +++ b/src/v/cloud_topics/level_one/frontend_reader/tests/reader_test.cc @@ -304,7 +304,7 @@ TEST_F(l1_reader_test, missing_object) { .first_offset = kafka::offset{0}, }); - _metastore.add_objects(std::move(builder), term_map).get().value(); + _metastore.add_objects(builder, term_map).get().value(); auto reader = make_reader(ntp, tidp); EXPECT_THROW(read_all(std::move(reader)), std::runtime_error); @@ -364,7 +364,7 @@ TEST_F(l1_reader_test, empty_offset_range) { .term = model::term_id{1}, .first_offset = high_watermark, }); - _metastore.add_objects(std::move(meta_builder), term_map).get().value(); + _metastore.add_objects(meta_builder, term_map).get().value(); // Write some objects after the empty object. auto final_batches diff --git a/src/v/cloud_topics/level_one/metastore/metastore.h b/src/v/cloud_topics/level_one/metastore/metastore.h index d89dcfcc5fae2..457d2748fc1db 100644 --- a/src/v/cloud_topics/level_one/metastore/metastore.h +++ b/src/v/cloud_topics/level_one/metastore/metastore.h @@ -171,7 +171,7 @@ class metastore { // response should be examined to determine if subsequent add_objects() // requests need to be re-aligned to a different offset. virtual ss::future> add_objects( - std::unique_ptr, const term_offset_map_t&) + const std::unique_ptr&, const term_offset_map_t&) = 0; // Adds the given objects to the metastore, expecting that the new extents @@ -183,7 +183,7 @@ class metastore { // correctness, these simplify accounting and makes it easier to validate // that we haven't lost data. virtual ss::future> - replace_objects(std::unique_ptr) = 0; + replace_objects(const std::unique_ptr&) = 0; // Moves the start offset of the given partition's log to the given offset. virtual ss::future> @@ -278,7 +278,7 @@ class metastore { // compaction metadata. See get_compaction_offsets() for more details on // expected usage. virtual ss::future> compact_objects( - std::unique_ptr, const compaction_map_t&) + const std::unique_ptr&, const compaction_map_t&) = 0; // Returns metadata required to determine what to compact for the given diff --git a/src/v/cloud_topics/level_one/metastore/replicated_metastore.cc b/src/v/cloud_topics/level_one/metastore/replicated_metastore.cc index dcccfdda7991e..2e8ab5650536e 100644 --- a/src/v/cloud_topics/level_one/metastore/replicated_metastore.cc +++ b/src/v/cloud_topics/level_one/metastore/replicated_metastore.cc @@ -102,12 +102,6 @@ class replicated_object_builder : public metastore::object_metadata_builder { private: friend class cloud_topics::l1::replicated_metastore; - std::expected< - chunked_hash_map< - model::partition_id, - chunked_vector>, - error> - release(); struct partitioned_objects { chunked_hash_map< @@ -196,33 +190,6 @@ replicated_object_builder::finish( return {}; } -std::expected< - chunked_hash_map< - model::partition_id, - chunked_vector>, - replicated_object_builder::error> -replicated_object_builder::release() { - for (const auto& [partition_id, partition_objects] : partitions_) { - if (!partition_objects.pending_objects_.empty()) { - return std::unexpected(error{"Unfinished objects remain"}); - } - } - - chunked_hash_map< - model::partition_id, - chunked_vector> - result; - for (auto& [partition_id, partition_objects] : partitions_) { - chunked_vector metas; - for (auto& obj : partition_objects.finished_objects_) { - metas.push_back(std::move(obj)); - } - result.emplace(partition_id, std::move(metas)); - } - - partitions_.clear(); - return result; -} } // anonymous namespace replicated_metastore::replicated_metastore(frontend& fe) @@ -258,18 +225,19 @@ replicated_metastore::get_offsets(const model::topic_id_partition& tidp) { ss::future> replicated_metastore::add_objects( - std::unique_ptr builder, + const std::unique_ptr& builder, const metastore::term_offset_map_t& terms) { - auto replicated_builder = std::unique_ptr( - static_cast(builder.release())); - - auto objects_result = replicated_builder->release(); - if (!objects_result.has_value()) { - vlog( - cd_log.error, - "Error while sending request: {}", - objects_result.error()); - co_return std::unexpected(metastore::errc::invalid_request); + auto* replicated_builder = static_cast( + builder.get()); + + for (const auto& [partition_id, partition_objects] : + replicated_builder->partitions_) { + if (!partition_objects.pending_objects_.empty()) { + vlog( + cd_log.error, + "Error while sending request: unfinished objects remain"); + co_return std::unexpected(metastore::errc::invalid_request); + } } chunked_hash_map partitioned_terms; @@ -286,7 +254,8 @@ replicated_metastore::add_objects( } } add_response resp; - for (auto& [partition_id, partition_objects] : objects_result.value()) { + for (auto& [partition_id, partition_objects] : + replicated_builder->partitions_) { auto terms_it = partitioned_terms.find(partition_id); if (terms_it == partitioned_terms.end()) { // TODO: consider making this less strict, down to the STM layer? @@ -300,7 +269,7 @@ replicated_metastore::add_objects( req.metastore_partition = partition_id; req.new_terms = std::move(terms_it->second); chunked_vector new_objects; - for (auto& obj : partition_objects) { + for (auto& obj : partition_objects.finished_objects_) { new_objects.emplace_back(meta_to_rpc_obj(obj)); } req.new_objects = std::move(new_objects); @@ -325,23 +294,25 @@ replicated_metastore::add_objects( ss::future> replicated_metastore::replace_objects( - std::unique_ptr builder) { - auto replicated_builder = std::unique_ptr( - static_cast(builder.release())); - - auto objects_result = replicated_builder->release(); - if (!objects_result) { - vlog( - cd_log.error, - "Error while sending request: {}", - objects_result.error()); - co_return std::unexpected(metastore::errc::invalid_request); + const std::unique_ptr& builder) { + auto* replicated_builder = static_cast( + builder.get()); + + for (const auto& [partition_id, partition_objects] : + replicated_builder->partitions_) { + if (!partition_objects.pending_objects_.empty()) { + vlog( + cd_log.error, + "Error while sending request: unfinished objects remain"); + co_return std::unexpected(metastore::errc::invalid_request); + } } - for (auto& [partition_id, partition_objects] : objects_result.value()) { + for (auto& [partition_id, partition_objects] : + replicated_builder->partitions_) { rpc::replace_objects_request req; req.metastore_partition = partition_id; chunked_vector new_objects; - for (auto& obj : partition_objects) { + for (auto& obj : partition_objects.finished_objects_) { new_objects.emplace_back(meta_to_rpc_obj(obj)); } req.new_objects = std::move(new_objects); @@ -514,18 +485,19 @@ replicated_metastore::get_end_offset_for_term( ss::future> replicated_metastore::compact_objects( - std::unique_ptr builder, + const std::unique_ptr& builder, const metastore::compaction_map_t& compaction_updates) { - auto replicated_builder = std::unique_ptr( - static_cast(builder.release())); - - auto objects_result = replicated_builder->release(); - if (!objects_result) { - vlog( - cd_log.error, - "Error while sending request: {}", - objects_result.error()); - co_return std::unexpected(metastore::errc::invalid_request); + auto* replicated_builder = static_cast( + builder.get()); + + for (const auto& [partition_id, partition_objects] : + replicated_builder->partitions_) { + if (!partition_objects.pending_objects_.empty()) { + vlog( + cd_log.error, + "Error while sending request: unfinished objects remain"); + co_return std::unexpected(metastore::errc::invalid_request); + } } chunked_hash_map< model::partition_id, @@ -537,7 +509,7 @@ replicated_metastore::compact_objects( vlog(cd_log.warn, "Unable to get metastore partition for {}", tp); co_return std::unexpected(errc::transport_error); } - if (!objects_result->contains(*metastore_partition)) { + if (!replicated_builder->partitions_.contains(*metastore_partition)) { vlog( cd_log.error, "Expected objects for partition {}", @@ -547,11 +519,12 @@ replicated_metastore::compact_objects( compaction_updates_by_partition[*metastore_partition].emplace( tp, meta_to_rpc_compact_update(update)); } - for (auto& [partition_id, partition_objects] : objects_result.value()) { + for (auto& [partition_id, partition_objects] : + replicated_builder->partitions_) { rpc::replace_objects_request req; req.metastore_partition = partition_id; chunked_vector new_objects; - for (auto& obj : partition_objects) { + for (auto& obj : partition_objects.finished_objects_) { new_objects.emplace_back(meta_to_rpc_obj(obj)); } req.new_objects = std::move(new_objects); diff --git a/src/v/cloud_topics/level_one/metastore/replicated_metastore.h b/src/v/cloud_topics/level_one/metastore/replicated_metastore.h index f0ee5afe88d4d..e0ce5b3a1774f 100644 --- a/src/v/cloud_topics/level_one/metastore/replicated_metastore.h +++ b/src/v/cloud_topics/level_one/metastore/replicated_metastore.h @@ -28,11 +28,11 @@ class replicated_metastore : public metastore { get_offsets(const model::topic_id_partition&) override; ss::future> add_objects( - std::unique_ptr, + const std::unique_ptr&, const term_offset_map_t&) override; ss::future> - replace_objects(std::unique_ptr) override; + replace_objects(const std::unique_ptr&) override; ss::future> set_start_offset(const model::topic_id_partition&, kafka::offset) override; @@ -53,7 +53,7 @@ class replicated_metastore : public metastore { const model::topic_id_partition&, kafka::offset) override; ss::future> compact_objects( - std::unique_ptr, + const std::unique_ptr&, const compaction_map_t&) override; ss::future> compact_objects( const chunked_vector&, const compaction_map_t&); diff --git a/src/v/cloud_topics/level_one/metastore/simple_metastore.cc b/src/v/cloud_topics/level_one/metastore/simple_metastore.cc index 3b892ab8c984d..c4d48e32d7589 100644 --- a/src/v/cloud_topics/level_one/metastore/simple_metastore.cc +++ b/src/v/cloud_topics/level_one/metastore/simple_metastore.cc @@ -97,7 +97,8 @@ simple_object_builder::release() { if (!pending_objects_.empty()) { return std::unexpected( error{fmt::format( - "Builder still has {} pending object", pending_objects_.size())}); + "Builder still has {} pending object(s)", + pending_objects_.size())}); } return std::exchange(finished_objects_, {}); } @@ -148,15 +149,17 @@ simple_metastore::get_offsets( ss::future> simple_metastore::add_objects( - std::unique_ptr builder, + const std::unique_ptr& builder, const term_offset_map_t& terms) { auto* simple_builder = dynamic_cast(builder.get()); - auto objects_res = simple_builder->release(); - if (!objects_res.has_value()) { - vlog(cd_log.error, "Failed to add: {}", objects_res.error()); + if (!simple_builder->pending_objects_.empty()) { + vlog( + cd_log.error, + "Failed to add: builder still has {} pending object(s)", + simple_builder->pending_objects_.size()); co_return std::unexpected(metastore::errc::invalid_request); } - co_return co_await add_objects(objects_res.value(), terms); + co_return co_await add_objects(simple_builder->finished_objects_, terms); } ss::future> @@ -188,14 +191,16 @@ simple_metastore::add_objects( ss::future> simple_metastore::replace_objects( - std::unique_ptr builder) { + const std::unique_ptr& builder) { auto* simple_builder = dynamic_cast(builder.get()); - auto objects_res = simple_builder->release(); - if (!objects_res.has_value()) { - vlog(cd_log.error, "Failed to replace: {}", objects_res.error()); + if (!simple_builder->pending_objects_.empty()) { + vlog( + cd_log.error, + "Failed to replace: builder still has {} pending object(s)", + simple_builder->pending_objects_.size()); co_return std::unexpected(metastore::errc::invalid_request); } - co_return co_await replace_objects(objects_res.value()); + co_return co_await replace_objects(simple_builder->finished_objects_); } ss::future> @@ -455,15 +460,18 @@ simple_metastore::compact_objects( ss::future> simple_metastore::compact_objects( - std::unique_ptr builder, + const std::unique_ptr& builder, const compaction_map_t& compaction_metas) { auto* simple_builder = dynamic_cast(builder.get()); - auto objects_res = simple_builder->release(); - if (!objects_res.has_value()) { - vlog(cd_log.error, "Failed to compact: {}", objects_res.error()); + if (!simple_builder->pending_objects_.empty()) { + vlog( + cd_log.error, + "Failed to compact: builder still has {} pending object(s)", + simple_builder->pending_objects_.size()); co_return std::unexpected(metastore::errc::invalid_request); } - co_return co_await compact_objects(objects_res.value(), compaction_metas); + co_return co_await compact_objects( + simple_builder->finished_objects_, compaction_metas); } ss::future< diff --git a/src/v/cloud_topics/level_one/metastore/simple_metastore.h b/src/v/cloud_topics/level_one/metastore/simple_metastore.h index 6ad1f09e15c62..7edfde7cd5dc3 100644 --- a/src/v/cloud_topics/level_one/metastore/simple_metastore.h +++ b/src/v/cloud_topics/level_one/metastore/simple_metastore.h @@ -57,13 +57,13 @@ class simple_metastore : public metastore { get_offsets(const model::topic_id_partition&) override; ss::future> add_objects( - std::unique_ptr, + const std::unique_ptr&, const term_offset_map_t&) override; ss::future> add_objects( const chunked_vector&, const term_offset_map_t&); ss::future> - replace_objects(std::unique_ptr) override; + replace_objects(const std::unique_ptr&) override; ss::future> replace_objects(const chunked_vector&); @@ -86,7 +86,7 @@ class simple_metastore : public metastore { const model::topic_id_partition&, kafka::offset) override; ss::future> compact_objects( - std::unique_ptr, + const std::unique_ptr&, const compaction_map_t&) override; ss::future> compact_objects( const chunked_vector&, const compaction_map_t&); diff --git a/src/v/cloud_topics/level_one/metastore/tests/replicated_metastore_test.cc b/src/v/cloud_topics/level_one/metastore/tests/replicated_metastore_test.cc index 771d3a20b8258..53ea9ca34b9c7 100644 --- a/src/v/cloud_topics/level_one/metastore/tests/replicated_metastore_test.cc +++ b/src/v/cloud_topics/level_one/metastore/tests/replicated_metastore_test.cc @@ -81,7 +81,7 @@ class ReplicatedMetastoreTest metastore::term_offset{ .term = model::term_id{0}, .first_offset = o{0}}); } - auto add_res = meta.add_objects(std::move(objs), terms).get(); + auto add_res = meta.add_objects(objs, terms).get(); ASSERT_TRUE_CORO(add_res.has_value()); } }; @@ -103,7 +103,7 @@ TEST_F(ReplicatedMetastoreTest, TestAddNotFinished) { .size = 500, }); ASSERT_TRUE(add_res.has_value()) << add_res.error(); - auto commit_res = meta.add_objects(std::move(obj_builder), {}).get(); + auto commit_res = meta.add_objects(obj_builder, {}).get(); ASSERT_FALSE(commit_res.has_value()); ASSERT_EQ(commit_res.error(), metastore::errc::invalid_request); } @@ -364,7 +364,7 @@ TEST_F(ReplicatedMetastoreTest, TestNotLeader) { terms[tp].emplace_back( metastore::term_offset{ .term = model::term_id{0}, .first_offset = next_to_send}); - auto commit_res = meta.add_objects(std::move(obj_builder), terms).get(); + auto commit_res = meta.add_objects(obj_builder, terms).get(); if (!commit_res.has_value()) { while (true) { if (ss::lowres_clock::now() > deadline) { @@ -429,7 +429,7 @@ TEST_F(ReplicatedMetastoreTest, TestInvalidTermRequest) { } // This constitutes an incorrectly formed request, and is not expected // ever, hence overall failure. - auto add_res = meta.add_objects(std::move(objs), terms).get(); + auto add_res = meta.add_objects(objs, terms).get(); ASSERT_FALSE(add_res.has_value()); ASSERT_EQ(add_res.error(), metastore::errc::invalid_request); } @@ -464,7 +464,7 @@ TEST_F(ReplicatedMetastoreTest, TestGetTermForOffset) { metastore::term_offset{ .term = model::term_id{2}, .first_offset = o{100}}); - auto add_res = meta.add_objects(std::move(obj_builder), terms).get(); + auto add_res = meta.add_objects(obj_builder, terms).get(); ASSERT_TRUE(add_res.has_value()); const auto assert_term_eq = [&](kafka::offset o, model::term_id t) { @@ -523,7 +523,7 @@ TEST_F(ReplicatedMetastoreTest, TestGetEndOffsetForTerm) { metastore::term_offset{ .term = model::term_id{2}, .first_offset = o{100}}); - auto add_res = meta.add_objects(std::move(obj_builder), terms).get(); + auto add_res = meta.add_objects(obj_builder, terms).get(); ASSERT_TRUE(add_res.has_value()); auto assert_end_offset_eq = [&]( diff --git a/src/v/cloud_topics/level_one/metastore/tests/simple_metastore_test.cc b/src/v/cloud_topics/level_one/metastore/tests/simple_metastore_test.cc index ed5007d5ce3ac..5514169467cdf 100644 --- a/src/v/cloud_topics/level_one/metastore/tests/simple_metastore_test.cc +++ b/src/v/cloud_topics/level_one/metastore/tests/simple_metastore_test.cc @@ -1160,7 +1160,7 @@ TEST(SimpleMetastoreTest, TestUpdateWithObjectBuilder) { ASSERT_TRUE(add_res.has_value()); auto fin_res = ob->finish(o_a, 0, 1000); ASSERT_TRUE(fin_res.has_value()); - auto replace_obj_res = m.replace_objects(std::move(ob)).get(); + auto replace_obj_res = m.replace_objects(ob).get(); ASSERT_TRUE(replace_obj_res.has_value()); auto offsets_res = m.get_offsets(tp_a).get(); diff --git a/src/v/cloud_topics/reconciler/BUILD b/src/v/cloud_topics/reconciler/BUILD index 33667ae7afe27..062725aeae5dd 100644 --- a/src/v/cloud_topics/reconciler/BUILD +++ b/src/v/cloud_topics/reconciler/BUILD @@ -53,6 +53,7 @@ redpanda_cc_library( "//src/v/model", "//src/v/random:generators", "//src/v/ssx:future_util", + "//src/v/utils:retry_chain_node", "@abseil-cpp//absl/container:btree", "@seastar", ], diff --git a/src/v/cloud_topics/reconciler/reconciler.cc b/src/v/cloud_topics/reconciler/reconciler.cc index 064365f85959d..d83288e818ab0 100644 --- a/src/v/cloud_topics/reconciler/reconciler.cc +++ b/src/v/cloud_topics/reconciler/reconciler.cc @@ -25,10 +25,15 @@ #include "model/namespace.h" #include "random/generators.h" #include "ssx/future-util.h" +#include "utils/retry_chain_node.h" #include #include +#include + +using namespace std::chrono_literals; + namespace { ss::logger lg("reconciler"); @@ -569,6 +574,36 @@ reconciler::add_object_metadata( co_return std::expected{}; } +ss::future> +reconciler::add_objects_with_retry( + std::unique_ptr meta_builder, + l1::metastore::term_offset_map_t terms) { + static constexpr auto timeout = 5s; + static constexpr auto backoff = 100ms; + + retry_chain_node rtc(_as, ss::lowres_clock::now() + timeout, backoff); + retry_chain_logger ctxlog(lg, rtc, "add_objects"); + for (auto permit = rtc.retry(); permit.is_allowed; permit = rtc.retry()) { + auto add_result = co_await _metastore->add_objects(meta_builder, terms); + + if (add_result.has_value()) { + co_return std::move(add_result).value(); + } + + if (add_result.error() != l1::metastore::errc::transport_error) { + vlog( + lg.error, + "Non-retryable error adding objects to the L1 metastore: {}", + add_result.error()); + co_return std::unexpected(add_result.error()); + } + + co_await ss::sleep_abortable(permit.delay, rtc.root_abort_source()); + } + + co_return std::unexpected(l1::metastore::errc::transport_error); +} + ss::future> reconciler::commit_objects( const chunked_vector& objects, std::unique_ptr meta_builder) { @@ -586,11 +621,11 @@ ss::future> reconciler::commit_objects( } } - auto add_objects_result = co_await _metastore->add_objects( - std::move(meta_builder), terms); + auto add_objects_result = co_await add_objects_with_retry( + std::move(meta_builder), std::move(terms)); if (!add_objects_result.has_value()) { vlog( - lg.error, + lg.warn, "Failed to add objects to the L1 metastore: {}", add_objects_result.error()); // TODO: The objects have been uploaded. The reconciler could diff --git a/src/v/cloud_topics/reconciler/reconciler.h b/src/v/cloud_topics/reconciler/reconciler.h index 95d0579345586..0b64f0844f240 100644 --- a/src/v/cloud_topics/reconciler/reconciler.h +++ b/src/v/cloud_topics/reconciler/reconciler.h @@ -269,6 +269,15 @@ class reconciler { ntp_to_topic_id_partition(const model::ntp& ntp) const; private: + /* + * Retry metastore add_objects calls on transport errors. + * Other metastore errors are not retried. + */ + ss::future> + add_objects_with_retry( + std::unique_ptr meta_builder, + l1::metastore::term_offset_map_t terms); + cluster::partition_manager* _partition_manager; data_plane_api* _data_plane; l1::io* _l1_io;