diff --git a/config/external.yaml b/config/external.yaml new file mode 100644 index 0000000..dd89dfd --- /dev/null +++ b/config/external.yaml @@ -0,0 +1,3 @@ +async-grpc: + url: https://github.com/socketry/async-grpc.git + command: bundle exec sus diff --git a/context/getting-started.md b/context/getting-started.md index 6997b90..96833cd 100644 --- a/context/getting-started.md +++ b/context/getting-started.md @@ -15,8 +15,8 @@ $ bundle add protocol-grpc `protocol-grpc` has several core concepts: - A {ruby Protocol::GRPC::Interface} class which defines gRPC service contracts with RPC methods, request/response types, and streaming patterns. - - A {ruby Protocol::GRPC::Body::ReadableBody} class which handles reading gRPC messages from HTTP request/response bodies with automatic framing and decoding. - - A {ruby Protocol::GRPC::Body::WritableBody} class which handles writing gRPC messages to HTTP request/response bodies with automatic framing and encoding. + - A {ruby Protocol::GRPC::Body::Readable} class which handles reading gRPC messages from HTTP request/response bodies with automatic framing and decoding. + - A {ruby Protocol::GRPC::Body::Writable} class which handles writing gRPC messages to HTTP request/response bodies with automatic framing and encoding. - A {ruby Protocol::GRPC::Middleware} abstract base class for building gRPC server applications. - A {ruby Protocol::GRPC::Call} class which represents the context of a single gRPC RPC call, including deadline tracking. - A {ruby Protocol::GRPC::Status} module with gRPC status code constants. @@ -47,15 +47,15 @@ end ### Building a Request -Build gRPC requests using `Protocol::GRPC::Methods` and `Protocol::GRPC::Body::WritableBody`: +Build gRPC requests using `Protocol::GRPC::Methods` and `Protocol::GRPC::Body::Writable`: ``` ruby require "protocol/grpc" require "protocol/grpc/methods" -require "protocol/grpc/body/writable_body" +require "protocol/grpc/body/writable" # Build request body -body = Protocol::GRPC::Body::WritableBody.new(message_class: Hello::HelloRequest) +body = Protocol::GRPC::Body::Writable.new(message_class: Hello::HelloRequest) body.write(Hello::HelloRequest.new(name: "World")) body.close_write @@ -69,13 +69,13 @@ request = Protocol::HTTP::Request["POST", path, headers, body] ### Reading a Response -Read gRPC responses using `Protocol::GRPC::Body::ReadableBody`: +Read gRPC responses using `Protocol::GRPC::Body::Readable`: ``` ruby -require "protocol/grpc/body/readable_body" +require "protocol/grpc/body/readable" # Read response body -readable_body = Protocol::GRPC::Body::ReadableBody.new( +readable_body = Protocol::GRPC::Body::Readable.new( response.body, message_class: Hello::HelloReply ) diff --git a/design.md b/design.md index c87f694..06e555c 100644 --- a/design.md +++ b/design.md @@ -293,22 +293,22 @@ module Protocol message ? URI.decode_www_form_component(message) : nil end - # Add gRPC status, message, and optional backtrace to headers. - # Whether these become headers or trailers is controlled by the protocol layer. - # @parameter headers [Protocol::HTTP::Headers] - # @parameter status [Integer] gRPC status code - # @parameter message [String | Nil] Optional status message - # @parameter error [Exception | Nil] Optional error object (used to extract backtrace) - def self.add_status!(headers, status: Status::OK, message: nil, error: nil) - headers["grpc-status"] = Header::Status.new(status) - headers["grpc-message"] = Header::Message.new(Header::Message.encode(message)) if message - + # Add gRPC status, message, and optional backtrace to headers. + # Whether these become headers or trailers is controlled by the protocol layer. + # @parameter headers [Protocol::HTTP::Headers] + # @parameter status [Integer] gRPC status code + # @parameter message [String | Nil] Optional status message + # @parameter error [Exception | Nil] Optional error object (used to extract backtrace) + def self.add_status!(headers, status: Status::OK, message: nil, error: nil) + headers["grpc-status"] = Header::Status.new(status) + headers["grpc-message"] = Header::Message.new(Header::Message.encode(message)) if message + # Add backtrace from error if available - if error && error.backtrace && !error.backtrace.empty? - headers["backtrace"] = error.backtrace + if error && error.backtrace && !error.backtrace.empty? + headers["backtrace"] = error.backtrace + end end end - end end end ``` @@ -446,7 +446,7 @@ module Protocol super(prefix + data) # Call Protocol::HTTP::Body::Writable#write end - protected + protected def compress(data) case @encoding @@ -741,13 +741,13 @@ module Protocol end # Handle the RPC - begin - handle_rpc(request, handler, handler_method, request_class, response_class) - rescue Error => error - make_response(error.status_code, error.message, error: error) - rescue => error - make_response(Status::INTERNAL, error.message, error: error) - end + begin + handle_rpc(request, handler, handler_method, request_class, response_class) + rescue Error => error + make_response(error.status_code, error.message, error: error) + rescue => error + make_response(Status::INTERNAL, error.message, error: error) + end end protected @@ -764,33 +764,33 @@ module Protocol input = Body::Readable.new(request.body, message_class: request_class, encoding: encoding) output = Body::Writable.new(encoding: encoding) - # Create call context + # Create call context response_headers = Protocol::HTTP::Headers.new([], nil, policy: HEADER_POLICY) response_headers["content-type"] = "application/grpc+proto" response_headers["grpc-encoding"] = encoding if encoding call = Call.new(request) - # Invoke handler + # Invoke handler handler.send(method, input, output, call) output.close_write unless output.closed? - # Mark trailers and add status - response_headers.trailer! - Metadata.add_status!(response_headers, status: Status::OK) + # Mark trailers and add status + response_headers.trailer! + Metadata.add_status!(response_headers, status: Status::OK) + + Protocol::HTTP::Response[200, response_headers, output] + end - Protocol::HTTP::Response[200, response_headers, output] - end - protected - - def make_response(status_code, message, error: nil) - headers = Protocol::HTTP::Headers.new([], nil, policy: HEADER_POLICY) - headers["content-type"] = "application/grpc+proto" - Metadata.add_status!(headers, status: status_code, message: message, error: error) - Protocol::HTTP::Response[200, headers, nil] - end + def make_response(status_code, message, error: nil) + headers = Protocol::HTTP::Headers.new([], nil, policy: HEADER_POLICY) + headers["content-type"] = "application/grpc+proto" + Metadata.add_status!(headers, status: status_code, message: message, error: error) + + Protocol::HTTP::Response[200, headers, nil] + end end end end @@ -804,7 +804,7 @@ Standard health checking protocol: module Protocol module GRPC module HealthCheck - # Health check status constants + # Health check status constants module ServingStatus UNKNOWN = 0 SERVING = 1 @@ -888,10 +888,10 @@ require "protocol/grpc" # This would be inside a Rack/HTTP middleware/handler def handle_grpc_request(http_request) - # Parse gRPC path + # Parse gRPC path service, method = Protocol::GRPC::Methods.parse_path(http_request.path) - # Read input messages + # Read input messages input = Protocol::GRPC::Body::Readable.new( http_request.body, message_class: MyService::HelloRequest @@ -899,26 +899,26 @@ def handle_grpc_request(http_request) request_message = input.read - # Process the request + # Process the request reply = MyService::HelloReply.new( message: "Hello, #{request_message.name}!" ) - # Create response body + # Create response body output = Protocol::GRPC::Body::Writable.new output.write(reply) output.close_write - # Build response headers with gRPC policy + # Build response headers with gRPC policy headers = Protocol::HTTP::Headers.new([], nil, policy: Protocol::GRPC::HEADER_POLICY) headers["content-type"] = "application/grpc+proto" - # Mark that trailers will follow (after body) + # Mark that trailers will follow (after body) headers.trailer! - # Add status as trailer - these will be sent after the response body - # Note: The user just adds them to headers; the @tail marker ensures - # they're recognized as trailers internally + # Add status as trailer - these will be sent after the response body + # Note: The user just adds them to headers; the @tail marker ensures + # they're recognized as trailers internally Protocol::GRPC::Metadata.add_status!(headers, status: Protocol::GRPC::Status::OK) Protocol::HTTP::Response[200, headers, output] @@ -1091,20 +1091,20 @@ require "protocol/grpc" require_relative "my_service_pb" # Generated by protoc --ruby_out module MyService - # Client stub for Greeter service + # Client stub for Greeter service class GreeterClient - # @parameter client [Async::GRPC::Client] The gRPC client + # @parameter client [Async::GRPC::Client] The gRPC client def initialize(client) @client = client end SERVICE_PATH = "my_service.Greeter" - # Unary RPC: SayHello - # @parameter request [MyService::HelloRequest] - # @parameter metadata [Hash] Custom metadata - # @parameter timeout [Numeric] Deadline - # @returns [MyService::HelloReply] + # Unary RPC: SayHello + # @parameter request [MyService::HelloRequest] + # @parameter metadata [Hash] Custom metadata + # @parameter timeout [Numeric] Deadline + # @returns [MyService::HelloReply] def say_hello(request, metadata: {}, timeout: nil) @client.unary( SERVICE_PATH, @@ -1116,45 +1116,45 @@ module MyService ) end - # Server streaming RPC: StreamNumbers - # @parameter request [MyService::HelloRequest] - # @yields {|response| ...} Each HelloReply message - # @returns [Enumerator] if no block given + # Server streaming RPC: StreamNumbers + # @parameter request [MyService::HelloRequest] + # @yields {|response| ...} Each HelloReply message + # @returns [Enumerator] if no block given def stream_numbers(request, metadata: {}, timeout: nil, &block) @client.server_streaming( SERVICE_PATH, "StreamNumbers", request, response_class: MyService::HelloReply, - metadata: metadata, - timeout: timeout, + metadata: metadata, + timeout: timeout, &block ) end - # Client streaming RPC: RecordRoute - # @yields {|stream| ...} Block that writes Point messages - # @returns [MyService::RouteSummary] + # Client streaming RPC: RecordRoute + # @yields {|stream| ...} Block that writes Point messages + # @returns [MyService::RouteSummary] def record_route(metadata: {}, timeout: nil, &block) @client.client_streaming( SERVICE_PATH, "RecordRoute", response_class: MyService::RouteSummary, - metadata: metadata, - timeout: timeout, + metadata: metadata, + timeout: timeout, &block ) end - # Bidirectional streaming RPC: RouteChat - # @yields {|input, output| ...} input for writing, output for reading + # Bidirectional streaming RPC: RouteChat + # @yields {|input, output| ...} input for writing, output for reading def route_chat(metadata: {}, timeout: nil, &block) @client.bidirectional_streaming( SERVICE_PATH, "RouteChat", response_class: MyService::Point, - metadata: metadata, - timeout: timeout, + metadata: metadata, + timeout: timeout, &block ) end @@ -1172,76 +1172,76 @@ require "protocol/grpc" require_relative "my_service_pb" # Generated by protoc --ruby_out module MyService - # Base class for Greeter service implementation - # Inherit from this class and implement the RPC methods + # Base class for Greeter service implementation + # Inherit from this class and implement the RPC methods class GreeterService - # Unary RPC: SayHello - # Override this method in your implementation - # @parameter request [MyService::HelloRequest] - # @parameter call [Protocol::GRPC::ServerCall] Call context with metadata - # @returns [MyService::HelloReply] + # Unary RPC: SayHello + # Override this method in your implementation + # @parameter request [MyService::HelloRequest] + # @parameter call [Protocol::GRPC::ServerCall] Call context with metadata + # @returns [MyService::HelloReply] def say_hello(request, call) raise NotImplementedError, "#{self.class}#say_hello not implemented" end - # Server streaming RPC: StreamNumbers - # Override this method in your implementation - # @parameter request [MyService::HelloRequest] - # @parameter call [Protocol::GRPC::ServerCall] Call context with metadata - # @yields [MyService::HelloReply] Yield each response message + # Server streaming RPC: StreamNumbers + # Override this method in your implementation + # @parameter request [MyService::HelloRequest] + # @parameter call [Protocol::GRPC::ServerCall] Call context with metadata + # @yields [MyService::HelloReply] Yield each response message def stream_numbers(request, call) raise NotImplementedError, "#{self.class}#stream_numbers not implemented" end - # Client streaming RPC: RecordRoute - # Override this method in your implementation - # @parameter call [Protocol::GRPC::ServerCall] Call context with metadata - # @yields [MyService::Point] Each request message from client - # @returns [MyService::RouteSummary] + # Client streaming RPC: RecordRoute + # Override this method in your implementation + # @parameter call [Protocol::GRPC::ServerCall] Call context with metadata + # @yields [MyService::Point] Each request message from client + # @returns [MyService::RouteSummary] def record_route(call) raise NotImplementedError, "#{self.class}#record_route not implemented" end - # Bidirectional streaming RPC: RouteChat - # Override this method in your implementation - # @parameter call [Protocol::GRPC::ServerCall] Call context with metadata - # @returns [Enumerator, Enumerator] (input, output) - input for reading, output for writing + # Bidirectional streaming RPC: RouteChat + # Override this method in your implementation + # @parameter call [Protocol::GRPC::ServerCall] Call context with metadata + # @returns [Enumerator, Enumerator] (input, output) - input for reading, output for writing def route_chat(call) raise NotImplementedError, "#{self.class}#route_chat not implemented" end - # Internal: Dispatch method for Async::GRPC::Server - # Maps RPC calls to handler methods + # Internal: Dispatch method for Async::GRPC::Server + # Maps RPC calls to handler methods def self.rpc_descriptions { "SayHello" => { method: :say_hello, - request_class: MyService::HelloRequest, - response_class: MyService::HelloReply, - request_streaming: false, - response_streaming: false + request_class: MyService::HelloRequest, + response_class: MyService::HelloReply, + request_streaming: false, + response_streaming: false + }, + "StreamNumbers" => { + method: :stream_numbers, + request_class: MyService::HelloRequest, + response_class: MyService::HelloReply, + request_streaming: false, + response_streaming: true + }, + "RecordRoute" => { + method: :record_route, + request_class: MyService::Point, + response_class: MyService::RouteSummary, + request_streaming: true, + response_streaming: false }, - "StreamNumbers" => { - method: :stream_numbers, - request_class: MyService::HelloRequest, - response_class: MyService::HelloReply, - request_streaming: false, - response_streaming: true - }, - "RecordRoute" => { - method: :record_route, - request_class: MyService::Point, - response_class: MyService::RouteSummary, - request_streaming: true, - response_streaming: false - }, - "RouteChat" => { - method: :route_chat, - request_class: MyService::Point, - response_class: MyService::Point, - request_streaming: true, - response_streaming: true - } + "RouteChat" => { + method: :route_chat, + request_class: MyService::Point, + response_class: MyService::Point, + request_streaming: true, + response_streaming: true + } } end end @@ -1261,12 +1261,12 @@ Async do client = Async::GRPC::Client.new(endpoint) stub = MyService::GreeterClient.new(client) - # Clean, typed interface! + # Clean, typed interface! request = MyService::HelloRequest.new(name: "World") response = stub.say_hello(request) puts response.message - # Server streaming + # Server streaming stub.stream_numbers(request) do |reply| puts reply.message end @@ -1315,7 +1315,7 @@ Async do server = Async::GRPC::Server.new server.register("my_service.Greeter", MyGreeter.new) - # ... start server + # ... start server end ``` @@ -1332,27 +1332,27 @@ Key classes: module Protocol module GRPC class Generator - # @parameter proto_file [String] Path to .proto file + # @parameter proto_file [String] Path to .proto file def initialize(proto_file) @proto = parse_proto(proto_file) end def generate_client(output_path) - # Generate client stub + # Generate client stub end def generate_server(output_path) - # Generate server base class + # Generate server base class end - private + private def parse_proto(file) - # Simple parsing - extract: - # - package name - # - message names (just reference them, protoc generates these) - # - service definitions - # - RPC methods with request/response types and streaming flags + # Simple parsing - extract: + # - package name + # - message names (just reference them, protoc generates these) + # - service definitions + # - RPC methods with request/response types and streaming flags end end end @@ -1392,8 +1392,8 @@ module Bake Console.logger.info(self){"Generated #{output_path}"} end - # Generate gRPC stubs for all .proto files in directory - # @parameter directory [String] Directory containing .proto files + # Generate gRPC stubs for all .proto files in directory + # @parameter directory [String] Directory containing .proto files def generate_all(directory: ".") Dir.glob(File.join(directory, "**/*.proto")).each do |proto_file| generate(proto_file) diff --git a/fixtures/protocol/grpc/test_middleware.rb b/fixtures/protocol/grpc/test_middleware.rb index b99d7b2..93ca1cb 100644 --- a/fixtures/protocol/grpc/test_middleware.rb +++ b/fixtures/protocol/grpc/test_middleware.rb @@ -6,8 +6,8 @@ require "protocol/grpc/middleware" require "protocol/grpc/methods" require "protocol/grpc/call" -require "protocol/grpc/body/readable_body" -require "protocol/grpc/body/writable_body" +require "protocol/grpc/body/readable" +require "protocol/grpc/body/writable" # Test implementation of Middleware with service routing class TestMiddleware < Protocol::GRPC::Middleware @@ -39,8 +39,8 @@ def dispatch(request) # Create protocol-level objects for gRPC handling encoding = request.headers["grpc-encoding"] - input = Protocol::GRPC::Body::ReadableBody.new(request.body, encoding: encoding) - output = Protocol::GRPC::Body::WritableBody.new(encoding: encoding) + input = Protocol::GRPC::Body::Readable.new(request.body, encoding: encoding) + output = Protocol::GRPC::Body::Writable.new(encoding: encoding) # Create call context response_headers = Protocol::HTTP::Headers.new([], nil, policy: Protocol::GRPC::HEADER_POLICY) @@ -61,7 +61,7 @@ def dispatch(request) result = wrapper.call(input, output, call) # Handler may return a different output, or modify the existing one - final_output = result.is_a?(Protocol::GRPC::Body::WritableBody) ? result : output + final_output = result.is_a?(Protocol::GRPC::Body::Writable) ? result : output final_output.close_write unless final_output.closed? # Mark trailers and add status @@ -101,7 +101,7 @@ def call(input, output, call) underlying_body = input.body # Preserve any buffered data from the original input original_buffer = input.instance_variable_get(:@buffer) - input = Protocol::GRPC::Body::ReadableBody.new(underlying_body, message_class: @request_class, encoding: encoding) + input = Protocol::GRPC::Body::Readable.new(underlying_body, message_class: @request_class, encoding: encoding) # Copy buffered data if any exists if original_buffer && !original_buffer.empty? input.instance_variable_set(:@buffer, original_buffer.dup) @@ -112,7 +112,7 @@ def call(input, output, call) encoding = output.encoding # Create new output with type information # The original output's data is lost, but that's okay since we haven't written to it yet - output = Protocol::GRPC::Body::WritableBody.new(message_class: @response_class, encoding: encoding) + output = Protocol::GRPC::Body::Writable.new(message_class: @response_class, encoding: encoding) end # Call the actual handler method diff --git a/guides/getting-started/readme.md b/guides/getting-started/readme.md index 6997b90..96833cd 100644 --- a/guides/getting-started/readme.md +++ b/guides/getting-started/readme.md @@ -15,8 +15,8 @@ $ bundle add protocol-grpc `protocol-grpc` has several core concepts: - A {ruby Protocol::GRPC::Interface} class which defines gRPC service contracts with RPC methods, request/response types, and streaming patterns. - - A {ruby Protocol::GRPC::Body::ReadableBody} class which handles reading gRPC messages from HTTP request/response bodies with automatic framing and decoding. - - A {ruby Protocol::GRPC::Body::WritableBody} class which handles writing gRPC messages to HTTP request/response bodies with automatic framing and encoding. + - A {ruby Protocol::GRPC::Body::Readable} class which handles reading gRPC messages from HTTP request/response bodies with automatic framing and decoding. + - A {ruby Protocol::GRPC::Body::Writable} class which handles writing gRPC messages to HTTP request/response bodies with automatic framing and encoding. - A {ruby Protocol::GRPC::Middleware} abstract base class for building gRPC server applications. - A {ruby Protocol::GRPC::Call} class which represents the context of a single gRPC RPC call, including deadline tracking. - A {ruby Protocol::GRPC::Status} module with gRPC status code constants. @@ -47,15 +47,15 @@ end ### Building a Request -Build gRPC requests using `Protocol::GRPC::Methods` and `Protocol::GRPC::Body::WritableBody`: +Build gRPC requests using `Protocol::GRPC::Methods` and `Protocol::GRPC::Body::Writable`: ``` ruby require "protocol/grpc" require "protocol/grpc/methods" -require "protocol/grpc/body/writable_body" +require "protocol/grpc/body/writable" # Build request body -body = Protocol::GRPC::Body::WritableBody.new(message_class: Hello::HelloRequest) +body = Protocol::GRPC::Body::Writable.new(message_class: Hello::HelloRequest) body.write(Hello::HelloRequest.new(name: "World")) body.close_write @@ -69,13 +69,13 @@ request = Protocol::HTTP::Request["POST", path, headers, body] ### Reading a Response -Read gRPC responses using `Protocol::GRPC::Body::ReadableBody`: +Read gRPC responses using `Protocol::GRPC::Body::Readable`: ``` ruby -require "protocol/grpc/body/readable_body" +require "protocol/grpc/body/readable" # Read response body -readable_body = Protocol::GRPC::Body::ReadableBody.new( +readable_body = Protocol::GRPC::Body::Readable.new( response.body, message_class: Hello::HelloReply ) diff --git a/lib/protocol/grpc.rb b/lib/protocol/grpc.rb index 73af9d9..89404c9 100644 --- a/lib/protocol/grpc.rb +++ b/lib/protocol/grpc.rb @@ -11,8 +11,8 @@ require_relative "grpc/header" require_relative "grpc/metadata" require_relative "grpc/call" -require_relative "grpc/body/readable_body" -require_relative "grpc/body/writable_body" +require_relative "grpc/body/readable" +require_relative "grpc/body/writable" require_relative "grpc/interface" require_relative "grpc/middleware" require_relative "grpc/health_check" diff --git a/lib/protocol/grpc/body/readable.rb b/lib/protocol/grpc/body/readable.rb new file mode 100644 index 0000000..e76d0d4 --- /dev/null +++ b/lib/protocol/grpc/body/readable.rb @@ -0,0 +1,125 @@ +# frozen_string_literal: true + +# Released under the MIT License. +# Copyright, 2025, by Samuel Williams. + +require "protocol/http" +require "protocol/http/body/wrapper" +require "zlib" + +module Protocol + module GRPC + # @namespace + module Body + # Represents a readable body for gRPC messages with length-prefixed framing. + # This is the standard readable body for gRPC - all gRPC responses use message framing. + # Wraps the underlying HTTP body and transforms raw chunks into decoded gRPC messages. + class Readable < Protocol::HTTP::Body::Wrapper + # Wrap the body of a message. + # + # @parameter message [Request | Response] The message to wrap. + # @parameter options [Hash] The options to pass to the initializer. + # @returns [Readable | Nil] The wrapped body or `nil` if the message has no body. + def self.wrap(message, **options) + if body = message.body + message.body = self.new(body, **options) + end + + return message.body + end + + # Initialize a new readable body for gRPC messages. + # @parameter body [Protocol::HTTP::Body::Readable] The underlying HTTP body + # @parameter message_class [Class | Nil] Protobuf message class with .decode method. + # If `nil`, returns raw binary data (useful for channel adapters) + # @parameter encoding [String | Nil] Compression encoding (from grpc-encoding header) + def initialize(body, message_class: nil, encoding: nil) + super(body) + @message_class = message_class + @encoding = encoding + @buffer = String.new.force_encoding(Encoding::BINARY) + end + + # @attribute [String | Nil] The compression encoding. + attr_reader :encoding + + # Read the next gRPC message. + # Overrides Wrapper#read to transform raw HTTP body chunks into decoded gRPC messages. + # @returns [Object | String | Nil] Decoded message, raw binary, or `Nil` if stream ended + def read + return nil if @body.nil? || @body.empty? + + # Read 5-byte prefix: 1 byte compression flag + 4 bytes length + prefix = read_exactly(5) + return nil unless prefix + + compressed = prefix[0].unpack1("C") == 1 + length = prefix[1..4].unpack1("N") + + # Read the message body: + data = read_exactly(length) + return nil unless data + + # Decompress if needed: + data = decompress(data) if compressed + + # Decode using message class if provided, otherwise return binary: + # This allows binary mode for channel adapters + if @message_class + # Use protobuf gem's decode method: + @message_class.decode(data) + else + data # Return raw binary + end + end + + private + + # Read exactly n bytes from the underlying body. + # @parameter n [Integer] The number of bytes to read + # @returns [String | Nil] The data read, or `Nil` if the stream ended + def read_exactly(n) + # Fill buffer until we have enough data: + while @buffer.bytesize < n + return nil if @body.nil? || @body.empty? + + # Read chunk from underlying body: + chunk = @body.read + + if chunk.nil? + # End of stream: + return nil + end + + # Append to buffer: + @buffer << chunk.force_encoding(Encoding::BINARY) + end + + # Extract the required data: + data = @buffer[0...n] + @buffer = @buffer[n..] + data + end + + # Decompress data using the configured encoding. + # @parameter data [String] The compressed data + # @returns [String] The decompressed data + # @raises [Error] If decompression fails + def decompress(data) + case @encoding + when "gzip" + Zlib::Gunzip.new.inflate(data) + when "deflate" + Zlib::Inflate.inflate(data) + else + data + end + rescue StandardError => error + raise Error.new(Status::INTERNAL, "Failed to decompress message: #{error.message}") + end + end + end + end +end + + diff --git a/lib/protocol/grpc/body/readable_body.rb b/lib/protocol/grpc/body/readable_body.rb index 2dce94c..417443b 100644 --- a/lib/protocol/grpc/body/readable_body.rb +++ b/lib/protocol/grpc/body/readable_body.rb @@ -3,121 +3,21 @@ # Released under the MIT License. # Copyright, 2025, by Samuel Williams. -require "protocol/http" -require "protocol/http/body/wrapper" -require "zlib" +# Compatibility shim for the old file name. +# This file is deprecated and will be removed in a future version. +# Please update your code to require 'protocol/grpc/body/readable' instead. + +warn "Requiring 'protocol/grpc/body/readable_body' is deprecated. Please require 'protocol/grpc/body/readable' instead.", uplevel: 1 if $VERBOSE + +require_relative "readable" module Protocol module GRPC - # @namespace module Body - # Represents a readable body for gRPC messages with length-prefixed framing. - # This is the standard readable body for gRPC - all gRPC responses use message framing. - # Wraps the underlying HTTP body and transforms raw chunks into decoded gRPC messages. - class ReadableBody < Protocol::HTTP::Body::Wrapper - # Wrap the body of a message. - # - # @parameter message [Request | Response] The message to wrap. - # @parameter options [Hash] The options to pass to the initializer. - # @returns [ReadableBody | Nil] The wrapped body or `nil` if the message has no body. - def self.wrap(message, **options) - if body = message.body - message.body = self.new(body, **options) - end - - return message.body - end - - # Initialize a new readable body for gRPC messages. - # @parameter body [Protocol::HTTP::Body::Readable] The underlying HTTP body - # @parameter message_class [Class | Nil] Protobuf message class with .decode method. - # If `nil`, returns raw binary data (useful for channel adapters) - # @parameter encoding [String | Nil] Compression encoding (from grpc-encoding header) - def initialize(body, message_class: nil, encoding: nil) - super(body) - @message_class = message_class - @encoding = encoding - @buffer = String.new.force_encoding(Encoding::BINARY) - end - - # @attribute [String | Nil] The compression encoding. - attr_reader :encoding - - # Read the next gRPC message. - # Overrides Wrapper#read to transform raw HTTP body chunks into decoded gRPC messages. - # @returns [Object | String | Nil] Decoded message, raw binary, or `Nil` if stream ended - def read - return nil if @body.nil? || @body.empty? - - # Read 5-byte prefix: 1 byte compression flag + 4 bytes length - prefix = read_exactly(5) - return nil unless prefix - - compressed = prefix[0].unpack1("C") == 1 - length = prefix[1..4].unpack1("N") - - # Read the message body: - data = read_exactly(length) - return nil unless data - - # Decompress if needed: - data = decompress(data) if compressed - - # Decode using message class if provided, otherwise return binary: - # This allows binary mode for channel adapters - if @message_class - # Use protobuf gem's decode method: - @message_class.decode(data) - else - data # Return raw binary - end - end - - private - - # Read exactly n bytes from the underlying body. - # @parameter n [Integer] The number of bytes to read - # @returns [String | Nil] The data read, or `Nil` if the stream ended - def read_exactly(n) - # Fill buffer until we have enough data: - while @buffer.bytesize < n - return nil if @body.nil? || @body.empty? - - # Read chunk from underlying body: - chunk = @body.read - - if chunk.nil? - # End of stream: - return nil - end - - # Append to buffer: - @buffer << chunk.force_encoding(Encoding::BINARY) - end - - # Extract the required data: - data = @buffer[0...n] - @buffer = @buffer[n..] - data - end - - # Decompress data using the configured encoding. - # @parameter data [String] The compressed data - # @returns [String] The decompressed data - # @raises [Error] If decompression fails - def decompress(data) - case @encoding - when "gzip" - Zlib::Gunzip.new.inflate(data) - when "deflate" - Zlib::Inflate.inflate(data) - else - data - end - rescue StandardError => error - raise Error.new(Status::INTERNAL, "Failed to decompress message: #{error.message}") - end - end + # Compatibility alias for the old class name. + # @deprecated Use {Readable} instead. + ReadableBody = Readable end end end + diff --git a/lib/protocol/grpc/body/writable.rb b/lib/protocol/grpc/body/writable.rb new file mode 100644 index 0000000..283858f --- /dev/null +++ b/lib/protocol/grpc/body/writable.rb @@ -0,0 +1,103 @@ +# frozen_string_literal: true + +# Released under the MIT License. +# Copyright, 2025, by Samuel Williams. + +require "protocol/http" +require "protocol/http/body/writable" +require "zlib" +require "stringio" + +module Protocol + module GRPC + # @namespace + module Body + # Represents a writable body for gRPC messages with length-prefixed framing. + # This is the standard writable body for gRPC - all gRPC requests use message framing. + class Writable < Protocol::HTTP::Body::Writable + # Initialize a new writable body for gRPC messages. + # @parameter encoding [String | Nil] Compression encoding (gzip, deflate, identity) + # @parameter level [Integer] Compression level if encoding is used + # @parameter message_class [Class | Nil] Expected message class for validation + def initialize(encoding: nil, level: Zlib::DEFAULT_COMPRESSION, message_class: nil, **options) + super(**options) + @encoding = encoding + @level = level + @message_class = message_class + end + + # @attribute [String | Nil] The compression encoding. + attr_reader :encoding + + # @attribute [Class | Nil] The expected message class for validation. + attr_reader :message_class + + # Write a message with gRPC framing. + # @parameter message [Object, String] Protobuf message instance or raw binary data + # @parameter compressed [Boolean | Nil] Whether to compress this specific message. If `nil`, uses the encoding setting. + def write(message, compressed: nil) + # Validate message type if message_class is specified: + if @message_class && !message.is_a?(String) + unless message.is_a?(@message_class) + raise TypeError, "Expected #{@message_class}, got #{message.class}" + end + end + + # Encode message to binary if it's not already a string: + # This supports both high-level (protobuf objects) and low-level (binary) usage + data = if message.is_a?(String) + message # Already binary, use as-is (for channel adapters) + elsif message.respond_to?(:to_proto) + # Use protobuf gem's to_proto method: + message.to_proto + elsif message.respond_to?(:encode) + # Use encode method: + message.encode + else + raise ArgumentError, "Message must respond to :to_proto or :encode" + end + + # Determine if we should compress this message: + # If compressed param is nil, use the encoding setting + should_compress = compressed.nil? ? (@encoding && @encoding != "identity") : compressed + + # Compress if requested: + data = compress(data) if should_compress + + # Build prefix: compression flag + length + compression_flag = should_compress ? 1 : 0 + length = data.bytesize + prefix = [compression_flag].pack("C") + [length].pack("N") + + # Write prefix + data to underlying body: + super(prefix + data) # Call Protocol::HTTP::Body::Writable#write + end + + protected + + # Compress data using the configured encoding. + # @parameter data [String] The data to compress + # @returns [String] The compressed data + # @raises [Error] If compression fails + def compress(data) + case @encoding + when "gzip" + io = StringIO.new + gz = Zlib::GzipWriter.new(io, @level) + gz.write(data) + gz.close + io.string + when "deflate" + Zlib::Deflate.deflate(data, @level) + else + data # No compression or identity + end + rescue StandardError => error + raise Error.new(Status::INTERNAL, "Failed to compress message: #{error.message}") + end + end + end + end +end + + diff --git a/lib/protocol/grpc/body/writable_body.rb b/lib/protocol/grpc/body/writable_body.rb index 95fd146..b87a544 100644 --- a/lib/protocol/grpc/body/writable_body.rb +++ b/lib/protocol/grpc/body/writable_body.rb @@ -3,99 +3,21 @@ # Released under the MIT License. # Copyright, 2025, by Samuel Williams. -require "protocol/http" -require "protocol/http/body/writable" -require "zlib" -require "stringio" +# Compatibility shim for the old file name. +# This file is deprecated and will be removed in a future version. +# Please update your code to require 'protocol/grpc/body/writable' instead. + +warn "Requiring 'protocol/grpc/body/writable_body' is deprecated. Please require 'protocol/grpc/body/writable' instead.", uplevel: 1 if $VERBOSE + +require_relative "writable" module Protocol module GRPC - # @namespace module Body - # Represents a writable body for gRPC messages with length-prefixed framing. - # This is the standard writable body for gRPC - all gRPC requests use message framing. - class WritableBody < Protocol::HTTP::Body::Writable - # Initialize a new writable body for gRPC messages. - # @parameter encoding [String | Nil] Compression encoding (gzip, deflate, identity) - # @parameter level [Integer] Compression level if encoding is used - # @parameter message_class [Class | Nil] Expected message class for validation - def initialize(encoding: nil, level: Zlib::DEFAULT_COMPRESSION, message_class: nil, **options) - super(**options) - @encoding = encoding - @level = level - @message_class = message_class - end - - # @attribute [String | Nil] The compression encoding. - attr_reader :encoding - - # @attribute [Class | Nil] The expected message class for validation. - attr_reader :message_class - - # Write a message with gRPC framing. - # @parameter message [Object, String] Protobuf message instance or raw binary data - # @parameter compressed [Boolean | Nil] Whether to compress this specific message. If `nil`, uses the encoding setting. - def write(message, compressed: nil) - # Validate message type if message_class is specified: - if @message_class && !message.is_a?(String) - unless message.is_a?(@message_class) - raise TypeError, "Expected #{@message_class}, got #{message.class}" - end - end - - # Encode message to binary if it's not already a string: - # This supports both high-level (protobuf objects) and low-level (binary) usage - data = if message.is_a?(String) - message # Already binary, use as-is (for channel adapters) - elsif message.respond_to?(:to_proto) - # Use protobuf gem's to_proto method: - message.to_proto - elsif message.respond_to?(:encode) - # Use encode method: - message.encode - else - raise ArgumentError, "Message must respond to :to_proto or :encode" - end - - # Determine if we should compress this message: - # If compressed param is nil, use the encoding setting - should_compress = compressed.nil? ? (@encoding && @encoding != "identity") : compressed - - # Compress if requested: - data = compress(data) if should_compress - - # Build prefix: compression flag + length - compression_flag = should_compress ? 1 : 0 - length = data.bytesize - prefix = [compression_flag].pack("C") + [length].pack("N") - - # Write prefix + data to underlying body: - super(prefix + data) # Call Protocol::HTTP::Body::Writable#write - end - - protected - - # Compress data using the configured encoding. - # @parameter data [String] The data to compress - # @returns [String] The compressed data - # @raises [Error] If compression fails - def compress(data) - case @encoding - when "gzip" - io = StringIO.new - gz = Zlib::GzipWriter.new(io, @level) - gz.write(data) - gz.close - io.string - when "deflate" - Zlib::Deflate.deflate(data, @level) - else - data # No compression or identity - end - rescue StandardError => error - raise Error.new(Status::INTERNAL, "Failed to compress message: #{error.message}") - end - end + # Compatibility alias for the old class name. + # @deprecated Use {Writable} instead. + WritableBody = Writable end end end + diff --git a/test/protocol/grpc/body/readable_body.rb b/test/protocol/grpc/body/readable.rb similarity index 94% rename from test/protocol/grpc/body/readable_body.rb rename to test/protocol/grpc/body/readable.rb index e10b063..c523b64 100644 --- a/test/protocol/grpc/body/readable_body.rb +++ b/test/protocol/grpc/body/readable.rb @@ -3,11 +3,11 @@ # Released under the MIT License. # Copyright, 2025, by Samuel Williams. -require "protocol/grpc/body/readable_body" +require "protocol/grpc/body/readable" require "protocol/http/body/buffered" require_relative "../../../../fixtures/protocol/grpc/test_message" -describe Protocol::GRPC::Body::ReadableBody do +describe Protocol::GRPC::Body::Readable do let(:message_class) {Protocol::GRPC::Fixtures::TestMessage} let(:source_body) {Protocol::HTTP::Body::Buffered.new} let(:body) {subject.new(source_body, message_class: message_class)} @@ -27,7 +27,7 @@ def write_message(message, compressed: false) it "reads single message" do message = message_class.new(value: "Hello") write_message(message) - # Don't close the body - let ReadableBody handle it + # Don't close the body - let Readable handle it read_message = body.read expect(read_message).to be == message @@ -38,7 +38,7 @@ def write_message(message, compressed: false) message2 = message_class.new(value: "World") write_message(message1) write_message(message2) - # Don't close the body - let ReadableBody handle it + # Don't close the body - let Readable handle it expect(body.read).to be == message1 expect(body.read).to be == message2 @@ -136,3 +136,5 @@ def write_message(message, compressed: false) end end end + + diff --git a/test/protocol/grpc/body/writable_body.rb b/test/protocol/grpc/body/writable.rb similarity index 98% rename from test/protocol/grpc/body/writable_body.rb rename to test/protocol/grpc/body/writable.rb index 7b85044..f5cc3e7 100644 --- a/test/protocol/grpc/body/writable_body.rb +++ b/test/protocol/grpc/body/writable.rb @@ -3,11 +3,11 @@ # Released under the MIT License. # Copyright, 2025, by Samuel Williams. -require "protocol/grpc/body/writable_body" +require "protocol/grpc/body/writable" require "protocol/http/body/writable" require_relative "../../../../fixtures/protocol/grpc/test_message" -describe Protocol::GRPC::Body::WritableBody do +describe Protocol::GRPC::Body::Writable do let(:body) {subject.new} let(:message_class) {Protocol::GRPC::Fixtures::TestMessage} @@ -296,3 +296,5 @@ def wrong_message.to_proto end end end + +