Skip to content

Commit b9fbbd0

Browse files
denismakogonmartinpinto
authored andcommitted
Add idle_timeout to routes API (iron-io#603)
* Add inactivity_timeout to routes API Closes: iron-io#544 * Fix failing datastore tests * Rename inactivity_timeout to idle_timeout * Update swagger doc * Update hot fn doc * Fix json tags * Add function timeouts docs * Rewording
1 parent 8c56e20 commit b9fbbd0

File tree

12 files changed

+148
-46
lines changed

12 files changed

+148
-46
lines changed

api/datastore/mysql/mysql.go

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ const routesTableCreate = `CREATE TABLE IF NOT EXISTS routes (
2323
maxc int NOT NULL,
2424
memory int NOT NULL,
2525
timeout int NOT NULL,
26+
idle_timeout int NOT NULL,
2627
type varchar(16) NOT NULL,
2728
headers text NOT NULL,
2829
config text NOT NULL,
@@ -39,7 +40,7 @@ const extrasTableCreate = `CREATE TABLE IF NOT EXISTS extras (
3940
value varchar(256) NOT NULL
4041
);`
4142

42-
const routeSelector = `SELECT app_name, path, image, format, maxc, memory, type, timeout, headers, config FROM routes`
43+
const routeSelector = `SELECT app_name, path, image, format, maxc, memory, type, timeout, idle_timeout, headers, config FROM routes`
4344

4445
type rowScanner interface {
4546
Scan(dest ...interface{}) error
@@ -302,10 +303,11 @@ func (ds *MySQLDatastore) InsertRoute(ctx context.Context, route *models.Route)
302303
memory,
303304
type,
304305
timeout,
306+
idle_timeout,
305307
headers,
306308
config
307309
)
308-
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?);`,
310+
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?);`,
309311
route.AppName,
310312
route.Path,
311313
route.Image,
@@ -314,6 +316,7 @@ func (ds *MySQLDatastore) InsertRoute(ctx context.Context, route *models.Route)
314316
route.Memory,
315317
route.Type,
316318
route.Timeout,
319+
route.IdleTimeout,
317320
string(hbyte),
318321
string(cbyte),
319322
)
@@ -359,6 +362,7 @@ func (ds *MySQLDatastore) UpdateRoute(ctx context.Context, newroute *models.Rout
359362
memory = ?,
360363
type = ?,
361364
timeout = ?,
365+
idle_timeout = ?,
362366
headers = ?,
363367
config = ?
364368
WHERE app_name = ? AND path = ?;`,
@@ -368,6 +372,7 @@ func (ds *MySQLDatastore) UpdateRoute(ctx context.Context, newroute *models.Rout
368372
route.Memory,
369373
route.Type,
370374
route.Timeout,
375+
route.IdleTimeout,
371376
string(hbyte),
372377
string(cbyte),
373378
route.AppName,
@@ -431,6 +436,7 @@ func scanRoute(scanner rowScanner, route *models.Route) error {
431436
&route.Memory,
432437
&route.Type,
433438
&route.Timeout,
439+
&route.IdleTimeout,
434440
&headerStr,
435441
&configStr,
436442
)

api/datastore/postgres/postgres.go

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ CREATE TABLE IF NOT EXISTS routes (
2525
maxc integer NOT NULL,
2626
memory integer NOT NULL,
2727
timeout integer NOT NULL,
28+
idle_timeout integer NOT NULL,
2829
type character varying(16) NOT NULL,
2930
headers text NOT NULL,
3031
config text NOT NULL,
@@ -41,7 +42,7 @@ const extrasTableCreate = `CREATE TABLE IF NOT EXISTS extras (
4142
value character varying(256) NOT NULL
4243
);`
4344

44-
const routeSelector = `SELECT app_name, path, image, format, maxc, memory, type, timeout, headers, config FROM routes`
45+
const routeSelector = `SELECT app_name, path, image, format, maxc, memory, type, timeout, idle_timeout, headers, config FROM routes`
4546

4647
type rowScanner interface {
4748
Scan(dest ...interface{}) error
@@ -274,10 +275,11 @@ func (ds *PostgresDatastore) InsertRoute(ctx context.Context, route *models.Rout
274275
memory,
275276
type,
276277
timeout,
278+
idle_timeout,
277279
headers,
278280
config
279281
)
280-
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10);`,
282+
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11);`,
281283
route.AppName,
282284
route.Path,
283285
route.Image,
@@ -286,6 +288,7 @@ func (ds *PostgresDatastore) InsertRoute(ctx context.Context, route *models.Rout
286288
route.Memory,
287289
route.Type,
288290
route.Timeout,
291+
route.IdleTimeout,
289292
string(hbyte),
290293
string(cbyte),
291294
)
@@ -329,8 +332,9 @@ func (ds *PostgresDatastore) UpdateRoute(ctx context.Context, newroute *models.R
329332
memory = $6,
330333
type = $7,
331334
timeout = $8,
332-
headers = $9,
333-
config = $10
335+
idle_timeout = $9,
336+
headers = $10,
337+
config = $11
334338
WHERE app_name = $1 AND path = $2;`,
335339
route.AppName,
336340
route.Path,
@@ -340,6 +344,7 @@ func (ds *PostgresDatastore) UpdateRoute(ctx context.Context, newroute *models.R
340344
route.Memory,
341345
route.Type,
342346
route.Timeout,
347+
route.IdleTimeout,
343348
string(hbyte),
344349
string(cbyte),
345350
)
@@ -398,6 +403,7 @@ func scanRoute(scanner rowScanner, route *models.Route) error {
398403
&route.Memory,
399404
&route.Type,
400405
&route.Timeout,
406+
&route.IdleTimeout,
401407
&headerStr,
402408
&configStr,
403409
)

api/models/new_task.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,11 @@ type NewTask struct {
5454
5555
*/
5656
Timeout *int32 `json:"timeout,omitempty"`
57+
58+
/* Hot function idle timeout in seconds before termination.
59+
60+
*/
61+
IdleTimeout *int32 `json:"idle_timeout,omitempty"`
5762
}
5863

5964
// Validate validates this new task

api/models/route.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import (
1212

1313
const (
1414
defaultRouteTimeout = 30 // seconds
15+
htfnScaleDownTimeout = 30 // seconds
1516
)
1617

1718
var (
@@ -39,6 +40,7 @@ type Route struct {
3940
Format string `json:"format"`
4041
MaxConcurrency int `json:"max_concurrency"`
4142
Timeout int32 `json:"timeout"`
43+
IdleTimeout int32 `json:"idle_timeout"`
4244
Config `json:"config"`
4345
}
4446

@@ -54,6 +56,7 @@ var (
5456
ErrRoutesValidationMissingType = errors.New("Missing route Type")
5557
ErrRoutesValidationPathMalformed = errors.New("Path malformed")
5658
ErrRoutesValidationNegativeTimeout = errors.New("Negative timeout")
59+
ErrRoutesValidationNegativeIdleTimeout = errors.New("Negative idle timeout")
5760
ErrRoutesValidationNegativeMaxConcurrency = errors.New("Negative MaxConcurrency")
5861
)
5962

@@ -86,6 +89,10 @@ func (r *Route) SetDefaults() {
8689
if r.Timeout == 0 {
8790
r.Timeout = defaultRouteTimeout
8891
}
92+
93+
//if r.IdleTimeout == 0 {
94+
// r.IdleTimeout = htfnScaleDownTimeout
95+
//}
8996
}
9097

9198
// Validate validates field values, skipping zeroed fields if skipZero is true.
@@ -141,6 +148,10 @@ func (r *Route) Validate(skipZero bool) error {
141148
res = append(res, ErrRoutesValidationNegativeTimeout)
142149
}
143150

151+
if r.IdleTimeout < 0 {
152+
res = append(res, ErrRoutesValidationNegativeIdleTimeout)
153+
}
154+
144155
if len(res) > 0 {
145156
return apiErrors.CompositeValidationError(res...)
146157
}
@@ -171,6 +182,9 @@ func (r *Route) Update(new *Route) {
171182
if new.Timeout != 0 {
172183
r.Timeout = new.Timeout
173184
}
185+
if new.IdleTimeout != 0 {
186+
r.IdleTimeout = new.IdleTimeout
187+
}
174188
if new.Format != "" {
175189
r.Format = new.Format
176190
}

api/runner/async_runner.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,14 +42,18 @@ func getTask(ctx context.Context, url string) (*models.Task, error) {
4242
}
4343

4444
func getCfg(t *models.Task) *task.Config {
45+
timeout := int32(30)
4546
if t.Timeout == nil {
46-
timeout := int32(30)
4747
t.Timeout = &timeout
4848
}
49+
if t.IdleTimeout == nil {
50+
t.IdleTimeout = &timeout
51+
}
4952

5053
cfg := &task.Config{
5154
Image: *t.Image,
5255
Timeout: time.Duration(*t.Timeout) * time.Second,
56+
IdleTimeout: time.Duration(*t.IdleTimeout) * time.Second,
5357
ID: t.ID,
5458
AppName: t.AppName,
5559
Env: t.EnvVars,

api/runner/task.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,8 @@ func (t *containerTask) Id() string { return t.cfg.ID }
3535
func (t *containerTask) Route() string { return "" }
3636
func (t *containerTask) Image() string { return t.cfg.Image }
3737
func (t *containerTask) Timeout() time.Duration { return t.cfg.Timeout }
38-
func (t *containerTask) Logger() (stdout, stderr io.Writer) { return t.cfg.Stdout, t.cfg.Stderr }
38+
func (t *containerTask) IdleTimeout() time.Duration { return t.cfg.IdleTimeout }
39+
func (t *containerTask) Logger() (io.Writer, io.Writer) { return t.cfg.Stdout, t.cfg.Stderr }
3940
func (t *containerTask) Volumes() [][2]string { return [][2]string{} }
4041
func (t *containerTask) WorkDir() string { return "" }
4142

api/runner/task/task.go

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -9,15 +9,16 @@ import (
99
)
1010

1111
type Config struct {
12-
ID string
13-
Path string
14-
Image string
15-
Timeout time.Duration
16-
AppName string
17-
Memory uint64
18-
Env map[string]string
19-
Format string
20-
MaxConcurrency int
12+
ID string
13+
Path string
14+
Image string
15+
Timeout time.Duration
16+
IdleTimeout time.Duration
17+
AppName string
18+
Memory uint64
19+
Env map[string]string
20+
Format string
21+
MaxConcurrency int
2122

2223
Stdin io.Reader
2324
Stdout io.Writer

api/runner/worker.go

Lines changed: 14 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -61,10 +61,6 @@ import (
6161
// Terminate
6262
// (internal clock)
6363

64-
const (
65-
// Terminate hot function after this timeout
66-
htfnScaleDownTimeout = 30 * time.Second
67-
)
6864

6965
// RunTask helps sending a task.Request into the common concurrency stream.
7066
// Refer to StartWorkers() to understand what this is about.
@@ -264,17 +260,29 @@ func newhtfn(cfg *task.Config, proto protocol.Protocol, tasks <-chan task.Reques
264260
func (hc *htfn) serve(ctx context.Context) {
265261
lctx, cancel := context.WithCancel(ctx)
266262
var wg sync.WaitGroup
263+
cfg := *hc.cfg
264+
logger := logrus.WithFields(logrus.Fields{
265+
"app": cfg.AppName,
266+
"route": cfg.Path,
267+
"image": cfg.Image,
268+
"memory": cfg.Memory,
269+
"format": cfg.Format,
270+
"max_concurrency": cfg.MaxConcurrency,
271+
"idle_timeout": cfg.IdleTimeout,
272+
})
273+
267274
wg.Add(1)
268275
go func() {
269276
defer wg.Done()
270277
for {
271-
inactivity := time.After(htfnScaleDownTimeout)
278+
inactivity := time.After(cfg.IdleTimeout)
272279

273280
select {
274281
case <-lctx.Done():
275282
return
276283

277284
case <-inactivity:
285+
logger.Info("Canceling inactive hot function")
278286
cancel()
279287

280288
case t := <-hc.tasks:
@@ -295,7 +303,6 @@ func (hc *htfn) serve(ctx context.Context) {
295303
}
296304
}()
297305

298-
cfg := *hc.cfg
299306
cfg.Env["FN_FORMAT"] = cfg.Format
300307
cfg.Timeout = 0 // add a timeout to simulate ab.end. failure.
301308
cfg.Stdin = hc.containerIn
@@ -324,22 +331,14 @@ func (hc *htfn) serve(ctx context.Context) {
324331
defer wg.Done()
325332
scanner := bufio.NewScanner(errr)
326333
for scanner.Scan() {
327-
logrus.WithFields(logrus.Fields{
328-
"app": cfg.AppName,
329-
"route": cfg.Path,
330-
"image": cfg.Image,
331-
"memory": cfg.Memory,
332-
"format": cfg.Format,
333-
"max_concurrency": cfg.MaxConcurrency,
334-
}).Info(scanner.Text())
334+
logger.Info(scanner.Text())
335335
}
336336
}()
337337

338338
result, err := hc.rnr.Run(lctx, &cfg)
339339
if err != nil {
340340
logrus.WithError(err).Error("hot function failure detected")
341341
}
342-
cancel()
343342
errw.Close()
344343
wg.Wait()
345344
logrus.WithField("result", result).Info("hot function terminated")

api/server/runner.go

Lines changed: 12 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -190,17 +190,18 @@ func (s *Server) serve(ctx context.Context, c *gin.Context, appName string, foun
190190
}
191191

192192
cfg := &task.Config{
193-
AppName: appName,
194-
Path: found.Path,
195-
Env: envVars,
196-
Format: found.Format,
197-
ID: reqID,
198-
Image: found.Image,
199-
MaxConcurrency: found.MaxConcurrency,
200-
Memory: found.Memory,
201-
Stdin: payload,
202-
Stdout: &stdout,
203-
Timeout: time.Duration(found.Timeout) * time.Second,
193+
AppName: appName,
194+
Path: found.Path,
195+
Env: envVars,
196+
Format: found.Format,
197+
ID: reqID,
198+
Image: found.Image,
199+
MaxConcurrency: found.MaxConcurrency,
200+
Memory: found.Memory,
201+
Stdin: payload,
202+
Stdout: &stdout,
203+
Timeout: time.Duration(found.Timeout) * time.Second,
204+
IdleTimeout: time.Duration(found.IdleTimeout) * time.Second,
204205
}
205206

206207
s.Runner.Enqueue()

0 commit comments

Comments
 (0)