Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 23 additions & 26 deletions core/app_telemetry_reporter.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -671,30 +671,29 @@ class app_telemetry_reporter_impl
{
public:
app_telemetry_reporter_impl(std::shared_ptr<app_telemetry_meter> meter,
cluster_options options,
cluster_credentials credentials,
origin& origin,
asio::io_context& ctx,
asio::ssl::context& tls)
: meter_{ std::move(meter) }
, options_{ std::move(options) }
, credentials_{ std::move(credentials) }
, origin_{ origin }
, ctx_{ ctx }
, tls_{ tls }
, backoff_{ ctx }
, exponential_backoff_calculator_{
std::chrono::milliseconds{ 100 },
options_.app_telemetry_backoff_interval,
origin_.options().app_telemetry_backoff_interval,
2 /* backoff factor */,
0.5 /* jitter factor */,
}
{
if (options_.enable_app_telemetry) {
if (!options_.app_telemetry_endpoint.empty()) {
auto url = couchbase::core::utils::string_codec::url_parse(options_.app_telemetry_endpoint);
if (origin_.options().enable_app_telemetry) {
if (!origin_.options().app_telemetry_endpoint.empty()) {
auto url =
couchbase::core::utils::string_codec::url_parse(origin_.options().app_telemetry_endpoint);
if (url.host.empty() || url.scheme != "ws") {
CB_LOG_WARNING(
"unable to use \"{}\" as a app telemetry endpoint (expected ws:// and hostname)",
options_.app_telemetry_endpoint);
origin_.options().app_telemetry_endpoint);
return;
}
addresses_.push_back({
Expand Down Expand Up @@ -750,12 +749,12 @@ class app_telemetry_reporter_impl
}));
websocket_session_ = websocket_session::start(ctx_,
address,
credentials_,
origin_.credentials(),
std::move(stream),
meter_,
shared_from_this(),
options_.app_telemetry_ping_interval,
options_.app_telemetry_ping_timeout);
origin_.options().app_telemetry_ping_interval,
origin_.options().app_telemetry_ping_timeout);
retry_backoff_calculator_ = &no_backoff_calculator_;
++next_address_index_;
}
Expand Down Expand Up @@ -813,25 +812,27 @@ class app_telemetry_reporter_impl
return;
}
if (self->websocket_state_ == connection_state::disconnected) {
self->dialer_ =
telemetry_dialer::dial(next_address, self->options_, self->ctx_, self->tls_, self);
self->dialer_ = telemetry_dialer::dial(
next_address, self->origin_.options(), self->ctx_, self->tls_, self);
}
});
return;
}
dialer_ = telemetry_dialer::dial(next_address, options_, ctx_, tls_, shared_from_this());
dialer_ =
telemetry_dialer::dial(next_address, origin_.options(), ctx_, tls_, shared_from_this());
}

void update_config(topology::configuration&& config)
{
if (!options_.enable_app_telemetry) {
if (!origin_.options().enable_app_telemetry) {
meter_->disable();
return;
}
meter_->update_config(config);

if (options_.app_telemetry_endpoint.empty()) {
addresses_ = get_app_telemetry_addresses(config, options_.enable_tls, options_.network);
if (origin_.options().app_telemetry_endpoint.empty()) {
addresses_ = get_app_telemetry_addresses(
config, origin_.options().enable_tls, origin_.options().network);
next_address_index_ = 0;
}

Expand All @@ -841,15 +842,14 @@ class app_telemetry_reporter_impl
meter_->enable();
if (websocket_state_ == connection_state::disconnected) {
dialer_ = telemetry_dialer::dial(
addresses_[next_address_index_], options_, ctx_, tls_, shared_from_this());
addresses_[next_address_index_], origin_.options(), ctx_, tls_, shared_from_this());
}
}
}

private:
std::shared_ptr<app_telemetry_meter> meter_;
cluster_options options_;
cluster_credentials credentials_;
origin& origin_;
asio::io_context& ctx_;
asio::ssl::context& tls_;
asio::steady_timer backoff_;
Expand All @@ -868,13 +868,10 @@ class app_telemetry_reporter_impl
};

app_telemetry_reporter::app_telemetry_reporter(std::shared_ptr<app_telemetry_meter> meter,
const cluster_options& options,
const cluster_credentials& credentials,
origin& origin,
asio::io_context& ctx,
asio::ssl::context& tls)
: impl_{
std::make_shared<app_telemetry_reporter_impl>(std::move(meter), options, credentials, ctx, tls)
}
: impl_{ std::make_shared<app_telemetry_reporter_impl>(std::move(meter), origin, ctx, tls) }
{
}

Expand Down
4 changes: 2 additions & 2 deletions core/app_telemetry_reporter.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

#pragma once

#include "cluster.hxx"
#include "config_listener.hxx"

#include <memory>
Expand Down Expand Up @@ -45,8 +46,7 @@ public:
auto operator=(const app_telemetry_reporter&) -> app_telemetry_reporter& = delete;

