@@ -21,7 +21,7 @@ import { WriteObject, WriteFlags } from './call-interface';
2121import { Channel } from './channel' ;
2222import { ChannelOptions } from './channel-options' ;
2323import { CompressionAlgorithms } from './compression-algorithms' ;
24- import { LogVerbosity } from './constants' ;
24+ import { DEFAULT_MAX_RECEIVE_MESSAGE_LENGTH , LogVerbosity , Status } from './constants' ;
2525import { BaseFilter , Filter , FilterFactory } from './filter' ;
2626import * as logging from './logging' ;
2727import { Metadata , MetadataValue } from './metadata' ;
@@ -94,6 +94,10 @@ class IdentityHandler extends CompressionHandler {
9494}
9595
9696class DeflateHandler extends CompressionHandler {
97+ constructor ( private maxRecvMessageLength : number ) {
98+ super ( ) ;
99+ }
100+
97101 compressMessage ( message : Buffer ) {
98102 return new Promise < Buffer > ( ( resolve , reject ) => {
99103 zlib . deflate ( message , ( err , output ) => {
@@ -108,18 +112,34 @@ class DeflateHandler extends CompressionHandler {
108112
109113 decompressMessage ( message : Buffer ) {
110114 return new Promise < Buffer > ( ( resolve , reject ) => {
111- zlib . inflate ( message , ( err , output ) => {
112- if ( err ) {
113- reject ( err ) ;
114- } else {
115- resolve ( output ) ;
115+ let totalLength = 0 ;
116+ const messageParts : Buffer [ ] = [ ] ;
117+ const decompresser = zlib . createInflate ( ) ;
118+ decompresser . on ( 'data' , ( chunk : Buffer ) => {
119+ messageParts . push ( chunk ) ;
120+ totalLength += chunk . byteLength ;
121+ if ( this . maxRecvMessageLength !== - 1 && totalLength > this . maxRecvMessageLength ) {
122+ decompresser . destroy ( ) ;
123+ reject ( {
124+ code : Status . RESOURCE_EXHAUSTED ,
125+ details : `Received message that decompresses to a size larger than ${ this . maxRecvMessageLength } `
126+ } ) ;
116127 }
117128 } ) ;
129+ decompresser . on ( 'end' , ( ) => {
130+ resolve ( Buffer . concat ( messageParts ) ) ;
131+ } ) ;
132+ decompresser . write ( message ) ;
133+ decompresser . end ( ) ;
118134 } ) ;
119135 }
120136}
121137
122138class GzipHandler extends CompressionHandler {
139+ constructor ( private maxRecvMessageLength : number ) {
140+ super ( ) ;
141+ }
142+
123143 compressMessage ( message : Buffer ) {
124144 return new Promise < Buffer > ( ( resolve , reject ) => {
125145 zlib . gzip ( message , ( err , output ) => {
@@ -134,13 +154,25 @@ class GzipHandler extends CompressionHandler {
134154
135155 decompressMessage ( message : Buffer ) {
136156 return new Promise < Buffer > ( ( resolve , reject ) => {
137- zlib . unzip ( message , ( err , output ) => {
138- if ( err ) {
139- reject ( err ) ;
140- } else {
141- resolve ( output ) ;
157+ let totalLength = 0 ;
158+ const messageParts : Buffer [ ] = [ ] ;
159+ const decompresser = zlib . createGunzip ( ) ;
160+ decompresser . on ( 'data' , ( chunk : Buffer ) => {
161+ messageParts . push ( chunk ) ;
162+ totalLength += chunk . byteLength ;
163+ if ( this . maxRecvMessageLength !== - 1 && totalLength > this . maxRecvMessageLength ) {
164+ decompresser . destroy ( ) ;
165+ reject ( {
166+ code : Status . RESOURCE_EXHAUSTED ,
167+ details : `Received message that decompresses to a size larger than ${ this . maxRecvMessageLength } `
168+ } ) ;
142169 }
143170 } ) ;
171+ decompresser . on ( 'end' , ( ) => {
172+ resolve ( Buffer . concat ( messageParts ) ) ;
173+ } ) ;
174+ decompresser . write ( message ) ;
175+ decompresser . end ( ) ;
144176 } ) ;
145177 }
146178}
@@ -165,14 +197,14 @@ class UnknownHandler extends CompressionHandler {
165197 }
166198}
167199
168- function getCompressionHandler ( compressionName : string ) : CompressionHandler {
200+ function getCompressionHandler ( compressionName : string , maxReceiveMessageSize : number ) : CompressionHandler {
169201 switch ( compressionName ) {
170202 case 'identity' :
171203 return new IdentityHandler ( ) ;
172204 case 'deflate' :
173- return new DeflateHandler ( ) ;
205+ return new DeflateHandler ( maxReceiveMessageSize ) ;
174206 case 'gzip' :
175- return new GzipHandler ( ) ;
207+ return new GzipHandler ( maxReceiveMessageSize ) ;
176208 default :
177209 return new UnknownHandler ( compressionName ) ;
178210 }
@@ -182,11 +214,14 @@ export class CompressionFilter extends BaseFilter implements Filter {
182214 private sendCompression : CompressionHandler = new IdentityHandler ( ) ;
183215 private receiveCompression : CompressionHandler = new IdentityHandler ( ) ;
184216 private currentCompressionAlgorithm : CompressionAlgorithm = 'identity' ;
217+ private maxReceiveMessageLength : number ;
185218
186219 constructor ( channelOptions : ChannelOptions , private sharedFilterConfig : SharedCompressionFilterConfig ) {
187220 super ( ) ;
188221
189- const compressionAlgorithmKey = channelOptions [ 'grpc.default_compression_algorithm' ] ;
222+ const compressionAlgorithmKey =
223+ channelOptions [ 'grpc.default_compression_algorithm' ] ;
224+ this . maxReceiveMessageLength = channelOptions [ 'grpc.max_receive_message_length' ] ?? DEFAULT_MAX_RECEIVE_MESSAGE_LENGTH
190225 if ( compressionAlgorithmKey !== undefined ) {
191226 if ( isCompressionAlgorithmKey ( compressionAlgorithmKey ) ) {
192227 const clientSelectedEncoding = CompressionAlgorithms [ compressionAlgorithmKey ] as CompressionAlgorithm ;
@@ -200,7 +235,10 @@ export class CompressionFilter extends BaseFilter implements Filter {
200235 */
201236 if ( ! serverSupportedEncodings || serverSupportedEncodings . includes ( clientSelectedEncoding ) ) {
202237 this . currentCompressionAlgorithm = clientSelectedEncoding ;
203- this . sendCompression = getCompressionHandler ( this . currentCompressionAlgorithm ) ;
238+ this . sendCompression = getCompressionHandler (
239+ this . currentCompressionAlgorithm ,
240+ - 1
241+ ) ;
204242 }
205243 } else {
206244 logging . log ( LogVerbosity . ERROR , `Invalid value provided for grpc.default_compression_algorithm option: ${ compressionAlgorithmKey } ` ) ;
@@ -228,7 +266,7 @@ export class CompressionFilter extends BaseFilter implements Filter {
228266 if ( receiveEncoding . length > 0 ) {
229267 const encoding : MetadataValue = receiveEncoding [ 0 ] ;
230268 if ( typeof encoding === 'string' ) {
231- this . receiveCompression = getCompressionHandler ( encoding ) ;
269+ this . receiveCompression = getCompressionHandler ( encoding , this . maxReceiveMessageLength ) ;
232270 }
233271 }
234272 metadata . remove ( 'grpc-encoding' ) ;
0 commit comments