From 98299104381315ee0b2f2879214f4864fb3a847c Mon Sep 17 00:00:00 2001 From: Ivan Goncharov Date: Sat, 26 Oct 2024 13:14:36 +0200 Subject: [PATCH 1/5] add: exponential backoff for CAS operations of floats Signed-off-by: Ivan Goncharov --- prometheus/atomic_update.go | 48 +++++++++++ prometheus/atomic_update_test.go | 133 +++++++++++++++++++++++++++++++ prometheus/counter.go | 10 +-- prometheus/gauge.go | 10 +-- prometheus/histogram.go | 10 +-- prometheus/summary.go | 25 +++--- 6 files changed, 200 insertions(+), 36 deletions(-) create mode 100644 prometheus/atomic_update.go create mode 100644 prometheus/atomic_update_test.go diff --git a/prometheus/atomic_update.go b/prometheus/atomic_update.go new file mode 100644 index 000000000..29539d844 --- /dev/null +++ b/prometheus/atomic_update.go @@ -0,0 +1,48 @@ +// Copyright 2014 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package prometheus + +import ( + "math" + "sync/atomic" + "time" +) + +// atomicUpdateFloat atomically updates the float64 value pointed to by bits +// using the provided updateFunc, with an exponential backoff on contention. +func atomicUpdateFloat(bits *uint64, updateFunc func(float64) float64) { + const ( + maxBackoff = 320 * time.Millisecond + initialBackoff = 10 * time.Millisecond + ) + var backoff = initialBackoff + + for { + loadedBits := atomic.LoadUint64(bits) + oldFloat := math.Float64frombits(loadedBits) + newFloat := updateFunc(oldFloat) + newBits := math.Float64bits(newFloat) + + if atomic.CompareAndSwapUint64(bits, loadedBits, newBits) { + break + } else { + // Exponential backoff with sleep and cap to avoid infinite wait + time.Sleep(backoff) + backoff *= 2 + if backoff > maxBackoff { + backoff = maxBackoff + } + } + } +} diff --git a/prometheus/atomic_update_test.go b/prometheus/atomic_update_test.go new file mode 100644 index 000000000..86a8dd9b4 --- /dev/null +++ b/prometheus/atomic_update_test.go @@ -0,0 +1,133 @@ +package prometheus + +import ( + "math" + "sync" + "sync/atomic" + "testing" + "unsafe" +) + +var output float64 + +func TestAtomicUpdateFloat(t *testing.T) { + var val float64 = 0.0 + bits := (*uint64)(unsafe.Pointer(&val)) + var wg sync.WaitGroup + numGoroutines := 100000 + increment := 1.0 + + for i := 0; i < numGoroutines; i++ { + wg.Add(1) + go func() { + defer wg.Done() + atomicUpdateFloat(bits, func(f float64) float64 { + return f + increment + }) + }() + } + + wg.Wait() + expected := float64(numGoroutines) * increment + if val != expected { + t.Errorf("Expected %f, got %f", expected, val) + } +} + +// Benchmark for atomicUpdateFloat with single goroutine (no contention). +func BenchmarkAtomicUpdateFloat_SingleGoroutine(b *testing.B) { + var val float64 = 0.0 + bits := (*uint64)(unsafe.Pointer(&val)) + + for i := 0; i < b.N; i++ { + atomicUpdateFloat(bits, func(f float64) float64 { + return f + 1.0 + }) + } + + output = val +} + +// Benchmark for old implementation with single goroutine (no contention) -> to check overhead of backoff +func BenchmarkAtomicNoBackoff_SingleGoroutine(b *testing.B) { + var val float64 = 0.0 + bits := (*uint64)(unsafe.Pointer(&val)) + + for i := 0; i < b.N; i++ { + for { + loadedBits := atomic.LoadUint64(bits) + newBits := math.Float64bits(math.Float64frombits(loadedBits) + 1.0) + if atomic.CompareAndSwapUint64(bits, loadedBits, newBits) { + break + } + } + } + + output = val +} + +// Benchmark varying the number of goroutines. +func benchmarkAtomicUpdateFloatConcurrency(b *testing.B, numGoroutines int) { + var val float64 = 0.0 + bits := (*uint64)(unsafe.Pointer(&val)) + b.SetParallelism(numGoroutines) + + b.ResetTimer() + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + atomicUpdateFloat(bits, func(f float64) float64 { + return f + 1.0 + }) + } + }) + + output = val +} + +func benchmarkAtomicNoBackoffFloatConcurrency(b *testing.B, numGoroutines int) { + var val float64 = 0.0 + bits := (*uint64)(unsafe.Pointer(&val)) + b.SetParallelism(numGoroutines) + + b.ResetTimer() + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + for { + loadedBits := atomic.LoadUint64(bits) + newBits := math.Float64bits(math.Float64frombits(loadedBits) + 1.0) + if atomic.CompareAndSwapUint64(bits, loadedBits, newBits) { + break + } + } + } + }) + + output = val +} + +func BenchmarkAtomicUpdateFloat_1Goroutine(b *testing.B) { + benchmarkAtomicUpdateFloatConcurrency(b, 1) +} +func BenchmarkAtomicNoBackoff_1Goroutine(b *testing.B) { + benchmarkAtomicNoBackoffFloatConcurrency(b, 1) +} +func BenchmarkAtomicUpdateFloat_2Goroutines(b *testing.B) { + benchmarkAtomicUpdateFloatConcurrency(b, 2) +} +func BenchmarkAtomicNoBackoff_2Goroutines(b *testing.B) { + benchmarkAtomicNoBackoffFloatConcurrency(b, 2) +} +func BenchmarkAtomicUpdateFloat_4Goroutines(b *testing.B) { + benchmarkAtomicUpdateFloatConcurrency(b, 4) +} + +func BenchmarkAtomicNoBackoff_4Goroutines(b *testing.B) { + benchmarkAtomicNoBackoffFloatConcurrency(b, 4) +} +func BenchmarkAtomicUpdateFloat_8Goroutines(b *testing.B) { + benchmarkAtomicUpdateFloatConcurrency(b, 8) +} + +func BenchmarkAtomicNoBackoff_8Goroutines(b *testing.B) { + benchmarkAtomicNoBackoffFloatConcurrency(b, 8) +} diff --git a/prometheus/counter.go b/prometheus/counter.go index 4ce84e7a8..2996aef6a 100644 --- a/prometheus/counter.go +++ b/prometheus/counter.go @@ -134,13 +134,9 @@ func (c *counter) Add(v float64) { return } - for { - oldBits := atomic.LoadUint64(&c.valBits) - newBits := math.Float64bits(math.Float64frombits(oldBits) + v) - if atomic.CompareAndSwapUint64(&c.valBits, oldBits, newBits) { - return - } - } + atomicUpdateFloat(&c.valBits, func(oldVal float64) float64 { + return oldVal + v + }) } func (c *counter) AddWithExemplar(v float64, e Labels) { diff --git a/prometheus/gauge.go b/prometheus/gauge.go index dd2eac940..aa1846365 100644 --- a/prometheus/gauge.go +++ b/prometheus/gauge.go @@ -120,13 +120,9 @@ func (g *gauge) Dec() { } func (g *gauge) Add(val float64) { - for { - oldBits := atomic.LoadUint64(&g.valBits) - newBits := math.Float64bits(math.Float64frombits(oldBits) + val) - if atomic.CompareAndSwapUint64(&g.valBits, oldBits, newBits) { - return - } - } + atomicUpdateFloat(&g.valBits, func(oldVal float64) float64 { + return oldVal + val + }) } func (g *gauge) Sub(val float64) { diff --git a/prometheus/histogram.go b/prometheus/histogram.go index 08bef3d87..24fcab3ab 100644 --- a/prometheus/histogram.go +++ b/prometheus/histogram.go @@ -1621,13 +1621,9 @@ func waitForCooldown(count uint64, counts *histogramCounts) { // atomicAddFloat adds the provided float atomically to another float // represented by the bit pattern the bits pointer is pointing to. func atomicAddFloat(bits *uint64, v float64) { - for { - loadedBits := atomic.LoadUint64(bits) - newBits := math.Float64bits(math.Float64frombits(loadedBits) + v) - if atomic.CompareAndSwapUint64(bits, loadedBits, newBits) { - break - } - } + atomicUpdateFloat(bits, func(oldVal float64) float64 { + return oldVal + v + }) } // atomicDecUint32 atomically decrements the uint32 p points to. See diff --git a/prometheus/summary.go b/prometheus/summary.go index 1ab0e4796..fb2bf2b25 100644 --- a/prometheus/summary.go +++ b/prometheus/summary.go @@ -468,13 +468,9 @@ func (s *noObjectivesSummary) Observe(v float64) { n := atomic.AddUint64(&s.countAndHotIdx, 1) hotCounts := s.counts[n>>63] - for { - oldBits := atomic.LoadUint64(&hotCounts.sumBits) - newBits := math.Float64bits(math.Float64frombits(oldBits) + v) - if atomic.CompareAndSwapUint64(&hotCounts.sumBits, oldBits, newBits) { - break - } - } + atomicUpdateFloat(&hotCounts.sumBits, func(oldVal float64) float64 { + return oldVal + v + }) // Increment count last as we take it as a signal that the observation // is complete. atomic.AddUint64(&hotCounts.count, 1) @@ -516,14 +512,13 @@ func (s *noObjectivesSummary) Write(out *dto.Metric) error { // Finally add all the cold counts to the new hot counts and reset the cold counts. atomic.AddUint64(&hotCounts.count, count) atomic.StoreUint64(&coldCounts.count, 0) - for { - oldBits := atomic.LoadUint64(&hotCounts.sumBits) - newBits := math.Float64bits(math.Float64frombits(oldBits) + sum.GetSampleSum()) - if atomic.CompareAndSwapUint64(&hotCounts.sumBits, oldBits, newBits) { - atomic.StoreUint64(&coldCounts.sumBits, 0) - break - } - } + + // Use atomicUpdateFloat to update hotCounts.sumBits atomically. + atomicUpdateFloat(&hotCounts.sumBits, func(oldVal float64) float64 { + return oldVal + sum.GetSampleSum() + }) + atomic.StoreUint64(&coldCounts.sumBits, 0) + return nil } From 27067c1c6ef031063e080f949c547d35815d84f8 Mon Sep 17 00:00:00 2001 From: Ivan Goncharov Date: Sat, 26 Oct 2024 14:09:07 +0200 Subject: [PATCH 2/5] add: some more benchmark use cases (higher contention) Signed-off-by: Ivan Goncharov --- prometheus/atomic_update_test.go | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/prometheus/atomic_update_test.go b/prometheus/atomic_update_test.go index 86a8dd9b4..e822857a9 100644 --- a/prometheus/atomic_update_test.go +++ b/prometheus/atomic_update_test.go @@ -131,3 +131,19 @@ func BenchmarkAtomicUpdateFloat_8Goroutines(b *testing.B) { func BenchmarkAtomicNoBackoff_8Goroutines(b *testing.B) { benchmarkAtomicNoBackoffFloatConcurrency(b, 8) } + +func BenchmarkAtomicUpdateFloat_16Goroutines(b *testing.B) { + benchmarkAtomicUpdateFloatConcurrency(b, 16) +} + +func BenchmarkAtomicNoBackoff_16Goroutines(b *testing.B) { + benchmarkAtomicNoBackoffFloatConcurrency(b, 16) +} + +func BenchmarkAtomicUpdateFloat_32Goroutines(b *testing.B) { + benchmarkAtomicUpdateFloatConcurrency(b, 32) +} + +func BenchmarkAtomicNoBackoff_32Goroutines(b *testing.B) { + benchmarkAtomicNoBackoffFloatConcurrency(b, 32) +} From d06c03ff00a32a60cde2793537443a42de49d693 Mon Sep 17 00:00:00 2001 From: Ivan Goncharov Date: Sat, 26 Oct 2024 14:47:10 +0200 Subject: [PATCH 3/5] fmt: fumpted some files Signed-off-by: Ivan Goncharov --- prometheus/atomic_update.go | 2 +- prometheus/atomic_update_test.go | 5 +++++ prometheus/process_collector_cgo_darwin.go | 4 +--- 3 files changed, 7 insertions(+), 4 deletions(-) diff --git a/prometheus/atomic_update.go b/prometheus/atomic_update.go index 29539d844..964738da0 100644 --- a/prometheus/atomic_update.go +++ b/prometheus/atomic_update.go @@ -26,7 +26,7 @@ func atomicUpdateFloat(bits *uint64, updateFunc func(float64) float64) { maxBackoff = 320 * time.Millisecond initialBackoff = 10 * time.Millisecond ) - var backoff = initialBackoff + backoff := initialBackoff for { loadedBits := atomic.LoadUint64(bits) diff --git a/prometheus/atomic_update_test.go b/prometheus/atomic_update_test.go index e822857a9..b5cb10569 100644 --- a/prometheus/atomic_update_test.go +++ b/prometheus/atomic_update_test.go @@ -108,15 +108,19 @@ func benchmarkAtomicNoBackoffFloatConcurrency(b *testing.B, numGoroutines int) { func BenchmarkAtomicUpdateFloat_1Goroutine(b *testing.B) { benchmarkAtomicUpdateFloatConcurrency(b, 1) } + func BenchmarkAtomicNoBackoff_1Goroutine(b *testing.B) { benchmarkAtomicNoBackoffFloatConcurrency(b, 1) } + func BenchmarkAtomicUpdateFloat_2Goroutines(b *testing.B) { benchmarkAtomicUpdateFloatConcurrency(b, 2) } + func BenchmarkAtomicNoBackoff_2Goroutines(b *testing.B) { benchmarkAtomicNoBackoffFloatConcurrency(b, 2) } + func BenchmarkAtomicUpdateFloat_4Goroutines(b *testing.B) { benchmarkAtomicUpdateFloatConcurrency(b, 4) } @@ -124,6 +128,7 @@ func BenchmarkAtomicUpdateFloat_4Goroutines(b *testing.B) { func BenchmarkAtomicNoBackoff_4Goroutines(b *testing.B) { benchmarkAtomicNoBackoffFloatConcurrency(b, 4) } + func BenchmarkAtomicUpdateFloat_8Goroutines(b *testing.B) { benchmarkAtomicUpdateFloatConcurrency(b, 8) } diff --git a/prometheus/process_collector_cgo_darwin.go b/prometheus/process_collector_cgo_darwin.go index 6f48e5845..fc4c24713 100644 --- a/prometheus/process_collector_cgo_darwin.go +++ b/prometheus/process_collector_cgo_darwin.go @@ -22,9 +22,7 @@ import "C" import "fmt" func getMemory() (*memoryInfo, error) { - var ( - rss, vsize C.ulonglong - ) + var rss, vsize C.ulonglong if err := C.get_memory_info(&rss, &vsize); err != 0 { return nil, fmt.Errorf("task_info() failed with 0x%x", int(err)) From 62d6f57b94d99adff015a0f261696d345a7c5a0e Mon Sep 17 00:00:00 2001 From: Ivan Goncharov Date: Sat, 26 Oct 2024 14:52:41 +0200 Subject: [PATCH 4/5] add: license header Signed-off-by: Ivan Goncharov --- prometheus/atomic_update_test.go | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/prometheus/atomic_update_test.go b/prometheus/atomic_update_test.go index b5cb10569..0233bc71f 100644 --- a/prometheus/atomic_update_test.go +++ b/prometheus/atomic_update_test.go @@ -1,3 +1,16 @@ +// Copyright 2014 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + package prometheus import ( From ba58fd6011d1cd19835588798196795427ff51a4 Mon Sep 17 00:00:00 2001 From: Ivan Goncharov Date: Tue, 5 Nov 2024 09:17:45 +0100 Subject: [PATCH 5/5] add: comment explaining origin of backoff constants Signed-off-by: Ivan Goncharov --- prometheus/atomic_update.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/prometheus/atomic_update.go b/prometheus/atomic_update.go index 964738da0..b65896a31 100644 --- a/prometheus/atomic_update.go +++ b/prometheus/atomic_update.go @@ -23,6 +23,8 @@ import ( // using the provided updateFunc, with an exponential backoff on contention. func atomicUpdateFloat(bits *uint64, updateFunc func(float64) float64) { const ( + // both numbers are derived from empirical observations + // documented in this PR: https://github.com/prometheus/client_golang/pull/1661 maxBackoff = 320 * time.Millisecond initialBackoff = 10 * time.Millisecond )