Skip to content

Commit 9017147

Browse files
committed
cbc: add replica support to cbc-get
1 parent 1a32030 commit 9017147

File tree

1 file changed

+173
-16
lines changed

1 file changed

+173
-16
lines changed

tools/get.cxx

Lines changed: 173 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,10 @@
2222
#include <couchbase/cluster.hxx>
2323
#include <couchbase/codec/raw_binary_transcoder.hxx>
2424
#include <couchbase/codec/tao_json_serializer.hxx>
25+
#include <couchbase/get_all_replicas_options.hxx>
26+
#include <couchbase/get_any_replica_options.hxx>
27+
#include <couchbase/get_replica_result.hxx>
28+
#include <couchbase/read_preference.hxx>
2529

2630
#include <spdlog/fmt/bin_to_hex.h>
2731
#include <spdlog/fmt/bundled/chrono.h>
@@ -61,6 +65,28 @@ class get_app : public CLI::App
6165
fmt::format("Return only part of the document, that corresponds given JSON-pointer "
6266
"(could be used multiple times)."))
6367
->allow_extra_args(false);
68+
69+
std::vector<std::string> allowed_replica_modes{
70+
"none",
71+
"all",
72+
"any",
73+
};
74+
add_option("--use-replica", use_replica_, "Use replica nodes to retrieve the document.")
75+
->transform(CLI::IsMember(allowed_replica_modes))
76+
->default_val(use_replica_);
77+
std::vector<std::string> allowed_replica_read_modes{
78+
"no_preference",
79+
"selected_server_group",
80+
"selected_server_group_or_all_available",
81+
};
82+
add_option("--replica-read-mode", replica_read_mode_, "A hint for replica selection mechanism.")
83+
->transform(CLI::IsMember(allowed_replica_read_modes))
84+
->default_val(replica_read_mode_);
85+
add_option("--replica-server-group",
86+
replica_server_group_,
87+
"Server group name for --replica-read-mode=selected_server_group*.")
88+
->transform(CLI::IsMember(allowed_replica_read_modes));
89+
6490
add_flag("--hexdump",
6591
hexdump_,
6692
"Print value using hexdump encoding (safe for binary data on STDOUT).");
@@ -81,12 +107,8 @@ class get_app : public CLI::App
81107

82108
auto cluster_options = build_cluster_options(common_options_);
83109

