Skip to content

Commit 36708a1

Browse files
BornToGrilljackc
authored andcommitted
Eager initialize minpoolsize on connect
1 parent 044ba47 commit 36708a1

File tree

2 files changed

+129
-0
lines changed

2 files changed

+129
-0
lines changed

pgxpool/pool.go

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -225,6 +225,10 @@ func ConnectConfig(ctx context.Context, config *Config) (*Pool, error) {
225225
go p.backgroundHealthCheck()
226226

227227
if !config.LazyConnect {
228+
if err := p.createIdleResources(ctx, int(p.minConns)); err != nil {
229+
return nil, err
230+
}
231+
228232
// Initially establish one connection
229233
res, err := p.p.Acquire(ctx)
230234
if err != nil {
@@ -377,6 +381,29 @@ func (p *Pool) checkMinConns() {
377381
}
378382
}
379383

384+
func (p *Pool) createIdleResources(parentCtx context.Context, targetResources int) error {
385+
ctx, cancel := context.WithCancel(parentCtx)
386+
defer cancel()
387+
388+
errs := make(chan error)
389+
390+
for i := 0; i < targetResources; i++ {
391+
go func() {
392+
err := p.p.CreateResource(ctx)
393+
errs <- err
394+
}()
395+
}
396+
397+
for i := 0; i < targetResources; i++ {
398+
if err := <-errs; err != nil {
399+
cancel()
400+
return err
401+
}
402+
}
403+
404+
return nil
405+
}
406+
380407
// Acquire returns a connection (*Conn) from the Pool
381408
func (p *Pool) Acquire(ctx context.Context) (*Conn, error) {
382409
for {

pgxpool/pool_test.go

Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"errors"
66
"fmt"
77
"os"
8+
"sync/atomic"
89
"testing"
910
"time"
1011

@@ -800,3 +801,104 @@ func TestIdempotentPoolClose(t *testing.T) {
800801
// Close the already closed pool.
801802
require.NotPanics(t, func() { pool.Close() })
802803
}
804+
805+
func TestConnectCreatesMinPool(t *testing.T) {
806+
t.Parallel()
807+
808+
config, err := pgxpool.ParseConfig(os.Getenv("PGX_TEST_DATABASE"))
809+
require.NoError(t, err)
810+
811+
config.MinConns = int32(12)
812+
config.MaxConns = int32(15)
813+
config.LazyConnect = false
814+
815+
acquireAttempts := int64(0)
816+
connectAttempts := int64(0)
817+
818+
config.BeforeAcquire = func(ctx context.Context, conn *pgx.Conn) bool {
819+
atomic.AddInt64(&acquireAttempts, 1)
820+
return true
821+
}
822+
config.BeforeConnect = func(ctx context.Context, cfg *pgx.ConnConfig) error {
823+
atomic.AddInt64(&connectAttempts, 1)
824+
return nil
825+
}
826+
827+
pool, err := pgxpool.ConnectConfig(context.Background(), config)
828+
require.NoError(t, err)
829+
defer pool.Close()
830+
831+
stat := pool.Stat()
832+
require.Equal(t, int32(12), stat.IdleConns())
833+
require.Equal(t, int64(1), stat.AcquireCount())
834+
require.Equal(t, int32(12), stat.TotalConns())
835+
require.Equal(t, int64(0), acquireAttempts)
836+
require.Equal(t, int64(12), connectAttempts)
837+
}
838+
func TestConnectSkipMinPoolWithLazy(t *testing.T) {
839+
t.Parallel()
840+
841+
config, err := pgxpool.ParseConfig(os.Getenv("PGX_TEST_DATABASE"))
842+
require.NoError(t, err)
843+
844+
config.MinConns = int32(12)
845+
config.MaxConns = int32(15)
846+
config.LazyConnect = true
847+
848+
acquireAttempts := int64(0)
849+
connectAttempts := int64(0)
850+
851+
config.BeforeAcquire = func(ctx context.Context, conn *pgx.Conn) bool {
852+
atomic.AddInt64(&acquireAttempts, 1)
853+
return true
854+
}
855+
config.BeforeConnect = func(ctx context.Context, cfg *pgx.ConnConfig) error {
856+
atomic.AddInt64(&connectAttempts, 1)
857+
return nil
858+
}
859+
860+
pool, err := pgxpool.ConnectConfig(context.Background(), config)
861+
require.NoError(t, err)
862+
defer pool.Close()
863+
864+
stat := pool.Stat()
865+
require.Equal(t, int32(0), stat.IdleConns())
866+
require.Equal(t, int64(0), stat.AcquireCount())
867+
require.Equal(t, int32(0), stat.TotalConns())
868+
require.Equal(t, int64(0), acquireAttempts)
869+
require.Equal(t, int64(0), connectAttempts)
870+
}
871+
872+
func TestConnectMinPoolZero(t *testing.T) {
873+
t.Parallel()
874+
875+
config, err := pgxpool.ParseConfig(os.Getenv("PGX_TEST_DATABASE"))
876+
require.NoError(t, err)
877+
878+
config.MinConns = int32(0)
879+
config.MaxConns = int32(15)
880+
config.LazyConnect = false
881+
882+
acquireAttempts := int64(0)
883+
connectAttempts := int64(0)
884+
885+
config.BeforeAcquire = func(ctx context.Context, conn *pgx.Conn) bool {
886+
atomic.AddInt64(&acquireAttempts, 1)
887+
return true
888+
}
889+
config.BeforeConnect = func(ctx context.Context, cfg *pgx.ConnConfig) error {
890+
atomic.AddInt64(&connectAttempts, 1)
891+
return nil
892+
}
893+
894+
pool, err := pgxpool.ConnectConfig(context.Background(), config)
895+
require.NoError(t, err)
896+
defer pool.Close()
897+
898+
stat := pool.Stat()
899+
require.Equal(t, int32(1), stat.IdleConns())
900+
require.Equal(t, int64(1), stat.AcquireCount())
901+
require.Equal(t, int32(1), stat.TotalConns())
902+
require.Equal(t, int64(0), acquireAttempts)
903+
require.Equal(t, int64(1), connectAttempts)
904+
}

0 commit comments

Comments
 (0)