Skip to content

Commit eebea5e

Browse files
committed
ct/l1: change metastore *_objects signatures to take const refs
Change add_objects, replace_objects, and commit_objects method signatures to take const refs instead of unique_ptr by value. This enables retry scenarios where the builder needs to remain usable after the call. The replicated metastore's release method is no longer used and is removed. The simple metastore still uses this method for testing, so it remains.
1 parent 5eff435 commit eebea5e

File tree

10 files changed

+103
-130
lines changed

10 files changed

+103
-130
lines changed

src/v/cloud_topics/level_one/frontend_reader/tests/l1_reader_fixture.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -125,7 +125,7 @@ class l1_reader_fixture : public seastar_test {
125125
.value();
126126
}
127127

128-
_metastore.add_objects(std::move(meta_builder), term_map).get().value();
128+
_metastore.add_objects(*meta_builder, term_map).get().value();
129129
}
130130

131131
model::record_batch_reader make_reader(

src/v/cloud_topics/level_one/frontend_reader/tests/reader_test.cc

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -304,7 +304,7 @@ TEST_F(l1_reader_test, missing_object) {
304304
.first_offset = kafka::offset{0},
305305
});
306306

307-
_metastore.add_objects(std::move(builder), term_map).get().value();
307+
_metastore.add_objects(*builder, term_map).get().value();
308308

309309
auto reader = make_reader(ntp, tidp);
310310
EXPECT_THROW(read_all(std::move(reader)), std::runtime_error);
@@ -364,7 +364,7 @@ TEST_F(l1_reader_test, empty_offset_range) {
364364
.term = model::term_id{1},
365365
.first_offset = high_watermark,
366366
});
367-
_metastore.add_objects(std::move(meta_builder), term_map).get().value();
367+
_metastore.add_objects(*meta_builder, term_map).get().value();
368368

369369
// Write some objects after the empty object.
370370
auto final_batches