app_telemetry_reporter(std::shared_ptr<app_telemetry_meter> meter,
const cluster_options& options,
const cluster_credentials& credentials,
origin& origin,
asio::io_context& ctx,
asio::ssl::context& tls);
~app_telemetry_reporter() override;
Expand Down
29 changes: 27 additions & 2 deletions core/cluster.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#include "core/app_telemetry_meter.hxx"
#include "core/app_telemetry_reporter.hxx"
#include "core/diagnostics.hxx"
#include "core/error.hxx"
#include "core/impl/get_replica.hxx"
#include "core/impl/lookup_in_replica.hxx"
#include "core/impl/observe_seqno.hxx"
Expand Down Expand Up @@ -521,6 +522,21 @@ class cluster_impl : public std::enable_shared_from_this<cluster_impl>
return handler({});
}

auto update_credentials(cluster_credentials auth) -> core::error
{
if (stopped_) {
return { errc::network::cluster_closed, {} };
}

if (auth.requires_tls() && !origin_.options().enable_tls) {
return { errc::common::invalid_argument,
"TLS not enabled but the provided authenticator requires TLS" };
}

origin_.update_credentials(std::move(auth));
return {};
}

auto origin() const -> std::pair<std::error_code, couchbase::core::origin>
{
if (stopped_) {
Expand Down Expand Up @@ -1281,8 +1297,8 @@ class cluster_impl : public std::enable_shared_from_this<cluster_impl>

app_telemetry_meter_->update_agent(origin_.options().user_agent_extra);
session_manager_->set_app_telemetry_meter(app_telemetry_meter_);
app_telemetry_reporter_ = std::make_shared<app_telemetry_reporter>(
app_telemetry_meter_, origin_.options(), origin_.credentials(), ctx_, tls_);
app_telemetry_reporter_ =
std::make_shared<app_telemetry_reporter>(app_telemetry_meter_, origin_, ctx_, tls_);

if (origin_.options().enable_orphan_reporting) {
orphan_reporter_ = std::make_shared<orphan_reporter>(ctx_, origin_.options().orphan_options);
Expand Down Expand Up @@ -1437,6 +1453,15 @@ cluster::origin() const -> std::pair<std::error_code, couchbase::core::origin>
return { errc::network::cluster_closed, {} };
}

auto
cluster::update_credentials(core::cluster_credentials auth) -> core::error
{
if (impl_) {
return impl_->update_credentials(std::move(auth));
}
return { errc::network::cluster_closed, {} };
}

auto
cluster::io_context() const -> asio::io_context&
{
Expand Down
3 changes: 3 additions & 0 deletions core/cluster.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#pragma once

#include "diagnostics.hxx"
#include "error.hxx"
#include "operations_fwd.hxx"
#include "origin.hxx"
#include "topology/configuration.hxx"
Expand Down Expand Up @@ -85,6 +86,8 @@ public:
utils::movable_function<void(std::error_code, std::shared_ptr<topology::configuration>)>&&
handler) const;

[[nodiscard]] auto update_credentials(core::cluster_credentials auth) -> core::error;

void execute(o::analytics_request request, mf<void(o::analytics_response)>&& handler) const;
void execute(o::append_request request, mf<void(o::append_response)>&& handler) const;
void execute(o::decrement_request request, mf<void(o::decrement_response)>&& handler) const;
Expand Down
7 changes: 7 additions & 0 deletions core/cluster_credentials.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -24,4 +24,11 @@ cluster_credentials::uses_certificate() const -> bool
{
return !certificate_path.empty();
}

auto
cluster_credentials::requires_tls() const -> bool
{
return !certificate_path.empty() && !key_path.empty();
}

} // namespace couchbase::core
1 change: 1 addition & 0 deletions core/cluster_credentials.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ struct cluster_credentials {
std::optional<std::vector<std::string>> allowed_sasl_mechanisms{};

[[nodiscard]] auto uses_certificate() const -> bool;
[[nodiscard]] auto requires_tls() const -> bool;
};

} // namespace couchbase::core
32 changes: 32 additions & 0 deletions core/impl/public_cluster.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -416,6 +416,15 @@ class cluster_impl : public std::enable_shared_from_this<cluster_impl>
});
}

auto set_authenticator(core::cluster_credentials auth) -> error
{
auto e = core_.update_credentials(std::move(auth));
if (e.ec) {
return core::impl::make_error(e);
}
return {};
}

void notify_fork(fork_event event)
{
if (event == fork_event::prepare) {
Expand Down Expand Up @@ -700,6 +709,29 @@ cluster::close() -> std::future<void>
return future;
}

auto
cluster::set_authenticator(const password_authenticator& authenticator) -> couchbase::error
{
core::cluster_credentials auth;
auth.username = authenticator.username_;
auth.password = authenticator.password_;
if (authenticator.ldap_compatible_) {
auth.allowed_sasl_mechanisms = { { "PLAIN" } };
}

return impl_->set_authenticator(std::move(auth));
}

auto
cluster::set_authenticator(const certificate_authenticator& authenticator) -> couchbase::error
{
core::cluster_credentials auth;
auth.certificate_path = authenticator.certificate_path_;
auth.key_path = authenticator.key_path_;

return impl_->set_authenticator(std::move(auth));
}

auto
cluster::query_indexes() const -> query_index_manager
{
Expand Down
Loading
Loading