|
| 1 | +// Copyright 2018 The Go Authors. All rights reserved. |
| 2 | +// Use of this source code is governed by a BSD-style |
| 3 | +// license that can be found in the LICENSE file. |
| 4 | + |
| 5 | +package poll |
| 6 | + |
| 7 | +import "syscall" |
| 8 | + |
| 9 | +const ( |
| 10 | + // spliceNonblock makes calls to splice(2) non-blocking. |
| 11 | + spliceNonblock = 0x2 |
| 12 | + |
| 13 | + // maxSpliceSize is the maximum amount of data Splice asks |
| 14 | + // the kernel to move in a single call to splice(2). |
| 15 | + maxSpliceSize = 4 << 20 |
| 16 | +) |
| 17 | + |
| 18 | +// Splice transfers at most remain bytes of data from src to dst, using the |
| 19 | +// splice system call to minimize copies of data from and to userspace. |
| 20 | +// |
| 21 | +// Splice creates a temporary pipe, to serve as a buffer for the data transfer. |
| 22 | +// src and dst must both be stream-oriented sockets. |
| 23 | +// |
| 24 | +// If err != nil, sc is the system call which caused the error. |
| 25 | +func Splice(dst, src *FD, remain int64) (written int64, handled bool, sc string, err error) { |
| 26 | + prfd, pwfd, sc, err := newTempPipe() |
| 27 | + if err != nil { |
| 28 | + return 0, false, sc, err |
| 29 | + } |
| 30 | + defer destroyTempPipe(prfd, pwfd) |
| 31 | + // From here on, the operation should be considered handled, |
| 32 | + // even if Splice doesn't transfer any data. |
| 33 | + if err := src.readLock(); err != nil { |
| 34 | + return 0, true, "splice", err |
| 35 | + } |
| 36 | + defer src.readUnlock() |
| 37 | + if err := dst.writeLock(); err != nil { |
| 38 | + return 0, true, "splice", err |
| 39 | + } |
| 40 | + defer dst.writeUnlock() |
| 41 | + if err := src.pd.prepareRead(src.isFile); err != nil { |
| 42 | + return 0, true, "splice", err |
| 43 | + } |
| 44 | + if err := dst.pd.prepareWrite(dst.isFile); err != nil { |
| 45 | + return 0, true, "splice", err |
| 46 | + } |
| 47 | + var inPipe, n int |
| 48 | + for err == nil && remain > 0 { |
| 49 | + max := maxSpliceSize |
| 50 | + if int64(max) > remain { |
| 51 | + max = int(remain) |
| 52 | + } |
| 53 | + inPipe, err = spliceDrain(pwfd, src, max) |
| 54 | + // spliceDrain should never return EAGAIN, so if err != nil, |
| 55 | + // Splice cannot continue. If inPipe == 0 && err == nil, |
| 56 | + // src is at EOF, and the transfer is complete. |
| 57 | + if err != nil || (inPipe == 0 && err == nil) { |
| 58 | + break |
| 59 | + } |
| 60 | + n, err = splicePump(dst, prfd, inPipe) |
| 61 | + if n > 0 { |
| 62 | + written += int64(n) |
| 63 | + remain -= int64(n) |
| 64 | + } |
| 65 | + } |
| 66 | + if err != nil { |
| 67 | + return written, true, "splice", err |
| 68 | + } |
| 69 | + return written, true, "", nil |
| 70 | +} |
| 71 | + |
| 72 | +// spliceDrain moves data from a socket to a pipe. |
| 73 | +// |
| 74 | +// Invariant: when entering spliceDrain, the pipe is empty. It is either in its |
| 75 | +// initial state, or splicePump has emptied it previously. |
| 76 | +// |
| 77 | +// Given this, spliceDrain can reasonably assume that the pipe is ready for |
| 78 | +// writing, so if splice returns EAGAIN, it must be because the socket is not |
| 79 | +// ready for reading. |
| 80 | +// |
| 81 | +// If spliceDrain returns (0, nil), src is at EOF. |
| 82 | +func spliceDrain(pipefd int, sock *FD, max int) (int, error) { |
| 83 | + for { |
| 84 | + n, err := splice(pipefd, sock.Sysfd, max, spliceNonblock) |
| 85 | + if err != syscall.EAGAIN { |
| 86 | + return n, err |
| 87 | + } |
| 88 | + if err := sock.pd.waitRead(sock.isFile); err != nil { |
| 89 | + return n, err |
| 90 | + } |
| 91 | + } |
| 92 | +} |
| 93 | + |
| 94 | +// splicePump moves all the buffered data from a pipe to a socket. |
| 95 | +// |
| 96 | +// Invariant: when entering splicePump, there are exactly inPipe |
| 97 | +// bytes of data in the pipe, from a previous call to spliceDrain. |
| 98 | +// |
| 99 | +// By analogy to the condition from spliceDrain, splicePump |
| 100 | +// only needs to poll the socket for readiness, if splice returns |
| 101 | +// EAGAIN. |
| 102 | +// |
| 103 | +// If splicePump cannot move all the data in a single call to |
| 104 | +// splice(2), it loops over the buffered data until it has written |
| 105 | +// all of it to the socket. This behavior is similar to the Write |
| 106 | +// step of an io.Copy in userspace. |
| 107 | +func splicePump(sock *FD, pipefd int, inPipe int) (int, error) { |
| 108 | + written := 0 |
| 109 | + for inPipe > 0 { |
| 110 | + n, err := splice(sock.Sysfd, pipefd, inPipe, spliceNonblock) |
| 111 | + // Here, the condition n == 0 && err == nil should never be |
| 112 | + // observed, since Splice controls the write side of the pipe. |
| 113 | + if n > 0 { |
| 114 | + inPipe -= n |
| 115 | + written += n |
| 116 | + continue |
| 117 | + } |
| 118 | + if err != syscall.EAGAIN { |
| 119 | + return written, err |
| 120 | + } |
| 121 | + if err := sock.pd.waitWrite(sock.isFile); err != nil { |
| 122 | + return written, err |
| 123 | + } |
| 124 | + } |
| 125 | + return written, nil |
| 126 | +} |
| 127 | + |
| 128 | +// splice wraps the splice system call. Since the current implementation |
| 129 | +// only uses splice on sockets and pipes, the offset arguments are unused. |
| 130 | +// splice returns int instead of int64, because callers never ask it to |
| 131 | +// move more data in a single call than can fit in an int32. |
| 132 | +func splice(out int, in int, max int, flags int) (int, error) { |
| 133 | + n, err := syscall.Splice(in, nil, out, nil, max, flags) |
| 134 | + return int(n), err |
| 135 | +} |
| 136 | + |
| 137 | +// newTempPipe sets up a temporary pipe for a splice operation. |
| 138 | +func newTempPipe() (prfd, pwfd int, sc string, err error) { |
| 139 | + var fds [2]int |
| 140 | + const flags = syscall.O_CLOEXEC | syscall.O_NONBLOCK |
| 141 | + if err := syscall.Pipe2(fds[:], flags); err != nil { |
| 142 | + // pipe2 was added in 2.6.27 and our minimum requirement |
| 143 | + // is 2.6.23, so it might not be implemented. |
| 144 | + if err == syscall.ENOSYS { |
| 145 | + return newTempPipeFallback(fds[:]) |
| 146 | + } |
| 147 | + return -1, -1, "pipe2", err |
| 148 | + } |
| 149 | + return fds[0], fds[1], "", nil |
| 150 | +} |
| 151 | + |
| 152 | +// newTempPipeFallback is a fallback for newTempPipe, for systems |
| 153 | +// which do not support pipe2. |
| 154 | +func newTempPipeFallback(fds []int) (prfd, pwfd int, sc string, err error) { |
| 155 | + syscall.ForkLock.RLock() |
| 156 | + defer syscall.ForkLock.RUnlock() |
| 157 | + if err := syscall.Pipe(fds); err != nil { |
| 158 | + return -1, -1, "pipe", err |
| 159 | + } |
| 160 | + prfd, pwfd = fds[0], fds[1] |
| 161 | + syscall.CloseOnExec(prfd) |
| 162 | + syscall.CloseOnExec(pwfd) |
| 163 | + if err := syscall.SetNonblock(prfd, true); err != nil { |
| 164 | + CloseFunc(prfd) |
| 165 | + CloseFunc(pwfd) |
| 166 | + return -1, -1, "setnonblock", err |
| 167 | + } |
| 168 | + if err := syscall.SetNonblock(pwfd, true); err != nil { |
| 169 | + CloseFunc(prfd) |
| 170 | + CloseFunc(pwfd) |
| 171 | + return -1, -1, "setnonblock", err |
| 172 | + } |
| 173 | + return prfd, pwfd, "", nil |
| 174 | +} |
| 175 | + |
| 176 | +// destroyTempPipe destroys a temporary pipe. |
| 177 | +func destroyTempPipe(prfd, pwfd int) error { |
| 178 | + err := CloseFunc(prfd) |
| 179 | + err1 := CloseFunc(pwfd) |
| 180 | + if err == nil { |
| 181 | + return err1 |
| 182 | + } |
| 183 | + return err |
| 184 | +} |
0 commit comments