Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 5 additions & 4 deletions lib/io/stream/buffered.rb
Original file line number Diff line number Diff line change
Expand Up @@ -133,8 +133,7 @@ def syswrite(buffer)

# Reads data from the underlying stream as efficiently as possible.
def sysread(size, buffer)
# Come on Ruby, why couldn't this just return `nil`? EOF is not exceptional. Every file has one.
while true
while [email protected]?
result = @io.read_nonblock(size, buffer, exception: false)

case result
Expand All @@ -146,8 +145,10 @@ def sysread(size, buffer)
return result
end
end
rescue Errno::EBADF
raise ::IOError, "stream closed"

# Otherwise, the `@io` was closed while reading:
# https://github.com/ruby/openssl/issues/798
raise ::IOError, "closed stream"
end
end
end
44 changes: 39 additions & 5 deletions lib/io/stream/readable.rb
Original file line number Diff line number Diff line change
Expand Up @@ -116,17 +116,28 @@ def readpartial(size = nil)
# @parameter offset [Integer] The offset to start searching from.
# @parameter limit [Integer | Nil] The maximum number of bytes to read while searching.
# @returns [Integer | Nil] The index of the pattern, or nil if not found.
private def index_of(pattern, offset, limit)
private def index_of(pattern, offset, limit, discard = false)
# We don't want to split on the pattern, so we subtract the size of the pattern.
split_offset = pattern.bytesize - 1

until index = @read_buffer.index(pattern, offset)
offset = @read_buffer.bytesize - split_offset

offset = 0 if offset < 0

return nil if limit and offset >= limit
return nil unless fill_read_buffer
if limit and offset >= limit
return nil
end

unless fill_read_buffer
return nil
end

if discard
# If we are discarding, we should consume the read buffer up to the offset:
consume_read_buffer(offset)
offset = 0
end
end

return index
Expand All @@ -136,7 +147,8 @@ def readpartial(size = nil)
# @parameter pattern [String] The pattern to match.
# @parameter offset [Integer] The offset to start searching from.
# @parameter limit [Integer] The maximum number of bytes to read, including the pattern (even if chomped).
# @returns [String | Nil] The contents of the stream up until the pattern, which is consumed but not returned.
# @parameter chomp [Boolean] Whether to remove the pattern from the returned data.
# @returns [String | Nil] The contents of the stream up until the pattern, or nil if the pattern was not found.
def read_until(pattern, offset = 0, limit: nil, chomp: true)
if index = index_of(pattern, offset, limit)
return nil if limit and index >= limit
Expand All @@ -149,6 +161,28 @@ def read_until(pattern, offset = 0, limit: nil, chomp: true)
end
end

# Efficiently discard data from the stream until encountering pattern.
# Unlike {read_until}, the read buffer may be consumed incrementally while
# searching (via `index_of(..., discard = true)`), bounding memory usage.
# @parameter pattern [String] The pattern to match.
# @parameter offset [Integer] The offset to start searching from.
# @parameter limit [Integer] The maximum number of bytes to read, including the pattern.
# @returns [String | Nil] The contents of the stream up to and including the pattern, or nil if the pattern was not found (or was only found at/beyond the limit).
def discard_until(pattern, offset = 0, limit: nil)
	# Discard mode (`true`): index_of may consume already-scanned bytes from the read buffer while filling it.
	if index = index_of(pattern, offset, limit, true)
		# NOTE(review): presumably frozen so the byteslice calls below can share the underlying string storage rather than copying — confirm.
		@read_buffer.freeze

		if limit and index >= limit
			# Pattern located at or beyond the limit: discard exactly `limit` bytes and report no match.
			@read_buffer = @read_buffer.byteslice(limit, @read_buffer.bytesize)

			return nil
		end

		# Consume and return everything up to and including the pattern itself.
		matched = @read_buffer.byteslice(0, index+pattern.bytesize)
		@read_buffer = @read_buffer.byteslice(index+pattern.bytesize, @read_buffer.bytesize)

		return matched
	end
end

