Skip to content

Commit 4e40f38

Browse files
New API, inMemorySocketPair() (#1638)
* New API, inMemorySocketPair() I'm using Array<Socket> instead of Pair<Socket, Socket> because Array is _slightly_ more friendly to Java-language consumers. * apiDump --------- Co-authored-by: Jesse Wilson <[email protected]>
1 parent c6c8340 commit 4e40f38

File tree

4 files changed

+73
-25
lines changed

4 files changed

+73
-25
lines changed

okio/api/okio.api

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -650,6 +650,7 @@ public final class okio/Okio {
650650
public static final fun hashingSink (Lokio/Sink;Ljavax/crypto/Mac;)Lokio/HashingSink;
651651
public static final fun hashingSource (Lokio/Source;Ljava/security/MessageDigest;)Lokio/HashingSource;
652652
public static final fun hashingSource (Lokio/Source;Ljavax/crypto/Mac;)Lokio/HashingSource;
653+
public static final fun inMemorySocketPair (J)[Lokio/Socket;
653654
public static final fun openZip (Lokio/FileSystem;Lokio/Path;)Lokio/FileSystem;
654655
public static final fun sink (Ljava/io/File;)Lokio/Sink;
655656
public static final fun sink (Ljava/io/File;Z)Lokio/Sink;

okio/src/jvmMain/kotlin/okio/JvmOkio.kt

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ import java.security.MessageDigest
3434
import javax.crypto.Cipher
3535
import javax.crypto.Mac
3636
import okio.internal.DefaultSocket
37+
import okio.internal.PipeSocket
3738
import okio.internal.ResourceFileSystem
3839
import okio.internal.SocketAsyncTimeout
3940
import okio.internal.isAndroidGetsocknameError
@@ -142,6 +143,36 @@ fun Socket.source(): Source {
142143
@JvmName("socket")
143144
fun Socket.asOkioSocket(): okio.Socket = DefaultSocket(this)
144145

146+
/**
147+
* Returns an array of two symmetric sockets, _A_ (element 0) and _B_ (element 1) that are mutually
148+
* connected:
149+
*
150+
* * Pipe AB connects _A_’s sink to _B_’s source.
151+
* * Pipe BA connects _B_’s sink to _A_’s source.
152+
*
153+
* Each pipe uses a buffer to decouple source and sink. This buffer has a user-specified maximum
154+
* size. When a socket writer outruns its corresponding reader, the buffer fills up and eventually
155+
* writes to the sink will block until the reader has caught up. Symmetrically, if a reader outruns
156+
* its writer, reads block until there is data to be read.
157+
*
158+
* There is a buffer for Pipe AB and another for Pipe BA. The maximum amount of memory that could be
159+
* held by the two sockets together is `maxBufferSize * 2`.
160+
*
161+
* Limit the amount of time spent waiting for the other party by configuring [timeouts][Timeout] on
162+
* the source and the sink.
163+
*
164+
* When the sink is closed, source reads will continue to complete normally until the buffer is
165+
* exhausted. At that point reads will return -1, indicating the end of the stream. But if the
166+
* source is closed first, writes to the sink will immediately fail with an [IOException].
167+
*
168+
* Canceling either socket immediately fails all reads and writes on both sockets.
169+
*/
170+
fun inMemorySocketPair(maxBufferSize: Long): Array<okio.Socket> {
171+
val ab = Pipe(maxBufferSize)
172+
val ba = Pipe(maxBufferSize)
173+
return arrayOf(PipeSocket(ab, ba), PipeSocket(ba, ab))
174+
}
175+
145176
/** Returns a sink that writes to `file`. */
146177
@JvmOverloads
147178
@Throws(FileNotFoundException::class)
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
/*
2+
* Copyright (C) 2025 Square, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package okio.internal
17+
18+
import okio.Pipe
19+
import okio.Sink
20+
import okio.Socket
21+
import okio.Source
22+
23+
internal class PipeSocket(val sinkPipe: Pipe, val sourcePipe: Pipe) : Socket {
24+
override val source: Source
25+
get() = sourcePipe.source
26+
27+
override val sink: Sink
28+
get() = sinkPipe.sink
29+
30+
override fun cancel() {
31+
sourcePipe.cancel()
32+
sinkPipe.cancel()
33+
}
34+
}

okio/src/jvmTest/kotlin/okio/SocketTest.kt

Lines changed: 7 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -46,8 +46,8 @@ class SocketTest(val factory: Factory = Factory.Default) {
4646
@Before
4747
fun setUp() {
4848
val socketPair = factory.createSocketPair()
49-
this.socket = socketPair.first
50-
this.peerSocket = socketPair.second
49+
this.socket = socketPair[0]
50+
this.peerSocket = socketPair[1]
5151
this.peer = AsyncSocket(peerSocket)
5252
}
5353

@@ -257,7 +257,7 @@ class SocketTest(val factory: Factory = Factory.Default) {
257257
enum class Factory {
258258
/** Implements an okio.Socket using the `java.net.Socket` API on OS sockets. */
259259
Default {
260-
override fun createSocketPair(): Pair<Socket, Socket> {
260+
override fun createSocketPair(): Array<Socket> {
261261
val localhost = InetAddress.getByName("localhost")
262262

263263
val serverSocket = ServerSocket()
@@ -272,33 +272,15 @@ class SocketTest(val factory: Factory = Factory.Default) {
272272
socketA.connect(InetSocketAddress(localhost, serverSocket.localPort))
273273

274274
val socketB = socketBFuture.get()
275-
return socketA.asOkioSocket() to socketB.asOkioSocket()
275+
return arrayOf(socketA.asOkioSocket(), socketB.asOkioSocket())
276276
}
277277
},
278278

279-
/** Implements an okio.Socket using a pair of in-memory pipes. */
280279
Pipes {
281-
override fun createSocketPair(): Pair<Socket, Socket> {
282-
class PipeSocket(val sourcePipe: Pipe, val sinkPipe: Pipe) : Socket {
283-
override val source: Source
284-
get() = sourcePipe.source
285-
override val sink: Sink
286-
get() = sinkPipe.sink
287-
288-
override fun cancel() {
289-
sourcePipe.cancel()
290-
sinkPipe.cancel()
291-
}
292-
}
293-
294-
val aToB = Pipe(1024)
295-
val bToA = Pipe(1024)
296-
297-
return PipeSocket(aToB, bToA) to PipeSocket(bToA, aToB)
298-
}
280+
override fun createSocketPair() = inMemorySocketPair(1024)
299281
};
300282

301-
/** Returns a pair of mutually-connected sockets. */
302-
abstract fun createSocketPair(): Pair<Socket, Socket>
283+
/** Returns two mutually-connected sockets. */
284+
abstract fun createSocketPair(): Array<Socket>
303285
}
304286
}

0 commit comments

Comments
 (0)