Skip to content

Commit 9e9a461

Browse files
authored
Worker Heartbeat: Plumb metrics and migrate to runtime/namespace/client level (#1038)
* Runtime/namespace/client wide worker heartbeat (#983) * worker heartbeat * Address Spencer's comments * wip use client_identity_override as part of key, added test * Refactor almost complete, need to plumb through telemetry to SharedNamespaceWorker * Verified client replacement works, need to update tests and cleanup * formatting * clean up * forgot to remove new() now that using builder pattern * Switch to worker_set_key * Replace client test passes, need to write unit tests in worker_registry * cargo test-lint * limit nexus to 1 poller, add tests for worker_registry for heartbeat * PR comments * new test helper * Return error on multi worker register for same namespace and task queue on same client * cargo fmt * Fix registration order, unique task queue for test worker * Remove TEST_Q variable * Missing quotes * CI lint and docker test fix, rename worker_set_key to worker_grouping_key * clippy bug * Worker heartbeat: New in-memory metrics mechanism, plumb rest of heartbeat data (#1023) * plumb in memory metrics * simplify worker::new(), fix some heartbeat metrics, new test file * CounterImpl, final_heartbeat, more specific metric label dbg_panic msg, counter_with_in_mem and and_then() * Support in-mem metrics when metrics aren't configured * Move sys_info refresh to dedicated thread, use tuner's existing sys info * Format, AtomicCell * Fix unit test * Set dynamic config for WorkerHeartbeatsEnabled and ListWorkersEnabled, remove stale metric previously added * Should not expect heartbeat nexus worker in metrics for non-heartbeating integ test * recv_timeout instead of thread::sleep, use WorkflowService::list_workers directly, WithLabel API improvement * MetricAttributes::NoOp, add mechanism to ignore dupe workers for testing, more tests * More tests, sticky cache miss, plugins * Formatting, fix skip_client_worker_set_check * Cursor found a bug * Lower sleep time, add print for debugging * more prints * use semaphores for worker_heartbeat_failure_metrics * 
skip_client_worker_set_check for all integ workers * Can't use tokio semaphore in workflow code * use signal to test workflow_slots.last_interval_failure_tasks * Use Notify instead of semaphores, fix test flake * Use eventually() instead of a manual sleep * max_outstanding_workflow_tasks 2 * merge * Forgot to commit format fixes * Fix test
1 parent 561ca79 commit 9e9a461

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

45 files changed

+3271
-689
lines changed

.cargo/config.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[env]
22
# This temporarily overrides the version of the CLI used for integration tests, locally and in CI
3-
CLI_VERSION_OVERRIDE = "v1.4.1-cloud-v1-29-0-139-2.0"
3+
#CLI_VERSION_OVERRIDE = "v1.4.1-cloud-v1-29-0-139-2.0"
44

55
[alias]
66
# Not sure why --all-features doesn't work

client/src/lib.rs

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,9 @@ pub use temporal_sdk_core_protos::temporal::api::{
3434
},
3535
};
3636
pub use tonic;
37-
pub use worker_registry::{Slot, SlotManager, SlotProvider, WorkerKey};
37+
pub use worker_registry::{
38+
ClientWorker, ClientWorkerSet, HeartbeatCallback, SharedNamespaceWorkerTrait, Slot,
39+
};
3840
pub use workflow_handle::{
3941
GetWorkflowResultOpts, WorkflowExecutionInfo, WorkflowExecutionResult, WorkflowHandle,
4042
};
@@ -390,7 +392,7 @@ pub struct ConfiguredClient<C> {
390392
headers: Arc<RwLock<ClientHeaders>>,
391393
/// Capabilities as read from the `get_system_info` RPC call made on client connection
392394
capabilities: Option<get_system_info_response::Capabilities>,
393-
workers: Arc<SlotManager>,
395+
workers: Arc<ClientWorkerSet>,
394396
}
395397

396398
impl<C> ConfiguredClient<C> {
@@ -440,9 +442,14 @@ impl<C> ConfiguredClient<C> {
440442
}
441443

442444
/// Returns a cloned reference to a registry with workers using this client instance
443-
pub fn workers(&self) -> Arc<SlotManager> {
445+
pub fn workers(&self) -> Arc<ClientWorkerSet> {
444446
self.workers.clone()
445447
}
448+
449+
/// Returns the worker grouping key; this should be unique to each client
450+
pub fn worker_grouping_key(&self) -> Uuid {
451+
self.workers.worker_grouping_key()
452+
}
446453
}
447454

448455
#[derive(Debug)]
@@ -584,7 +591,7 @@ impl ClientOptions {
584591
client: TemporalServiceClient::new(svc),
585592
options: Arc::new(self.clone()),
586593
capabilities: None,
587-
workers: Arc::new(SlotManager::new()),
594+
workers: Arc::new(ClientWorkerSet::new()),
588595
};
589596
if !self.skip_get_system_info {
590597
match client
@@ -866,6 +873,11 @@ impl Client {
866873
pub fn into_inner(self) -> ConfiguredClient<TemporalServiceClient> {
867874
self.inner
868875
}
876+
877+
/// Returns the client-wide key
878+
pub fn worker_grouping_key(&self) -> Uuid {
879+
self.inner.worker_grouping_key()
880+
}
869881
}
870882

871883
impl NamespacedClient for Client {

client/src/raw.rs

Lines changed: 24 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ use crate::{
66
Client, ConfiguredClient, LONG_POLL_TIMEOUT, RequestExt, RetryClient, SharedReplaceableClient,
77
TEMPORAL_NAMESPACE_HEADER_KEY, TemporalServiceClient,
88
metrics::namespace_kv,
9-
worker_registry::{Slot, SlotManager},
9+
worker_registry::{ClientWorkerSet, Slot},
1010
};
1111
use dyn_clone::DynClone;
1212
use futures_util::{FutureExt, TryFutureExt, future::BoxFuture};
@@ -33,7 +33,7 @@ use tonic::{
3333
trait RawClientProducer {
3434
/// Returns information about workers associated with this client. Implementers outside of
3535
/// core can safely return `None`.
36-
fn get_workers_info(&self) -> Option<Arc<SlotManager>>;
36+
fn get_workers_info(&self) -> Option<Arc<ClientWorkerSet>>;
3737

3838
/// Return a workflow service client instance
3939
fn workflow_client(&mut self) -> Box<dyn WorkflowService>;
@@ -175,7 +175,7 @@ impl<RC> RawClientProducer for RetryClient<RC>
175175
where
176176
RC: RawClientProducer + 'static,
177177
{
178-
fn get_workers_info(&self) -> Option<Arc<SlotManager>> {
178+
fn get_workers_info(&self) -> Option<Arc<ClientWorkerSet>> {
179179
self.get_client().get_workers_info()
180180
}
181181

@@ -253,7 +253,7 @@ impl<RC> RawClientProducer for SharedReplaceableClient<RC>
253253
where
254254
RC: RawClientProducer + Clone + Send + Sync + 'static,
255255
{
256-
fn get_workers_info(&self) -> Option<Arc<SlotManager>> {
256+
fn get_workers_info(&self) -> Option<Arc<ClientWorkerSet>> {
257257
self.inner_cow().get_workers_info()
258258
}
259259
fn workflow_client(&mut self) -> Box<dyn WorkflowService> {
@@ -284,7 +284,7 @@ impl<RC> RawGrpcCaller for SharedReplaceableClient<RC> where
284284
}
285285

286286
impl RawClientProducer for TemporalServiceClient {
287-
fn get_workers_info(&self) -> Option<Arc<SlotManager>> {
287+
fn get_workers_info(&self) -> Option<Arc<ClientWorkerSet>> {
288288
None
289289
}
290290

@@ -312,7 +312,7 @@ impl RawClientProducer for TemporalServiceClient {
312312
impl RawGrpcCaller for TemporalServiceClient {}
313313

314314
impl RawClientProducer for ConfiguredClient<TemporalServiceClient> {
315-
fn get_workers_info(&self) -> Option<Arc<SlotManager>> {
315+
fn get_workers_info(&self) -> Option<Arc<ClientWorkerSet>> {
316316
Some(self.workers())
317317
}
318318

@@ -340,7 +340,7 @@ impl RawClientProducer for ConfiguredClient<TemporalServiceClient> {
340340
impl RawGrpcCaller for ConfiguredClient<TemporalServiceClient> {}
341341

342342
impl RawClientProducer for Client {
343-
fn get_workers_info(&self) -> Option<Arc<SlotManager>> {
343+
fn get_workers_info(&self) -> Option<Arc<ClientWorkerSet>> {
344344
self.inner.get_workers_info()
345345
}
346346

@@ -491,7 +491,7 @@ macro_rules! proxy_impl {
491491
mut request: tonic::Request<$req>,
492492
) -> BoxFuture<'_, Result<tonic::Response<$resp>, tonic::Status>> {
493493
type_closure_arg(&mut request, $closure_request);
494-
let data = type_closure_two_arg(&mut request, Option::<Arc<SlotManager>>::None,
494+
let data = type_closure_two_arg(&mut request, Option::<Arc<ClientWorkerSet>>::None,
495495
$closure_before);
496496
async move {
497497
type_closure_two_arg(<$client_type<_>>::$method(self, request).await,
@@ -1601,6 +1601,7 @@ mod tests {
16011601
operatorservice::v1::DeleteNamespaceRequest, workflowservice::v1::ListNamespacesRequest,
16021602
};
16031603
use tonic::IntoRequest;
1604+
use uuid::Uuid;
16041605

16051606
// Just to help make sure some stuff compiles. Not run.
16061607
#[allow(dead_code)]
@@ -1791,7 +1792,7 @@ mod tests {
17911792
#[case::without_versioning(false)]
17921793
#[tokio::test]
17931794
async fn eager_reservations_attach_deployment_options(#[case] use_worker_versioning: bool) {
1794-
use crate::worker_registry::{MockSlot, MockSlotProvider};
1795+
use crate::worker_registry::{MockClientWorker, MockSlot};
17951796
use temporal_sdk_core_api::worker::{WorkerDeploymentOptions, WorkerDeploymentVersion};
17961797
use temporal_sdk_core_protos::temporal::api::enums::v1::WorkerVersioningMode;
17971798

@@ -1803,13 +1804,13 @@ mod tests {
18031804

18041805
#[derive(Clone)]
18051806
struct MyFakeServices {
1806-
slot_manager: Arc<SlotManager>,
1807+
client_worker_set: Arc<ClientWorkerSet>,
18071808
expected_mode: WorkerVersioningMode,
18081809
}
18091810
impl RawGrpcCaller for MyFakeServices {}
18101811
impl RawClientProducer for MyFakeServices {
1811-
fn get_workers_info(&self) -> Option<Arc<SlotManager>> {
1812-
Some(self.slot_manager.clone())
1812+
fn get_workers_info(&self) -> Option<Arc<ClientWorkerSet>> {
1813+
Some(self.client_worker_set.clone())
18131814
}
18141815
fn workflow_client(&mut self) -> Box<dyn WorkflowService> {
18151816
Box::new(MyFakeWfClient {
@@ -1839,7 +1840,7 @@ mod tests {
18391840
default_versioning_behavior: None,
18401841
};
18411842

1842-
let mut mock_provider = MockSlotProvider::new();
1843+
let mut mock_provider = MockClientWorker::new();
18431844
mock_provider
18441845
.expect_namespace()
18451846
.return_const("test-namespace".to_string());
@@ -1854,9 +1855,16 @@ mod tests {
18541855
mock_provider
18551856
.expect_deployment_options()
18561857
.return_const(Some(deployment_opts.clone()));
1858+
mock_provider.expect_heartbeat_enabled().return_const(false);
1859+
let uuid = Uuid::new_v4();
1860+
mock_provider
1861+
.expect_worker_instance_key()
1862+
.return_const(uuid);
18571863

1858-
let slot_manager = Arc::new(SlotManager::new());
1859-
slot_manager.register(Box::new(mock_provider));
1864+
let client_worker_set = Arc::new(ClientWorkerSet::new());
1865+
client_worker_set
1866+
.register_worker(Arc::new(mock_provider), true)
1867+
.unwrap();
18601868

18611869
#[derive(Clone)]
18621870
struct MyFakeWfClient {
@@ -1886,7 +1894,7 @@ mod tests {
18861894
}
18871895

18881896
let mut mfs = MyFakeServices {
1889-
slot_manager,
1897+
client_worker_set,
18901898
expected_mode,
18911899
};
18921900

0 commit comments

Comments
 (0)