# Peek at data in the buffer without consuming it.
# @parameter size [Integer | Nil] The number of bytes to peek at. If nil, peek at all available data.
# @returns [String] The data in the buffer without consuming it.
Expand Down
1 change: 1 addition & 0 deletions releases.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
## Unreleased

- On Ruby v3.3+, use `IO#write` directly instead of `IO#write_nonblock`, for better performance.
- `Buffered#sysread` now checks `@io.closed?` before attempting to read, improving error handling.

## v0.7.0

Expand Down
101 changes: 101 additions & 0 deletions test/io/stream/buffered.rb
Original file line number Diff line number Diff line change
Expand Up @@ -438,6 +438,107 @@ def before
expect(write_buffer).to be(:empty?)
end
end

# Behavioral coverage for Readable#discard_until: found/not-found results,
# limit handling, patterns spanning fill boundaries, and small block sizes.
with "#discard_until" do
	it "can discard data until pattern" do
		server.write("hello\nworld\ntest")
		server.close

		# Discard until "\n" - should return chunk ending with the pattern
		chunk = client.discard_until("\n")
		expect(chunk).not.to be_nil
		expect(chunk).to be(:end_with?, "\n")
		# Read the remaining data to verify it starts with "world"
		expect(client.read(5)).to be == "world"

		# Discard until "t" - should return chunk ending with the pattern
		chunk = client.discard_until("t")
		expect(chunk).not.to be_nil
		expect(chunk).to be(:end_with?, "t")
		# Read remaining data
		expect(client.read).to be == "est"
	end

	it "returns nil when pattern not found and discards all data" do
		server.write("hello world")
		server.close

		expect(client.discard_until("\n")).to be_nil
		# Data should still be available since pattern was not found
		expect(client.read).to be == "hello world"
	end

	it "can discard with a limit" do
		server.write("hello\nworld\n")
		server.close

		# Use peek to verify initial buffer state
		expect(client.peek).to be == "hello\nworld\n"

		# Limit too small to find pattern - discards up to limit
		expect(client.discard_until("\n", limit: 4)).to be_nil

		# Use peek to verify that 4 bytes were discarded
		expect(client.peek).to be == "o\nworld\n"

		# After discarding 4 bytes, should find pattern in remaining data
		chunk = client.discard_until("\n", limit: 5)
		expect(chunk).not.to be_nil
		expect(chunk).to be(:end_with?, "\n")

		# Use peek to verify final buffer state
		expect(client.peek).to be == "world\n"
		expect(client.read).to be == "world\n"
	end

	it "handles patterns spanning buffer boundaries" do
		# Use a small block size to force the pattern to span boundaries
		client.block_size = 3

		server.write("ab")
		server.flush
		server.write("cdef")
		server.close

		# Pattern "cd" spans the boundary between "ab" and "cdef"
		chunk = client.discard_until("cd")
		expect(chunk).not.to be_nil
		expect(chunk).to be(:end_with?, "cd")
		expect(client.read).to be == "ef"
	end

	it "handles large patterns efficiently" do
		large_pattern = "X" * 20 # Trigger sliding window logic
		server.write("some data before")
		server.write(large_pattern)
		server.write("some data after")
		server.close

		chunk = client.discard_until(large_pattern)
		expect(chunk).not.to be_nil
		expect(chunk).to be(:end_with?, large_pattern)
		expect(client.read).to be == "some data after"
	end

	with "with 1-byte block size" do
		it "can discard data with a multi-byte pattern" do
			server.write("hello\nworld\n")
			server.close

			# Force byte-at-a-time fills so the pattern must be assembled across reads.
			client.block_size = 1

			chunk1 = client.discard_until("\n")
			expect(chunk1).not.to be_nil
			expect(chunk1).to be(:end_with?, "\n")

			chunk2 = client.discard_until("\n")
			expect(chunk2).not.to be_nil
			expect(chunk2).to be(:end_with?, "\n")

			# Only two newline-terminated chunks exist; a third search finds nothing.
			expect(client.discard_until("\n")).to be_nil
		end
	end
end
end

ABidirectionalStream = Sus::Shared("a bidirectional stream") do
Expand Down
Loading