84-
couchbase::get_options get_options{};
85-
if (with_expiry_) {
86-
get_options.with_expiry(true);
87-
}
88-
if (!projections_.empty()) {
89-
get_options.project(projections_);
110+
if (auto server_group = replica_server_group_; server_group) {
111+
cluster_options.network().preferred_server_group(server_group.value());
90112
}
91113

92114
const auto connection_string = common_options_.connection.connection_string;
@@ -116,11 +138,60 @@ class get_app : public CLI::App
116138

117139
auto collection = cluster.bucket(bucket_name).scope(scope_name).collection(collection_name);
118140

119-
auto [err, resp] = collection.get(document_id, get_options).get();
120-
if (json_lines_) {
121-
print_result_json_line(bucket_name, scope_name, collection_name, document_id, err, resp);
122-
} else {
123-
print_result(bucket_name, scope_name, collection_name, document_id, err, resp);
141+
if (use_replica_.empty() || use_replica_ == "none") {
142+
couchbase::get_options get_options{};
143+
if (with_expiry_) {
144+
get_options.with_expiry(true);
145+
}
146+
if (!projections_.empty()) {
147+
get_options.project(projections_);
148+
}
149+
150+
auto [err, resp] = collection.get(document_id, get_options).get();
151+
if (json_lines_) {
152+
print_result_json_line(bucket_name, scope_name, collection_name, document_id, err, resp);
153+
} else {
154+
print_result(bucket_name, scope_name, collection_name, document_id, err, resp);
155+
}
156+
} else if (use_replica_ == "any") {
157+
couchbase::get_any_replica_options get_options{};
158+
159+
if (replica_read_mode_.empty() || replica_read_mode_ == "no_preference") {
160+
get_options.read_preference(couchbase::read_preference::no_preference);
161+
} else if (replica_read_mode_ == "selected_server_group") {
162+
get_options.read_preference(couchbase::read_preference::selected_server_group);
163+
} else if (replica_read_mode_ == "selected_server_group_or_all_available") {
164+
get_options.read_preference(
165+
couchbase::read_preference::selected_server_group_or_all_available);
166+
}
167+
168+
auto [err, resp] = collection.get_any_replica(document_id, get_options).get();
169+
if (json_lines_) {
170+
print_result_json_line(bucket_name, scope_name, collection_name, document_id, err, resp);
171+
} else {
172+
print_result(bucket_name, scope_name, collection_name, document_id, err, resp);
173+
}
174+
} else if (use_replica_ == "all") {
175+
couchbase::get_all_replicas_options get_options{};
176+
177+
if (replica_read_mode_.empty() || replica_read_mode_ == "no_preference") {
178+
get_options.read_preference(couchbase::read_preference::no_preference);
179+
} else if (replica_read_mode_ == "selected_server_group") {
180+
get_options.read_preference(couchbase::read_preference::selected_server_group);
181+
} else if (replica_read_mode_ == "selected_server_group_or_all_available") {
182+
get_options.read_preference(
183+
couchbase::read_preference::selected_server_group_or_all_available);
184+
}
185+
186+
auto [err, responses] = collection.get_all_replicas(document_id, get_options).get();
187+
for (const auto& resp : responses) {
188+
if (json_lines_) {
189+
print_result_json_line(
190+
bucket_name, scope_name, collection_name, document_id, err, resp);
191+
} else {
192+
print_result(bucket_name, scope_name, collection_name, document_id, err, resp);
193+
}
194+
}
124195
}
125196
}
126197

@@ -187,22 +258,104 @@ class get_app : public CLI::App
187258
std::string verbose_cas = fmt::format(" ({})", cas_to_time_point(resp.cas()));
188259
if (const auto& exptime = resp.expiry_time(); exptime.has_value()) {
189260
fmt::print(stderr,
190-
"{}, size: {}, CAS: 0x{}{}, flags: 0x{:08x}, expiry: {}\n",
261+
"{}, size: {}, flags: 0x{:08x}, CAS: 0x{}{}, expiry: {}\n",
191262
prefix,
192263
value.size(),
264+
flags,
193265
resp.cas(),
194266
verbose_ ? verbose_cas : "",
195-
flags,
196267
exptime.value());
197268
} else {
198269
fmt::print(stderr,
199-
"{}, size: {}, CAS: 0x{}{}, flags: 0x{:08x}\n",
270+
"{}, size: {}, flags: 0x{:08x}, CAS: 0x{}{}\n",
200271
prefix,
201272
value.size(),
273+
flags,
202274
resp.cas(),
203-
verbose_ ? verbose_cas : "",
204-
flags);
275+
verbose_ ? verbose_cas : "");
276+
}
277+
(void)fflush(stderr);
278+
(void)fflush(stdout);
279+
if (hexdump_) {
280+
auto hex = fmt::format("{:a}", spdlog::to_hex(value));
281+
fmt::print(stdout, "{}\n", std::string_view(hex.data() + 1, hex.size() - 1));
282+
} else if (pretty_json_) {
283+
try {
284+
auto json = couchbase::core::utils::json::parse_binary(value);
285+
fmt::print(stdout, "{}\n", tao::json::to_string(json, 2));
286+
} catch (const tao::pegtl::parse_error&) {
287+
fmt::print(stdout,
288+
"{}\n",
289+
std::string_view(reinterpret_cast<const char*>(value.data()), value.size()));
290+
}
291+
} else {
292+
fmt::print(stdout,
293+
"{}\n",
294+
std::string_view(reinterpret_cast<const char*>(value.data()), value.size()));
295+
}
296+
(void)fflush(stdout);
297+
}
298+
}
299+
300+
void print_result_json_line(const std::string& bucket_name,
301+
const std::string& scope_name,
302+
const std::string& collection_name,
303+
const std::string& document_id,
304+
const couchbase::error& err,
305+
const couchbase::get_replica_result& resp) const
306+
{
307+
tao::json::value line = tao::json::empty_object;
308+
tao::json::value meta = {
309+
{ "bucket_name", bucket_name }, { "scope_name", scope_name },
310+
{ "collection_name", collection_name }, { "document_id", document_id },
311+
{ "is_replica", resp.is_replica() },
312+
};
313+
if (err.ec()) {
314+
line["error"] = fmt::format("{}", err);
315+
} else {
316+
auto [value, flags] = resp.content_as<passthrough_transcoder>();
317+
meta["cas"] = fmt::format("0x{}", resp.cas());
318+
meta["flags"] = flags;
319+
try {
320+
line["json"] = couchbase::core::utils::json::parse_binary(value);
321+
} catch (const tao::pegtl::parse_error&) {
322+
line["base64"] = value;
323+
}
324+
}
325+
line["meta"] = meta;
326+
fmt::print(stdout, "{}\n", tao::json::to_string<tao::json::events::binary_to_base64>(line));
327+
(void)fflush(stdout);
328+
}
329+
330+
void print_result(const std::string& bucket_name,
331+
const std::string& scope_name,
332+
const std::string& collection_name,
333+
const std::string& document_id,
334+
const couchbase::error& err,
335+
const couchbase::get_replica_result& resp) const
336+
{
337+
const std::string prefix = fmt::format("bucket: {}, collection: {}.{}, id: {}",
338+
bucket_name,
339+
scope_name,
340+
collection_name,
341+
document_id);
342+
(void)fflush(stderr);
343+
if (err.ec()) {
344+
fmt::print(stderr, "{}, error: {}\n", prefix, err.ec().message());
345+
if (verbose_) {
346+
fmt::print(stderr, "{}\n", err.ctx().to_json());
205347
}
348+
} else {
349+
auto [value, flags] = resp.content_as<passthrough_transcoder>();
350+
std::string verbose_cas = fmt::format(" ({})", cas_to_time_point(resp.cas()));
351+
fmt::print(stderr,
352+
"{}, size: {}, flags: 0x{:08x}, CAS: 0x{}{}, is_replica: {}\n",
353+
prefix,
354+
value.size(),
355+
flags,
356+
resp.cas(),
357+
verbose_ ? verbose_cas : "",
358+
resp.is_replica());
206359
(void)fflush(stderr);
207360
(void)fflush(stdout);
208361
if (hexdump_) {
@@ -239,6 +392,10 @@ class get_app : public CLI::App
239392
bool json_lines_{ false };
240393
bool verbose_{ false };
241394

395+
std::string use_replica_{ "none" };
396+
std::string replica_read_mode_{ "no_preference" };
397+
std::optional<std::string> replica_server_group_{};
398+
242399
std::vector<std::string> ids_{};
243400
};
244401
} // namespace

0 commit comments

Comments
 (0)