@@ -21,7 +21,7 @@ import { WriteObject, WriteFlags } from './call-interface';
2121import { Channel } from './channel' ;
2222import { ChannelOptions } from './channel-options' ;
2323import { CompressionAlgorithms } from './compression-algorithms' ;
24- import { LogVerbosity } from './constants' ;
24+ import { DEFAULT_MAX_RECEIVE_MESSAGE_LENGTH , LogVerbosity , Status } from './constants' ;
2525import { BaseFilter , Filter , FilterFactory } from './filter' ;
2626import * as logging from './logging' ;
2727import { Metadata , MetadataValue } from './metadata' ;
@@ -98,6 +98,10 @@ class IdentityHandler extends CompressionHandler {
9898}
9999
100100class DeflateHandler extends CompressionHandler {
101+ constructor ( private maxRecvMessageLength : number ) {
102+ super ( ) ;
103+ }
104+
101105 compressMessage ( message : Buffer ) {
102106 return new Promise < Buffer > ( ( resolve , reject ) => {
103107 zlib . deflate ( message , ( err , output ) => {
@@ -112,18 +116,34 @@ class DeflateHandler extends CompressionHandler {
112116
113117 decompressMessage ( message : Buffer ) {
114118 return new Promise < Buffer > ( ( resolve , reject ) => {
115- zlib . inflate ( message , ( err , output ) => {
116- if ( err ) {
117- reject ( err ) ;
118- } else {
119- resolve ( output ) ;
119+ let totalLength = 0 ;
120+ const messageParts : Buffer [ ] = [ ] ;
121+ const decompresser = zlib . createInflate ( ) ;
122+ decompresser . on ( 'data' , ( chunk : Buffer ) => {
123+ messageParts . push ( chunk ) ;
124+ totalLength += chunk . byteLength ;
125+ if ( this . maxRecvMessageLength !== - 1 && totalLength > this . maxRecvMessageLength ) {
126+ decompresser . destroy ( ) ;
127+ reject ( {
128+ code : Status . RESOURCE_EXHAUSTED ,
129+ details : `Received message that decompresses to a size larger than ${ this . maxRecvMessageLength } `
130+ } ) ;
120131 }
121132 } ) ;
133+ decompresser . on ( 'end' , ( ) => {
134+ resolve ( Buffer . concat ( messageParts ) ) ;
135+ } ) ;
136+ decompresser . write ( message ) ;
137+ decompresser . end ( ) ;
122138 } ) ;
123139 }
124140}
125141
126142class GzipHandler extends CompressionHandler {
143+ constructor ( private maxRecvMessageLength : number ) {
144+ super ( ) ;
145+ }
146+
127147 compressMessage ( message : Buffer ) {
128148 return new Promise < Buffer > ( ( resolve , reject ) => {
129149 zlib . gzip ( message , ( err , output ) => {
@@ -138,13 +158,25 @@ class GzipHandler extends CompressionHandler {
138158
139159 decompressMessage ( message : Buffer ) {
140160 return new Promise < Buffer > ( ( resolve , reject ) => {
141- zlib . unzip ( message , ( err , output ) => {
142- if ( err ) {
143- reject ( err ) ;
144- } else {
145- resolve ( output ) ;
161+ let totalLength = 0 ;
162+ const messageParts : Buffer [ ] = [ ] ;
163+ const decompresser = zlib . createGunzip ( ) ;
164+ decompresser . on ( 'data' , ( chunk : Buffer ) => {
165+ messageParts . push ( chunk ) ;
166+ totalLength += chunk . byteLength ;
167+ if ( this . maxRecvMessageLength !== - 1 && totalLength > this . maxRecvMessageLength ) {
168+ decompresser . destroy ( ) ;
169+ reject ( {
170+ code : Status . RESOURCE_EXHAUSTED ,
171+ details : `Received message that decompresses to a size larger than ${ this . maxRecvMessageLength } `
172+ } ) ;
146173 }
147174 } ) ;
175+ decompresser . on ( 'end' , ( ) => {
176+ resolve ( Buffer . concat ( messageParts ) ) ;
177+ } ) ;
178+ decompresser . write ( message ) ;
179+ decompresser . end ( ) ;
148180 } ) ;
149181 }
150182}
@@ -169,14 +201,14 @@ class UnknownHandler extends CompressionHandler {
169201 }
170202}
171203
172- function getCompressionHandler ( compressionName : string ) : CompressionHandler {
204+ function getCompressionHandler ( compressionName : string , maxReceiveMessageSize : number ) : CompressionHandler {
173205 switch ( compressionName ) {
174206 case 'identity' :
175207 return new IdentityHandler ( ) ;
176208 case 'deflate' :
177- return new DeflateHandler ( ) ;
209+ return new DeflateHandler ( maxReceiveMessageSize ) ;
178210 case 'gzip' :
179- return new GzipHandler ( ) ;
211+ return new GzipHandler ( maxReceiveMessageSize ) ;
180212 default :
181213 return new UnknownHandler ( compressionName ) ;
182214 }
@@ -186,6 +218,7 @@ export class CompressionFilter extends BaseFilter implements Filter {
186218 private sendCompression : CompressionHandler = new IdentityHandler ( ) ;
187219 private receiveCompression : CompressionHandler = new IdentityHandler ( ) ;
188220 private currentCompressionAlgorithm : CompressionAlgorithm = 'identity' ;
221+ private maxReceiveMessageLength : number ;
189222
190223 constructor (
191224 channelOptions : ChannelOptions ,
@@ -195,6 +228,7 @@ export class CompressionFilter extends BaseFilter implements Filter {
195228
196229 const compressionAlgorithmKey =
197230 channelOptions [ 'grpc.default_compression_algorithm' ] ;
231+ this . maxReceiveMessageLength = channelOptions [ 'grpc.max_receive_message_length' ] ?? DEFAULT_MAX_RECEIVE_MESSAGE_LENGTH
198232 if ( compressionAlgorithmKey !== undefined ) {
199233 if ( isCompressionAlgorithmKey ( compressionAlgorithmKey ) ) {
200234 const clientSelectedEncoding = CompressionAlgorithms [
@@ -215,7 +249,8 @@ export class CompressionFilter extends BaseFilter implements Filter {
215249 ) {
216250 this . currentCompressionAlgorithm = clientSelectedEncoding ;
217251 this . sendCompression = getCompressionHandler (
218- this . currentCompressionAlgorithm
252+ this . currentCompressionAlgorithm ,
253+ - 1
219254 ) ;
220255 }
221256 } else {
@@ -247,7 +282,7 @@ export class CompressionFilter extends BaseFilter implements Filter {
247282 if ( receiveEncoding . length > 0 ) {
248283 const encoding : MetadataValue = receiveEncoding [ 0 ] ;
249284 if ( typeof encoding === 'string' ) {
250- this . receiveCompression = getCompressionHandler ( encoding ) ;
285+ this . receiveCompression = getCompressionHandler ( encoding , this . maxReceiveMessageLength ) ;
251286 }
252287 }
253288 metadata . remove ( 'grpc-encoding' ) ;
0 commit comments