@@ -14,13 +14,17 @@ import (
14
14
"sync"
15
15
16
16
"github.com/gitpod-io/gitpod/common-go/log"
17
+ lru "github.com/hashicorp/golang-lru"
17
18
"golang.org/x/xerrors"
18
19
)
19
20
20
21
type IOLimiterV1 struct {
21
22
limits limits
22
23
23
- cond * sync.Cond
24
+ cond * sync.Cond
25
+ cache * lru.Cache
26
+
27
+ devices []string
24
28
}
25
29
26
30
type limits struct {
@@ -30,16 +34,20 @@ type limits struct {
30
34
ReadIOPS int64
31
35
}
32
36
33
- func NewIOLimiterV1 (writeBytesPerSecond , readBytesPerSecond , writeIOPs , readIOPs int64 ) * IOLimiterV1 {
34
- return & IOLimiterV1 {
35
- limits : limits {
36
- WriteBytesPerSecond : writeBytesPerSecond ,
37
- ReadBytesPerSecond : readBytesPerSecond ,
38
- WriteIOPS : writeIOPs ,
39
- ReadIOPS : readIOPs ,
40
- },
41
- cond : sync .NewCond (& sync.Mutex {}),
37
+ func NewIOLimiterV1 (writeBytesPerSecond , readBytesPerSecond , writeIOPs , readIOPs int64 ) (* IOLimiterV1 , error ) {
38
+ cache , err := lru .New (10 )
39
+ if err != nil {
40
+ return nil , xerrors .Errorf ("cannot build I/O limit cache: %w" , err )
41
+ }
42
+
43
+ limiter := & IOLimiterV1 {
44
+ limits : buildLimits (writeBytesPerSecond , readBytesPerSecond , writeIOPs , readIOPs ),
45
+ cond : sync .NewCond (& sync.Mutex {}),
46
+ cache : cache ,
47
+ devices : buildDevices (),
42
48
}
49
+
50
+ return limiter , nil
43
51
}
44
52
45
53
func (c * IOLimiterV1 ) Name () string { return "iolimiter-v1" }
@@ -55,80 +63,41 @@ const (
55
63
// TODO: enable custom configuration
56
64
var blockDevices = []string {"sd*" , "md*" , "nvme0n*" }
57
65
58
- func (c * IOLimiterV1 ) Update (writeBytesPerSecond , readBytesPerSecond , writeIOPs , readIOPs int64 ) {
59
- c .cond .L .Lock ()
60
- defer c .cond .L .Unlock ()
61
-
62
- c .limits = limits {
66
+ func buildLimits (writeBytesPerSecond , readBytesPerSecond , writeIOPs , readIOPs int64 ) limits {
67
+ return limits {
63
68
WriteBytesPerSecond : writeBytesPerSecond ,
64
69
ReadBytesPerSecond : readBytesPerSecond ,
65
70
WriteIOPS : writeIOPs ,
66
71
ReadIOPS : readIOPs ,
67
72
}
68
- c .cond .Broadcast ()
69
73
}
70
74
71
- func (c * IOLimiterV1 ) Apply (ctx context.Context , basePath , cgroupPath string ) error {
72
- var devices []string
73
- for _ , wc := range blockDevices {
74
- matches , err := filepath .Glob (filepath .Join ("/sys/block" , wc , "dev" ))
75
- if err != nil {
76
- log .WithField ("cgroupPath" , cgroupPath ).WithField ("wc" , wc ).Warn ("cannot glob devices" )
77
- continue
78
- }
79
-
80
- for _ , dev := range matches {
81
- fc , err := os .ReadFile (dev )
82
- if err != nil {
83
- log .WithField ("dev" , dev ).WithError (err ).Error ("cannot read device file" )
84
- }
85
- devices = append (devices , strings .TrimSpace (string (fc )))
86
- }
87
- }
88
- log .WithField ("devices" , devices ).Debug ("found devices" )
89
-
90
- produceLimits := func (value int64 ) []string {
91
- lines := make ([]string , 0 , len (devices ))
92
- for _ , dev := range devices {
93
- lines = append (lines , fmt .Sprintf ("%s %d" , dev , value ))
94
- }
95
- return lines
96
- }
75
+ func (c * IOLimiterV1 ) Update (writeBytesPerSecond , readBytesPerSecond , writeIOPs , readIOPs int64 ) {
76
+ c .cond .L .Lock ()
77
+ defer c .cond .L .Unlock ()
97
78
98
- writeLimit := func (limitPath string , content []string ) error {
99
- for _ , l := range content {
100
- _ , err := os .Stat (limitPath )
101
- if errors .Is (err , os .ErrNotExist ) {
102
- log .WithError (err ).WithField ("limitPath" , limitPath ).Debug ("blkio file does not exist" )
103
- continue
104
- }
79
+ c .limits = buildLimits (writeBytesPerSecond , readBytesPerSecond , writeIOPs , readIOPs )
105
80
106
- err = os .WriteFile (limitPath , []byte (l ), 0644 )
107
- if err != nil {
108
- log .WithError (err ).WithField ("limitPath" , limitPath ).WithField ("line" , l ).Warn ("cannot write limit" )
109
- continue
110
- }
111
- }
112
- return nil
113
- }
81
+ c .cond .Broadcast ()
82
+ }
114
83
115
- writeLimits := func ( l limits ) error {
116
- base := filepath .Join (basePath , "blkio" , cgroupPath )
84
+ func ( c * IOLimiterV1 ) Apply ( ctx context. Context , basePath , cgroupPath string ) error {
85
+ baseCgroupPath := filepath .Join (basePath , "blkio" , cgroupPath )
117
86
118
- var err error
119
- err = writeLimit (filepath .Join (base , fnBlkioThrottleWriteBps ), produceLimits (l .WriteBytesPerSecond ))
87
+ writeLimits := func ( l limits , fromCache bool ) error {
88
+ err : = writeLimit (filepath .Join (baseCgroupPath , fnBlkioThrottleWriteBps ), c . produceLimits (fnBlkioThrottleWriteBps , l .WriteBytesPerSecond , fromCache ))
120
89
if err != nil {
121
90
return xerrors .Errorf ("cannot write %s: %w" , fnBlkioThrottleWriteBps , err )
122
91
}
123
- err = writeLimit (filepath .Join (base , fnBlkioThrottleReadBps ), produceLimits (l .ReadBytesPerSecond ))
92
+ err = writeLimit (filepath .Join (baseCgroupPath , fnBlkioThrottleReadBps ), c . produceLimits (fnBlkioThrottleReadBps , l .ReadBytesPerSecond , fromCache ))
124
93
if err != nil {
125
94
return xerrors .Errorf ("cannot write %s: %w" , fnBlkioThrottleReadBps , err )
126
95
}
127
- err = writeLimit (filepath .Join (base , fnBlkioThrottleWriteIOPS ), produceLimits (l .WriteIOPS ))
96
+ err = writeLimit (filepath .Join (baseCgroupPath , fnBlkioThrottleWriteIOPS ), c . produceLimits (fnBlkioThrottleWriteIOPS , l .WriteIOPS , fromCache ))
128
97
if err != nil {
129
98
return xerrors .Errorf ("cannot write %s: %w" , fnBlkioThrottleWriteIOPS , err )
130
99
}
131
- err = writeLimit (filepath .Join (base , fnBlkioThrottleReadIOPS ), produceLimits (l .ReadIOPS ))
100
+ err = writeLimit (filepath .Join (baseCgroupPath , fnBlkioThrottleReadIOPS ), c . produceLimits (fnBlkioThrottleReadIOPS , l .ReadIOPS , fromCache ))
132
101
if err != nil {
133
102
return xerrors .Errorf ("cannot write %s: %w" , fnBlkioThrottleReadIOPS , err )
134
103
}
@@ -151,9 +120,10 @@ func (c *IOLimiterV1) Apply(ctx context.Context, basePath, cgroupPath string) er
151
120
update <- struct {}{}
152
121
}
153
122
}()
123
+
154
124
go func () {
155
125
log .WithField ("cgroupPath" , cgroupPath ).Debug ("starting IO limiting" )
156
- err := writeLimits (c .limits )
126
+ err := writeLimits (c .limits , false )
157
127
if err != nil {
158
128
log .WithError (err ).WithField ("cgroupPath" , cgroupPath ).Error ("cannot write IO limits" )
159
129
}
@@ -162,23 +132,75 @@ func (c *IOLimiterV1) Apply(ctx context.Context, basePath, cgroupPath string) er
162
132
select {
163
133
case <- update :
164
134
log .WithField ("cgroupPath" , cgroupPath ).WithField ("l" , c .limits ).Debug ("writing new IO limiting" )
165
- err := writeLimits (c .limits )
135
+ err := writeLimits (c .limits , false )
166
136
if err != nil {
167
137
log .WithError (err ).WithField ("cgroupPath" , cgroupPath ).Error ("cannot write IO limits" )
168
138
}
169
139
case <- ctx .Done ():
170
140
// Prior to shutting down though, we need to reset the IO limits to ensure we don't have
171
141
// processes stuck in the uninterruptable "D" (disk sleep) state. This would prevent the
172
142
// workspace pod from shutting down.
173
- err = writeLimits (limits {})
143
+ err = writeLimits (limits {}, false )
174
144
if err != nil {
175
145
log .WithError (err ).WithField ("cgroupPath" , cgroupPath ).Error ("cannot reset IO limits" )
176
146
}
177
147
log .WithField ("cgroupPath" , cgroupPath ).Debug ("stopping IO limiting" )
148
+ return
178
149
}
179
150
}
180
-
181
151
}()
182
152
183
153
return nil
184
154
}
155
+
156
+ func buildDevices () []string {
157
+ var devices []string
158
+ for _ , wc := range blockDevices {
159
+ matches , err := filepath .Glob (filepath .Join ("/sys/block" , wc , "dev" ))
160
+ if err != nil {
161
+ log .WithField ("wc" , wc ).Warn ("cannot glob devices" )
162
+ continue
163
+ }
164
+
165
+ for _ , dev := range matches {
166
+ fc , err := os .ReadFile (dev )
167
+ if err != nil {
168
+ log .WithField ("dev" , dev ).WithError (err ).Error ("cannot read device file" )
169
+ }
170
+ devices = append (devices , strings .TrimSpace (string (fc )))
171
+ }
172
+ }
173
+
174
+ return devices
175
+ }
176
+
177
+ func (c * IOLimiterV1 ) produceLimits (kind string , value int64 , useCache bool ) []string {
178
+ if val , exists := c .cache .Get (kind ); exists && useCache {
179
+ return val .([]string )
180
+ }
181
+
182
+ lines := make ([]string , len (c .devices ))
183
+ for _ , dev := range c .devices {
184
+ lines = append (lines , fmt .Sprintf ("%s %d" , dev , value ))
185
+ }
186
+
187
+ c .cache .Add (kind , lines )
188
+
189
+ return lines
190
+ }
191
+
192
+ func writeLimit (limitPath string , content []string ) error {
193
+ _ , err := os .Stat (limitPath )
194
+ if errors .Is (err , os .ErrNotExist ) {
195
+ return nil
196
+ }
197
+
198
+ for _ , l := range content {
199
+ err = os .WriteFile (limitPath , []byte (l ), 0644 )
200
+ if err != nil {
201
+ log .WithError (err ).WithField ("limitPath" , limitPath ).WithField ("line" , l ).Warn ("cannot write limit" )
202
+ continue
203
+ }
204
+ }
205
+ return nil
206
+ }
0 commit comments