Skip to content

Commit 44b8801

Browse files
bk2204Chris Darroch
and
Chris Darroch
committed
ssh: add support for lazy spawning of pure SSH connections
Right now, we spawn potentially several connections without necessarily needing to. For example, if we're transferring two objects, we can practically use at most three connections: one for the batch and one for each object. To make this more efficient and avoid needless overhead, let's not actually create the connection until we attempt to acquire it. At that point, if it doesn't exist, we'll spawn a new one (using the control path socket if possible) and then start sending data. Otherwise, unless it's the initial connection, let's just stub it out until we actually need it. Note that we still create the connection before it's needed because we create all workers up front, but that will change in a future commit now that we have this change in place. Co-authored-by: Chris Darroch <[email protected]>
1 parent 82f58ad commit 44b8801

File tree

3 files changed

+65
-25
lines changed

3 files changed

+65
-25
lines changed

locking/ssh.go

Lines changed: 21 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ type sshLockClient struct {
1717
*lfsapi.Client
1818
}
1919

20-
func (c *sshLockClient) connection() *ssh.PktlineConnection {
20+
func (c *sshLockClient) connection() (*ssh.PktlineConnection, error) {
2121
return c.transfer.Connection(0)
2222
}
2323

@@ -146,10 +146,13 @@ func (c *sshLockClient) Lock(remote string, lockReq *lockRequest) (*lockResponse
146146
if lockReq.Ref != nil {
147147
args = append(args, fmt.Sprintf("refname=%s", lockReq.Ref.Name))
148148
}
149-
conn := c.connection()
149+
conn, err := c.connection()
150+
if err != nil {
151+
return nil, 0, err
152+
}
150153
conn.Lock()
151154
defer conn.Unlock()
152-
err := conn.SendMessage("lock", args)
155+
err = conn.SendMessage("lock", args)
153156
if err != nil {
154157
return nil, 0, err
155158
}
@@ -168,10 +171,13 @@ func (c *sshLockClient) Unlock(ref *git.Ref, remote, id string, force bool) (*un
168171
if ref != nil {
169172
args = append(args, fmt.Sprintf("refname=%s", ref.Name))
170173
}
171-
conn := c.connection()
174+
conn, err := c.connection()
175+
if err != nil {
176+
return nil, 0, err
177+
}
172178
conn.Lock()
173179
defer conn.Unlock()
174-
err := conn.SendMessage(fmt.Sprintf("unlock %s", id), args)
180+
err = conn.SendMessage(fmt.Sprintf("unlock %s", id), args)
175181
if err != nil {
176182
return nil, 0, err
177183
}
@@ -191,10 +197,13 @@ func (c *sshLockClient) Search(remote string, searchReq *lockSearchRequest) (*lo
191197
for key, value := range values {
192198
args = append(args, fmt.Sprintf("%s=%s", key, value))
193199
}
194-
conn := c.connection()
200+
conn, err := c.connection()
201+
if err != nil {
202+
return nil, 0, err
203+
}
195204
conn.Lock()
196205
defer conn.Unlock()
197-
err := conn.SendMessage("list-lock", args)
206+
err = conn.SendMessage("list-lock", args)
198207
if err != nil {
199208
return nil, 0, err
200209
}
@@ -225,10 +234,13 @@ func (c *sshLockClient) SearchVerifiable(remote string, vreq *lockVerifiableRequ
225234
if vreq.Limit > 0 {
226235
args = append(args, fmt.Sprintf("limit=%d", vreq.Limit))
227236
}
228-
conn := c.connection()
237+
conn, err := c.connection()
238+
if err != nil {
239+
return nil, 0, err
240+
}
229241
conn.Lock()
230242
defer conn.Unlock()
231-
err := conn.SendMessage("list-lock", args)
243+
err = conn.SendMessage("list-lock", args)
232244
if err != nil {
233245
return nil, 0, err
234246
}

ssh/connection.go

Lines changed: 39 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -86,14 +86,28 @@ func (tr *SSHTransfer) IsMultiplexingEnabled() bool {
8686
}
8787

8888
// Connection returns the nth connection (starting from 0) in this transfer
89-
// instance or nil if there is no such item.
90-
func (tr *SSHTransfer) Connection(n int) *PktlineConnection {
89+
// instance if it is initialized and otherwise initializes a new connection and
90+
// saves it in the nth position. In all cases, nil is returned if n is greater
91+
// than or equal to the current number of connections; the error is nil too.
92+
func (tr *SSHTransfer) Connection(n int) (*PktlineConnection, error) {
9193
tr.lock.RLock()
92-
defer tr.lock.RUnlock()
9394
if n >= len(tr.conn) {
94-
return nil
95+
tr.lock.RUnlock()
96+
return nil, nil
97+
}
98+
if tr.conn[n] != nil {
99+
defer tr.lock.RUnlock()
100+
return tr.conn[n], nil
101+
}
102+
tr.lock.RUnlock()
103+
104+
tr.lock.Lock()
105+
defer tr.lock.Unlock()
106+
if tr.conn[n] != nil {
107+
return tr.conn[n], nil
95108
}
96-
return tr.conn[n]
109+
conn, _, err := tr.spawnConnection(n)
110+
return conn, err
97111
}
98112

99113
// ConnectionCount returns the number of connections this object has.
@@ -122,6 +136,15 @@ func (tr *SSHTransfer) SetConnectionCountAtLeast(n int) error {
122136
return tr.setConnectionCount(n)
123137
}
124138

139+
func (tr *SSHTransfer) spawnConnection(n int) (*PktlineConnection, string, error) {
140+
conn, _, controlPath, err := startConnection(n, tr.osEnv, tr.gitEnv, tr.meta, tr.operation, tr.controlPath)
141+
if err != nil {
142+
tracerx.Printf("failed to spawn pure SSH connection: %s", err)
143+
return nil, "", err
144+
}
145+
return conn, controlPath, err
146+
}
147+
125148
func (tr *SSHTransfer) setConnectionCount(n int) error {
126149
count := len(tr.conn)
127150
if n < count {
@@ -130,6 +153,10 @@ func (tr *SSHTransfer) setConnectionCount(n int) error {
130153
tn = 1
131154
}
132155
for _, item := range tr.conn[tn:count] {
156+
if item == nil {
157+
tracerx.Printf("skipping uninitialized lazy pure SSH connection (%d -> %d)", count, n)
158+
continue
159+
}
133160
tracerx.Printf("terminating pure SSH connection (%d -> %d)", count, n)
134161
if err := item.End(); err != nil {
135162
return err
@@ -138,14 +165,15 @@ func (tr *SSHTransfer) setConnectionCount(n int) error {
138165
tr.conn = tr.conn[0:tn]
139166
} else if n > count {
140167
for i := count; i < n; i++ {
141-
conn, _, controlPath, err := startConnection(i, tr.osEnv, tr.gitEnv, tr.meta, tr.operation, tr.controlPath)
142-
if err != nil {
143-
tracerx.Printf("failed to spawn pure SSH connection: %s", err)
144-
return err
145-
}
146-
tr.conn = append(tr.conn, conn)
147168
if i == 0 {
169+
conn, controlPath, err := tr.spawnConnection(i)
170+
if err != nil {
171+
return err
172+
}
173+
tr.conn = append(tr.conn, conn)
148174
tr.controlPath = controlPath
175+
} else {
176+
tr.conn = append(tr.conn, nil)
149177
}
150178
}
151179
}

tq/ssh.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -25,13 +25,13 @@ type SSHBatchClient struct {
2525
}
2626

2727
func (a *SSHBatchClient) batchInternal(args []string, batchLines []string) (int, []string, []string, error) {
28-
conn := a.transfer.Connection(0)
29-
if conn == nil {
30-
return 0, nil, nil, errors.Errorf(tr.Tr.Get("could not get connection for batch request"))
28+
conn, err := a.transfer.Connection(0)
29+
if err != nil {
30+
return 0, nil, nil, errors.Wrap(err, tr.Tr.Get("could not get connection for batch request"))
3131
}
3232
conn.Lock()
3333
defer conn.Unlock()
34-
err := conn.SendMessageWithLines("batch", args, batchLines)
34+
err = conn.SendMessageWithLines("batch", args, batchLines)
3535
if err != nil {
3636
return 0, nil, nil, errors.Wrap(err, tr.Tr.Get("batch request"))
3737
}
@@ -158,7 +158,7 @@ type SSHAdapter struct {
158158
// Implementations can run some startup logic here & return some context if needed
159159
func (a *SSHAdapter) WorkerStarting(workerNum int) (interface{}, error) {
160160
a.transfer.SetConnectionCountAtLeast(workerNum + 1)
161-
return a.transfer.Connection(workerNum), nil
161+
return a.transfer.Connection(workerNum)
162162
}
163163

164164
// WorkerEnding is called when a worker goroutine is shutting down

0 commit comments

Comments
 (0)