src/v/cloud_topics/level_one/metastore/metastore.h

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -170,9 +170,8 @@ class metastore {
170170
// it does not imply that all extents were accepted by the metastore. The
171171
// response should be examined to determine if subsequent add_objects()
172172
// requests need to be re-aligned to a different offset.
173-
virtual ss::future<std::expected<add_response, errc>> add_objects(
174-
std::unique_ptr<object_metadata_builder>, const term_offset_map_t&)
175-
= 0;
173+
virtual ss::future<std::expected<add_response, errc>>
174+
add_objects(const object_metadata_builder&, const term_offset_map_t&) = 0;
176175

177176
// Adds the given objects to the metastore, expecting that the new extents
178177
// replace an extent or set of extents covering the same range.
@@ -183,7 +182,7 @@ class metastore {
183182
// correctness, these simplify accounting and makes it easier to validate
184183
// that we haven't lost data.
185184
virtual ss::future<std::expected<void, errc>>
186-
replace_objects(std::unique_ptr<object_metadata_builder>) = 0;
185+
replace_objects(const object_metadata_builder&) = 0;
187186

188187
// Moves the start offset of the given partition's log to the given offset.
189188
virtual ss::future<std::expected<void, errc>>
@@ -277,8 +276,8 @@ class metastore {
277276
// Similar to replace_objects(), but with additional constraints based on
278277
// compaction metadata. See get_compaction_offsets() for more details on
279278
// expected usage.
280-
virtual ss::future<std::expected<void, errc>> compact_objects(
281-
std::unique_ptr<object_metadata_builder>, const compaction_map_t&)
279+
virtual ss::future<std::expected<void, errc>>
280+
compact_objects(const object_metadata_builder&, const compaction_map_t&)
282281
= 0;
283282

284283
// Returns metadata required to determine what to compact for the given

src/v/cloud_topics/level_one/metastore/replicated_metastore.cc

Lines changed: 46 additions & 73 deletions
Original file line numberDiff line numberDiff line change
@@ -102,12 +102,6 @@ class replicated_object_builder : public metastore::object_metadata_builder {
102102

103103
private:
104104
friend class cloud_topics::l1::replicated_metastore;
105-
std::expected<
106-
chunked_hash_map<
107-
model::partition_id,
108-
chunked_vector<metastore::object_metadata>>,
109-
error>
110-
release();
111105

112106
struct partitioned_objects {
113107
chunked_hash_map<
@@ -196,33 +190,6 @@ replicated_object_builder::finish(
196190
return {};
197191
}
198192

199-
std::expected<
200-
chunked_hash_map<
201-
model::partition_id,
202-
chunked_vector<metastore::object_metadata>>,
203-
replicated_object_builder::error>
204-
replicated_object_builder::release() {
205-
for (const auto& [partition_id, partition_objects] : partitions_) {
206-
if (!partition_objects.pending_objects_.empty()) {
207-
return std::unexpected(error{"Unfinished objects remain"});
208-
}
209-
}
210-
211-
chunked_hash_map<
212-
model::partition_id,
213-
chunked_vector<metastore::object_metadata>>
214-
result;
215-
for (auto& [partition_id, partition_objects] : partitions_) {
216-
chunked_vector<metastore::object_metadata> metas;
217-
for (auto& obj : partition_objects.finished_objects_) {
218-
metas.push_back(std::move(obj));
219-
}
220-
result.emplace(partition_id, std::move(metas));
221-
}
222-
223-
partitions_.clear();
224-
return result;
225-
}
226193
} // anonymous namespace
227194

228195
replicated_metastore::replicated_metastore(frontend& fe)
@@ -258,18 +225,19 @@ replicated_metastore::get_offsets(const model::topic_id_partition& tidp) {
258225

259226
ss::future<std::expected<metastore::add_response, metastore::errc>>
260227
replicated_metastore::add_objects(
261-
std::unique_ptr<metastore::object_metadata_builder> builder,
228+
const metastore::object_metadata_builder& builder,
262229
const metastore::term_offset_map_t& terms) {
263-
auto replicated_builder = std::unique_ptr<replicated_object_builder>(
264-
static_cast<replicated_object_builder*>(builder.release()));
265-
266-
auto objects_result = replicated_builder->release();
267-
if (!objects_result.has_value()) {
268-
vlog(
269-
cd_log.error,
270-
"Error while sending request: {}",
271-
objects_result.error());
272-
co_return std::unexpected(metastore::errc::invalid_request);
230+
auto& replicated_builder = static_cast<const replicated_object_builder&>(
231+
builder);
232+
233+
for (const auto& [partition_id, partition_objects] :
234+
replicated_builder.partitions_) {
235+
if (!partition_objects.pending_objects_.empty()) {
236+
vlog(
237+
cd_log.error,
238+
"Error while sending request: unfinished objects remain");
239+
co_return std::unexpected(metastore::errc::invalid_request);
240+
}
273241
}
274242
chunked_hash_map<model::partition_id, term_state_update_t>
275243
partitioned_terms;
@@ -286,7 +254,8 @@ replicated_metastore::add_objects(
286254
}
287255
}
288256
add_response resp;
289-
for (auto& [partition_id, partition_objects] : objects_result.value()) {
257+
for (auto& [partition_id, partition_objects] :
258+
replicated_builder.partitions_) {
290259
auto terms_it = partitioned_terms.find(partition_id);
291260
if (terms_it == partitioned_terms.end()) {
292261
// TODO: consider making this less strict, down to the STM layer?
@@ -300,7 +269,7 @@ replicated_metastore::add_objects(
300269
req.metastore_partition = partition_id;
301270
req.new_terms = std::move(terms_it->second);
302271
chunked_vector<new_object> new_objects;
303-
for (auto& obj : partition_objects) {
272+
for (auto& obj : partition_objects.finished_objects_) {
304273
new_objects.emplace_back(meta_to_rpc_obj(obj));
305274
}
306275
req.new_objects = std::move(new_objects);
@@ -325,23 +294,25 @@ replicated_metastore::add_objects(
325294

326295
ss::future<std::expected<void, metastore::errc>>
327296
replicated_metastore::replace_objects(
328-
std::unique_ptr<metastore::object_metadata_builder> builder) {
329-
auto replicated_builder = std::unique_ptr<replicated_object_builder>(
330-
static_cast<replicated_object_builder*>(builder.release()));
331-
332-
auto objects_result = replicated_builder->release();
333-
if (!objects_result) {
334-
vlog(
335-
cd_log.error,
336-
"Error while sending request: {}",
337-
objects_result.error());
338-
co_return std::unexpected(metastore::errc::invalid_request);
297+
const metastore::object_metadata_builder& builder) {
298+
auto& replicated_builder = static_cast<const replicated_object_builder&>(
299+
builder);
300+
301+
for (const auto& [partition_id, partition_objects] :
302+
replicated_builder.partitions_) {
303+
if (!partition_objects.pending_objects_.empty()) {
304+
vlog(
305+
cd_log.error,
306+
"Error while sending request: unfinished objects remain");
307+
co_return std::unexpected(metastore::errc::invalid_request);
308+
}
339309
}
340-
for (auto& [partition_id, partition_objects] : objects_result.value()) {
310+
for (auto& [partition_id, partition_objects] :
311+
replicated_builder.partitions_) {
341312
rpc::replace_objects_request req;
342313
req.metastore_partition = partition_id;
343314
chunked_vector<new_object> new_objects;
344-
for (auto& obj : partition_objects) {
315+
for (auto& obj : partition_objects.finished_objects_) {
345316
new_objects.emplace_back(meta_to_rpc_obj(obj));
346317
}
347318
req.new_objects = std::move(new_objects);
@@ -514,18 +485,19 @@ replicated_metastore::get_end_offset_for_term(
514485

515486
ss::future<std::expected<void, metastore::errc>>
516487
replicated_metastore::compact_objects(
517-
std::unique_ptr<metastore::object_metadata_builder> builder,
488+
const metastore::object_metadata_builder& builder,
518489
const metastore::compaction_map_t& compaction_updates) {
519-
auto replicated_builder = std::unique_ptr<replicated_object_builder>(
520-
static_cast<replicated_object_builder*>(builder.release()));
521-
522-
auto objects_result = replicated_builder->release();
523-
if (!objects_result) {
524-
vlog(
525-
cd_log.error,
526-
"Error while sending request: {}",
527-
objects_result.error());
528-
co_return std::unexpected(metastore::errc::invalid_request);
490+
auto& replicated_builder = static_cast<const replicated_object_builder&>(
491+
builder);
492+
493+
for (const auto& [partition_id, partition_objects] :
494+
replicated_builder.partitions_) {
495+
if (!partition_objects.pending_objects_.empty()) {
496+
vlog(
497+
cd_log.error,
498+
"Error while sending request: unfinished objects remain");
499+
co_return std::unexpected(metastore::errc::invalid_request);
500+
}
529501
}
530502
chunked_hash_map<
531503
model::partition_id,
@@ -537,7 +509,7 @@ replicated_metastore::compact_objects(
537509
vlog(cd_log.warn, "Unable to get metastore partition for {}", tp);
538510
co_return std::unexpected(errc::transport_error);
539511
}
540-
if (!objects_result->contains(*metastore_partition)) {
512+
if (!replicated_builder.partitions_.contains(*metastore_partition)) {
541513
vlog(
542514
cd_log.error,
543515
"Expected objects for partition {}",
@@ -547,11 +519,12 @@ replicated_metastore::compact_objects(
547519
compaction_updates_by_partition[*metastore_partition].emplace(
548520
tp, meta_to_rpc_compact_update(update));
549521
}
550-
for (auto& [partition_id, partition_objects] : objects_result.value()) {
522+
for (auto& [partition_id, partition_objects] :
523+
replicated_builder.partitions_) {
551524
rpc::replace_objects_request req;
552525
req.metastore_partition = partition_id;
553526
chunked_vector<new_object> new_objects;
554-
for (auto& obj : partition_objects) {
527+
for (auto& obj : partition_objects.finished_objects_) {
555528
new_objects.emplace_back(meta_to_rpc_obj(obj));
556529
}
557530
req.new_objects = std::move(new_objects);

src/v/cloud_topics/level_one/metastore/replicated_metastore.h

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -28,11 +28,10 @@ class replicated_metastore : public metastore {
2828
get_offsets(const model::topic_id_partition&) override;
2929

3030
ss::future<std::expected<add_response, errc>> add_objects(
31-
std::unique_ptr<object_metadata_builder>,
32-
const term_offset_map_t&) override;
31+
const object_metadata_builder&, const term_offset_map_t&) override;
3332

3433
ss::future<std::expected<void, errc>>
35-
replace_objects(std::unique_ptr<object_metadata_builder>) override;
34+
replace_objects(const object_metadata_builder&) override;
3635

3736
ss::future<std::expected<void, errc>>
3837
set_start_offset(const model::topic_id_partition&, kafka::offset) override;
@@ -53,8 +52,7 @@ class replicated_metastore : public metastore {
5352
const model::topic_id_partition&, kafka::offset) override;
5453

5554
ss::future<std::expected<void, errc>> compact_objects(
56-
std::unique_ptr<object_metadata_builder>,
57-
const compaction_map_t&) override;
55+
const object_metadata_builder&, const compaction_map_t&) override;
5856
ss::future<std::expected<void, errc>> compact_objects(
5957
const chunked_vector<object_metadata>&, const compaction_map_t&);
6058

src/v/cloud_topics/level_one/metastore/simple_metastore.cc

Lines changed: 27 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,8 @@ simple_object_builder::release() {
9797
if (!pending_objects_.empty()) {
9898
return std::unexpected(
9999
error{fmt::format(
100-
"Builder still has {} pending object", pending_objects_.size())});
100+
"Builder still has {} pending object(s)",
101+
pending_objects_.size())});
101102
}
102103
return std::exchange(finished_objects_, {});
103104
}
@@ -148,15 +149,17 @@ simple_metastore::get_offsets(
148149

149150
ss::future<std::expected<metastore::add_response, metastore::errc>>
150151
simple_metastore::add_objects(
151-
std::unique_ptr<metastore::object_metadata_builder> builder,
152+
const metastore::object_metadata_builder& builder,
152153
const term_offset_map_t& terms) {
153-
auto* simple_builder = dynamic_cast<simple_object_builder*>(builder.get());
154-
auto objects_res = simple_builder->release();
155-
if (!objects_res.has_value()) {
156-
vlog(cd_log.error, "Failed to add: {}", objects_res.error());
154+
auto& simple_builder = dynamic_cast<const simple_object_builder&>(builder);
155+
if (!simple_builder.pending_objects_.empty()) {
156+
vlog(
157+
cd_log.error,
158+
"Failed to add: builder still has {} pending object(s)",
159+
simple_builder.pending_objects_.size());
157160
co_return std::unexpected(metastore::errc::invalid_request);
158161
}
159-
co_return co_await add_objects(objects_res.value(), terms);
162+
co_return co_await add_objects(simple_builder.finished_objects_, terms);
160163
}
161164

162165
ss::future<std::expected<metastore::add_response, metastore::errc>>
@@ -188,14 +191,16 @@ simple_metastore::add_objects(
188191

189192
ss::future<std::expected<void, metastore::errc>>
190193
simple_metastore::replace_objects(
191-
std::unique_ptr<metastore::object_metadata_builder> builder) {
192-
auto* simple_builder = dynamic_cast<simple_object_builder*>(builder.get());
193-
auto objects_res = simple_builder->release();
194-
if (!objects_res.has_value()) {
195-
vlog(cd_log.error, "Failed to replace: {}", objects_res.error());
194+
const metastore::object_metadata_builder& builder) {
195+
auto& simple_builder = dynamic_cast<const simple_object_builder&>(builder);
196+
if (!simple_builder.pending_objects_.empty()) {
197+
vlog(
198+
cd_log.error,
199+
"Failed to replace: builder still has {} pending object(s)",
200+
simple_builder.pending_objects_.size());
196201
co_return std::unexpected(metastore::errc::invalid_request);
197202
}
198-
co_return co_await replace_objects(objects_res.value());
203+
co_return co_await replace_objects(simple_builder.finished_objects_);
199204
}
200205

201206
ss::future<std::expected<void, metastore::errc>>
@@ -455,15 +460,18 @@ simple_metastore::compact_objects(
455460

456461
ss::future<std::expected<void, metastore::errc>>
457462
simple_metastore::compact_objects(
458-
std::unique_ptr<metastore::object_metadata_builder> builder,
463+
const metastore::object_metadata_builder& builder,
459464
const compaction_map_t& compaction_metas) {
460-
auto* simple_builder = dynamic_cast<simple_object_builder*>(builder.get());
461-
auto objects_res = simple_builder->release();
462-
if (!objects_res.has_value()) {
463-
vlog(cd_log.error, "Failed to compact: {}", objects_res.error());
465+
auto& simple_builder = dynamic_cast<const simple_object_builder&>(builder);
466+
if (!simple_builder.pending_objects_.empty()) {
467+
vlog(
468+
cd_log.error,
469+
"Failed to compact: builder still has {} pending object(s)",
470+
simple_builder.pending_objects_.size());
464471
co_return std::unexpected(metastore::errc::invalid_request);
465472
}
466-
co_return co_await compact_objects(objects_res.value(), compaction_metas);
473+
co_return co_await compact_objects(
474+
simple_builder.finished_objects_, compaction_metas);
467475
}
468476

469477
ss::future<

src/v/cloud_topics/level_one/metastore/simple_metastore.h

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -57,13 +57,12 @@ class simple_metastore : public metastore {
5757
get_offsets(const model::topic_id_partition&) override;
5858

5959
ss::future<std::expected<add_response, errc>> add_objects(
60-
std::unique_ptr<object_metadata_builder>,
61-
const term_offset_map_t&) override;
60+
const object_metadata_builder&, const term_offset_map_t&) override;
6261
ss::future<std::expected<add_response, errc>> add_objects(
6362
const chunked_vector<object_metadata>&, const term_offset_map_t&);
6463

6564
ss::future<std::expected<void, errc>>
66-
replace_objects(std::unique_ptr<object_metadata_builder>) override;
65+
replace_objects(const object_metadata_builder&) override;
6766
ss::future<std::expected<void, errc>>
6867
replace_objects(const chunked_vector<object_metadata>&);
6968

@@ -86,8 +85,7 @@ class simple_metastore : public metastore {
8685
const model::topic_id_partition&, kafka::offset) override;
8786

8887
ss::future<std::expected<void, errc>> compact_objects(
89-
std::unique_ptr<object_metadata_builder>,
90-
const compaction_map_t&) override;
88+
const object_metadata_builder&, const compaction_map_t&) override;
9189
ss::future<std::expected<void, errc>> compact_objects(
9290
const chunked_vector<object_metadata>&, const compaction_map_t&);
9391

0 commit comments

Comments
 (0)