56 commits
b373d2a
feat(flow): implement adaptive throttler with resource monitoring and…
kxrxh Nov 18, 2025
bf9fa92
docs(README): add AdaptiveThrottler description to documentation
kxrxh Nov 19, 2025
55c5fe1
fix(flow): emit partial sliding window on early close (#192)
reugn Nov 15, 2025
2bc6a86
fix: resolve all golangci-lint issues
kxrxh Nov 19, 2025
fbc0df7
refactor(adaptive_throttler): streamline main function by removing co…
kxrxh Nov 19, 2025
0d97643
refactor: rename CPUUsageModeReal to CPUUsageModeMeasured
kxrxh Nov 19, 2025
837e3d6
feat(adaptive_throttler): enhance resource monitoring for containeriz…
kxrxh Nov 20, 2025
edfb728
Merge pull request #1 from reugn/master
kxrxh Nov 20, 2025
fa6e81f
feat(adaptive_throttler): add demo for adaptive throttling pipeline
kxrxh Nov 20, 2025
d1d210c
refactor(adaptive_throttler): update config validation message
kxrxh Nov 20, 2025
238ca8b
docs(README): fix punctuation in AdaptiveThrottler description
kxrxh Nov 20, 2025
5c7bc28
refactor(adaptive_throttler): rename resourceMonitorInterface to reso…
kxrxh Nov 20, 2025
754d39e
refactor(resource_monitor): move stats collection to initSampler
kxrxh Nov 20, 2025
135139f
refactor(adaptive_throttler): extract system monitoring into internal…
kxrxh Nov 21, 2025
a224484
refactor(sysmonitor): improve CPU time retrieval for current process
kxrxh Nov 21, 2025
04586f7
feat(adaptive_throttler): enhance demo with CPU workload simulation
kxrxh Nov 21, 2025
6ec1760
docs(adaptive_throttler): add clarification comment for constrained v…
kxrxh Nov 21, 2025
0633447
fix(sysmonitor): add safety checks for CPU count and handle memory st…
kxrxh Nov 22, 2025
fe90168
refactor(sysmonitor): unify memory reading functions for cgroup v1 an…
kxrxh Nov 22, 2025
5ab4ea1
refactor(sysmonitor): streamline memory reading logic and remove unus…
kxrxh Nov 22, 2025
a92689d
refactor(sysmonitor): additional context for errors was added
kxrxh Nov 22, 2025
c460ee3
refactor(sysmonitor): remove outdated validation comment in getProces…
kxrxh Nov 22, 2025
4eb9507
fix(sysmonitor): add overflow check for memory value conversion
kxrxh Nov 22, 2025
2e8e86a
fix(sysmonitor): enhance error handling in memory status retrieval
kxrxh Nov 22, 2025
5df55d2
docs(adaptive_throttler): update demo comments to clarify adaptive th…
kxrxh Nov 22, 2025
aaf9377
refactor(resource_monitor_test): remove unnecessary helper call in te…
kxrxh Nov 22, 2025
9b7cba6
Merge branch 'feature/adaptive-throttler' of https://github.com/kxrxh…
kxrxh Nov 22, 2025
a437518
refactor(resource_monitor): simplify comment for division by zero check
kxrxh Nov 22, 2025
08f6012
chore(dependencies): downgrade Go version to 1.23.0 and update indire…
kxrxh Nov 22, 2025
e7a1c81
feat(adaptive_throttler): enforce minimum sample interval to reduce C…
kxrxh Nov 22, 2025
226098e
fix(adaptive_throttler): prevent negative target rate in rate adaptation
kxrxh Nov 22, 2025
0707bb0
docs(adaptive_throttler): clarify comment on MemoryReader function re…
kxrxh Nov 22, 2025
df7560a
docs(adaptive_throttler): format comment for CPU usage sampling modes
kxrxh Nov 22, 2025
2964497
feat(resource_monitor): add comment to indicate start of periodic res…
kxrxh Nov 22, 2025
a5fb0d4
docs(adaptive_throttler): update MemoryReader comment for clarity on …
kxrxh Nov 22, 2025
95ce780
chore(dependencies): remove go.sum and update go.mod
kxrxh Nov 22, 2025
8fa0e0f
refactor(resource_monitor): move clampPercent and validatePercent fun…
kxrxh Nov 22, 2025
77b55f2
refactor(flow): rename util.go to operators.go for better clarity
kxrxh Nov 22, 2025
9b5f438
refactor(adaptive_throttler): remove unnecessary pointer dereference …
kxrxh Nov 22, 2025
55ca0c5
refactor(resource_monitor): change stats from atomic.Value to atomic.…
kxrxh Nov 22, 2025
d1c811e
refactor(adaptive_throttler): remove unnecessary mutex for rate adapt…
kxrxh Nov 22, 2025
5a46a4f
refactor(adaptive_throttler): reorder functions and improved adaptRat…
kxrxh Nov 22, 2025
e097d6b
feat(adaptive_throttler): add MaxBufferSize configuration and validat…
kxrxh Nov 22, 2025
cd8ad61
refactor(sysmonitor): simplify memory initialization by removing unne…
kxrxh Nov 22, 2025
809f604
fix(sysmonitor): ensure available memory does not exceed total memory
kxrxh Nov 22, 2025
904d9e9
refactor(adaptive_throttler): fix golangci-lint errors
kxrxh Nov 23, 2025
60b20a4
refactor(adaptive_throttler): change validateConfig function to a met…
kxrxh Nov 23, 2025
ab90a2c
refactor(adaptive_throttler, resource_monitor): enhance configuration…
kxrxh Nov 23, 2025
7d4abd4
test: add comprehensive test coverage for adaptive throttler and syst…
kxrxh Nov 23, 2025
604b5b5
test(resource_monitor): add tests for clamping and validation of perc…
kxrxh Nov 23, 2025
7dfff93
test(sysmonitor): refactor memory tests to use a helper function to f…
kxrxh Nov 23, 2025
6e24e80
refactor(adaptive_throttler, resource_monitor): implement shared moni…
kxrxh Nov 24, 2025
0028afc
refactor(sysmonitor): remove SetMemoryReader function from memory pla…
kxrxh Nov 27, 2025
7f375ba
refactor: enhance system monitoring and adaptive throttling
kxrxh Dec 3, 2025
153c015
test(sysmonitor): add comprehensive tests for Windows CPU and memory …
kxrxh Dec 3, 2025
583a92b
fix(adaptive_throttler): make close public and ensure proper resource…
kxrxh Dec 3, 2025
1 change: 1 addition & 0 deletions README.md
@@ -43,6 +43,7 @@ processing operations. These building blocks can be used to transform and manipu
- **Flatten<sup>1</sup>:** Flattens a stream of slices of elements into a stream of elements.
- **Batch:** Breaks a stream of elements into batches based on size or timing.
- **Throttler:** Limits the rate at which elements are processed.
- **AdaptiveThrottler:** Limits the rate at which elements are processed based on the current system resource utilization (CPU and memory usage).
- **SlidingWindow:** Creates overlapping windows of elements.
- **TumblingWindow:** Creates non-overlapping, fixed-size windows of elements.
- **SessionWindow:** Creates windows based on periods of activity and inactivity.
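For quick orientation, here is a minimal sketch of wiring the new operator into a pipeline, assembled from the APIs this PR introduces (DefaultAdaptiveThrottlerConfig, NewAdaptiveThrottler, Close) and the channel source/sink used in the examples below; the thresholds shown are illustrative values, not necessarily the config defaults.

package main

import (
	"fmt"

	ext "github.com/reugn/go-streams/extension"
	"github.com/reugn/go-streams/flow"
)

func main() {
	config := flow.DefaultAdaptiveThrottlerConfig()
	config.MaxCPUPercent = 80.0    // back off once CPU usage exceeds 80%
	config.MaxMemoryPercent = 75.0 // back off once memory usage exceeds 75%

	throttler, err := flow.NewAdaptiveThrottler(config)
	if err != nil {
		panic(fmt.Sprintf("failed to create adaptive throttler: %v", err))
	}
	defer throttler.Close()

	in := make(chan any)
	out := make(chan any)
	source := ext.NewChanSource(in)
	sink := ext.NewChanSink(out)

	// Rate-limit the stream according to current CPU/memory utilization.
	go func() {
		source.Via(throttler).To(sink)
	}()

	go func() {
		defer close(in)
		for i := 0; i < 10; i++ {
			in <- fmt.Sprintf("event-%d", i)
		}
	}()

	for element := range sink.Out {
		fmt.Println(element)
	}
}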
154 changes: 154 additions & 0 deletions examples/adaptive_throttler/demo/demo.go
@@ -0,0 +1,154 @@
package main

import (
"fmt"
"math/rand"
"sync/atomic"
"time"

ext "github.com/reugn/go-streams/extension"
"github.com/reugn/go-streams/flow"
)

// main demonstrates adaptive throttling with simulated resource pressure.
// The demo:
// 1. Produces 250 elements in bursts
// 2. Processes elements with CPU-intensive work (50ms each)
// 3. Simulates memory pressure that increases then decreases (creating throttling-recovery cycle)
// 4. The adaptive throttler adjusts throughput based on CPU/memory usage
// 5. Shows throttling down to ~1/sec during high memory, then recovery back to the 20/sec maximum
// 6. Stats are logged every 500ms showing rate adaptation
func main() {
var elementsProcessed atomic.Int64

// Set up demo configuration with memory simulation
throttler := setupDemoThrottler(&elementsProcessed)

in := make(chan any)
out := make(chan any) // Unbuffered channel to prevent apparent bursts

source := ext.NewChanSource(in)
sink := ext.NewChanSink(out)

statsDone := make(chan struct{})
go logThrottlerStats(throttler, statsDone)
defer close(statsDone)

go func() {
source.
Via(throttler).
Via(flow.NewPassThrough()).
To(sink)
}()

go produceBurst(in, 250)

var cpuWorkChecksum uint64

// Process the output
elementsReceived := 0
for element := range sink.Out {
fmt.Printf("consumer received %v\n", element)
elementsProcessed.Add(1)
elementsReceived++

// Perform CPU-intensive work
burnCPU(50*time.Millisecond, &cpuWorkChecksum)

time.Sleep(25 * time.Millisecond)
}

fmt.Printf("CPU work checksum: %d\n", cpuWorkChecksum)
fmt.Printf("Total elements produced: 250, Total elements received: %d\n", elementsReceived)
if elementsReceived == 250 {
fmt.Println("✅ SUCCESS: All elements processed without dropping!")
} else {
fmt.Printf("❌ FAILURE: %d elements were dropped!\n", 250-elementsReceived)
}

throttler.Close()

fmt.Println("adaptive throttling pipeline completed")
}

// setupDemoThrottler creates and configures an adaptive throttler with demo settings
func setupDemoThrottler(elementsProcessed *atomic.Int64) *flow.AdaptiveThrottler {
config := flow.DefaultAdaptiveThrottlerConfig()

config.MinRate = 1
config.MaxRate = 20
config.InitialRate = 20
config.SampleInterval = 200 * time.Millisecond

config.BackoffFactor = 0.5
config.RecoveryFactor = 1.5

config.MaxMemoryPercent = 35.0
config.RecoveryMemoryThreshold = 30.0

// Memory Reader Simulation - Creates a cycle: low -> high -> low memory usage
config.MemoryReader = func() (float64, error) {
elementCount := elementsProcessed.Load()

var memoryPercent float64
switch {
case elementCount <= 80: // Phase 1: Low memory, allow high throughput
memoryPercent = 5.0 + float64(elementCount)*0.1 // 5% to 13%
case elementCount <= 120: // Phase 2: Increasing memory pressure, cause throttling
memoryPercent = 15.0 + float64(elementCount-80)*0.6 // 15% to 39%
case elementCount <= 160: // Phase 3: High memory, keep throttled down to ~1/sec
memoryPercent = 30.0 + float64(elementCount-120)*0.3 // 30% to 42%
default: // Phase 4: Memory decreases, allow recovery back to the 20/sec maximum
memoryPercent = 25.0 - float64(elementCount-160)*1.5 // 25% down to ~5%
if memoryPercent < 5.0 {
memoryPercent = 5.0
}
}
return memoryPercent, nil
}

throttler, err := flow.NewAdaptiveThrottler(config)
if err != nil {
panic(fmt.Sprintf("failed to create adaptive throttler: %v", err))
}
return throttler
}

func produceBurst(in chan<- any, total int) {
defer close(in)

for i := range total {
in <- fmt.Sprintf("job-%02d", i)

if (i+1)%10 == 0 {
time.Sleep(180 * time.Millisecond)
continue
}
time.Sleep(time.Duration(2+rand.Intn(5)) * time.Millisecond)
}
}

func burnCPU(duration time.Duration, checksum *uint64) {
start := time.Now()
for time.Since(start) < duration {
for i := 0; i < 1000; i++ {
*checksum += uint64(i * i)
}
}
}

func logThrottlerStats(at *flow.AdaptiveThrottler, done <-chan struct{}) {
ticker := time.NewTicker(500 * time.Millisecond)
defer ticker.Stop()

for {
select {
case <-done:
return
case <-ticker.C:
stats := at.GetResourceStats()
fmt.Printf("[stats] Rate: %.1f/sec, CPU: %.1f%%, Memory: %.1f%%\n",
at.GetCurrentRate(), stats.CPUUsagePercent, stats.MemoryUsedPercent)
}
}
}
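The demo above stubs MemoryReader with a scripted curve. As a hedged illustration of the same hook fed by a live measurement, the fragment below reports the Go heap in use against a fixed budget; the budget and the use of runtime.ReadMemStats are assumptions for illustration only, while the PR's own sysmonitor package reads real system/cgroup memory instead.

// Hypothetical alternative reader: percentage of an assumed 512 MiB heap budget,
// computed from the Go runtime's own statistics rather than system memory.
// This slots into the same config.MemoryReader hook used in setupDemoThrottler
// and requires importing "runtime".
config.MemoryReader = func() (float64, error) {
	var m runtime.MemStats
	runtime.ReadMemStats(&m)
	const budgetBytes = 512 << 20
	return float64(m.HeapInuse) / float64(budgetBytes) * 100.0, nil
}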
150 changes: 150 additions & 0 deletions examples/adaptive_throttler/main.go
@@ -0,0 +1,150 @@
package main

import (
"fmt"
"math/rand"
"strings"
"sync/atomic"
"time"

ext "github.com/reugn/go-streams/extension"
"github.com/reugn/go-streams/flow"
)

// Demo of adaptive throttling with CPU-intensive work.
// T1 throttles on CPU > 0.01%, T2 throttles on Memory > 50%.

func editMessage(msg string) string {
return strings.ToUpper(msg)
}

func addTimestamp(msg string) string {
return fmt.Sprintf("[%s] %s", time.Now().Format("15:04:05"), msg)
}

// cpuIntensiveWork simulates processing load that affects CPU usage
func cpuIntensiveWork(msg string) string {
// Simulate CPU-intensive work (hashing-like computation)
var checksum uint64
for i := 0; i < 200000; i++ { // Increased from 50000 to 200000 for more CPU load
checksum += uint64(len(msg)) * uint64(i) //nolint:gosec
}
return msg
}

func main() {
var messagesProcessed atomic.Int64

// Configure first throttler - CPU-focused with higher initial rate
throttler1Config := flow.DefaultAdaptiveThrottlerConfig()
throttler1Config.MaxCPUPercent = 0.01 // Throttle when CPU > 0.01% (extremely low threshold)
throttler1Config.MaxMemoryPercent = 80.0 // Less strict memory limit
throttler1Config.InitialRate = 50 // Start at 50/sec
throttler1Config.MaxRate = 100 // Can go up to 100/sec
throttler1Config.MinRate = 5 // Minimum 5/sec
throttler1Config.SampleInterval = 200 * time.Millisecond
throttler1Config.BackoffFactor = 0.6 // Reduce to 60% when constrained
throttler1Config.RecoveryFactor = 1.4 // Increase by 40% during recovery

throttler1, err := flow.NewAdaptiveThrottler(throttler1Config)
if err != nil {
panic(fmt.Sprintf("failed to create throttler1: %v", err))
}

// Configure second throttler - Memory-focused with memory simulation
throttler2Config := flow.DefaultAdaptiveThrottlerConfig()
throttler2Config.MaxCPUPercent = 80.0 // Less strict CPU limit
throttler2Config.MaxMemoryPercent = 50.0 // Throttle when memory > 50% (real system memory; lower threshold than T1's 80%)
throttler2Config.InitialRate = 30 // Start at 30/sec
throttler2Config.MaxRate = 80 // Can go up to 80/sec
throttler2Config.MinRate = 3 // Minimum 3/sec
throttler2Config.SampleInterval = 200 * time.Millisecond
throttler2Config.BackoffFactor = 0.5 // Reduce to 50% when constrained
throttler2Config.RecoveryFactor = 1.3 // Increase by 30% during recovery

throttler2, err := flow.NewAdaptiveThrottler(throttler2Config)
if err != nil {
panic(fmt.Sprintf("failed to create throttler2: %v", err))
}

in := make(chan any)

source := ext.NewChanSource(in)
editMapFlow := flow.NewMap(editMessage, 1)
cpuWorkFlow := flow.NewMap(cpuIntensiveWork, 1) // Add CPU-intensive work
timestampFlow := flow.NewMap(addTimestamp, 1)
sink := ext.NewStdoutSink()

// Pipeline: Source -> Throttler1 (CPU) -> CPU Work -> Throttler2 (Memory) -> Edit -> Timestamp -> Sink
go func() {
source.
Via(throttler1). // First throttler monitors CPU
Via(cpuWorkFlow). // CPU-intensive processing
Via(throttler2). // Second throttler monitors memory
Via(editMapFlow). // Simple transformation
Via(timestampFlow). // Add timestamp
To(sink)
}()

// Enhanced stats logging showing throttling behavior
go func() {
ticker := time.NewTicker(500 * time.Millisecond)
defer ticker.Stop()

for range ticker.C {
stats1 := throttler1.GetResourceStats()
stats2 := throttler2.GetResourceStats()

// Show current rates and resource usage
fmt.Printf("[stats] T1-CPU: %.1f/s (CPU:%.1f%%), T2-Mem: %.1f/s (Mem:%.1f%%), Total: %d msgs\n",
throttler1.GetCurrentRate(),
stats1.CPUUsagePercent,
throttler2.GetCurrentRate(),
stats2.MemoryUsedPercent,
messagesProcessed.Load())

// Show throttling status
throttleReason := ""
if stats1.CPUUsagePercent > throttler1Config.MaxCPUPercent {
throttleReason += "T1:CPU-high "
}
if stats2.MemoryUsedPercent > throttler2Config.MaxMemoryPercent {
throttleReason += "T2:Mem-high "
}
if throttleReason == "" {
throttleReason = "No throttling"
}
fmt.Printf("[throttle] %s\n", throttleReason)
}
}()

// Producer with bursty traffic to test throttling
go func() {
defer close(in)

for i := 1; i <= 100; i++ {
message := fmt.Sprintf("MESSAGE-%d", i)
in <- message
messagesProcessed.Add(1)

// Variable delay to create bursts (some fast, some slow)
var delay time.Duration
if i%20 == 0 {
delay = 100 * time.Millisecond // Burst pause every 20 messages
} else {
delay = time.Duration(5+rand.Intn(15)) * time.Millisecond //nolint:gosec // 5-20ms between messages
}
time.Sleep(delay)
}
}()

sink.AwaitCompletion()

throttler1.Close()
throttler2.Close()

fmt.Printf("Demo completed! Processed %d messages\n", messagesProcessed.Load())
}
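The BackoffFactor/RecoveryFactor comments above ("reduce to 60% when constrained", "increase by 40% during recovery") describe a multiplicative adjustment clamped to [MinRate, MaxRate]. A rough sketch of that arithmetic follows; it is an assumption about the shape of the internal adaptRate logic referenced in the commit list, not a copy of the PR's implementation.

// adaptRateSketch is a hypothetical stand-in for the flow package's internal
// rate adaptation: multiply down when resources are constrained, multiply up
// while recovering, and clamp so the rate never drops below MinRate or
// exceeds MaxRate.
func adaptRateSketch(current, minRate, maxRate, backoff, recovery float64, constrained bool) float64 {
	if constrained {
		current *= backoff // e.g. 0.6 -> cut the rate to 60%
	} else {
		current *= recovery // e.g. 1.4 -> raise the rate by 40%
	}
	if current < minRate {
		current = minRate
	}
	if current > maxRate {
		current = maxRate
	}
	return current
}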
1 change: 1 addition & 0 deletions examples/go.mod
@@ -166,6 +166,7 @@ require (
)

replace (
github.com/reugn/go-streams => ..
github.com/reugn/go-streams/aerospike => ../aerospike
github.com/reugn/go-streams/aws => ../aws
github.com/reugn/go-streams/azure => ../azure
2 changes: 0 additions & 2 deletions examples/go.sum
@@ -361,8 +361,6 @@ github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 h1:N/ElC8H3+5X
github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4=
github.com/redis/go-redis/v9 v9.8.0 h1:q3nRvjrlge/6UD7eTu/DSg2uYiU2mCL0G/uzBWqhicI=
github.com/redis/go-redis/v9 v9.8.0/go.mod h1:huWgSWd8mW6+m0VPhJjSSQ+d6Nh1VICQ6Q5lHuCH/Iw=
github.com/reugn/go-streams v0.12.0 h1:SEfTZw5+iP0mQG0jWTxVOHMbqlbZjUB0W6dF3XbYd5Q=
github.com/reugn/go-streams v0.12.0/go.mod h1:dnXv6QgVTW62gEpILoLHRjI95Es7ECK2/+j9h17aIN8=
github.com/rogpeppe/go-internal v1.13.1 h1:KvO1DLK/DRN07sQ1LQKScxyZJuNnedQ5/wKSR38lUII=
github.com/rogpeppe/go-internal v1.13.1/go.mod h1:uMEvuHeurkdAXX61udpOXGD/AzZDWNMNyH2VO9fmH0o=
github.com/shirou/gopsutil/v3 v3.23.12 h1:z90NtUkp3bMtmICZKpC4+WaknU1eXtp5vtbQ11DgpE4=