VYPR
Medium severity · 5.3 · NVD Advisory · Published Jun 10, 2024 · Updated Apr 15, 2026

CVE-2024-37168

CVE-2024-37168

Description

@grpc/grpc-js implements the core functionality of gRPC purely in JavaScript, without a C++ addon. Prior to versions 1.10.9, 1.9.15, and 1.8.22, there are two separate code paths in which memory can be allocated per message in excess of the grpc.max_receive_message_length channel option: if an incoming message has a size on the wire greater than the configured limit, the entire message is buffered before it is discarded; and/or if an incoming message has a size within the limit on the wire but decompresses to a size greater than the limit, the entire message is decompressed into memory, and on the server it is not discarded. This has been patched in versions 1.10.9, 1.9.15, and 1.8.22.

Affected packages

Versions sourced from the GitHub Security Advisory.

Package | Affected versions | Patched versions
@grpc/grpc-js (npm) | >= 1.10.0, < 1.10.9 | 1.10.9
@grpc/grpc-js (npm) | >= 1.9.0, < 1.9.15 | 1.9.15
@grpc/grpc-js (npm) | < 1.8.22 | 1.8.22

Patches

3
674f4e351a61

Merge pull request from GHSA-7v5v-9h63-cj86

https://github.com/grpc/grpc-node · Michael Lumish · Jun 10, 2024 · via ghsa
10 files changed · +174 −149
  • packages/grpc-js/package.json+1 1 modified
    @@ -1,6 +1,6 @@
     {
       "name": "@grpc/grpc-js",
    -  "version": "1.10.8",
    +  "version": "1.10.9",
       "description": "gRPC Library for Node - pure JS implementation",
       "homepage": "https://grpc.io/",
       "repository": "https://github.com/grpc/grpc-node/tree/master/packages/grpc-js",
    
  • packages/grpc-js/src/compression-filter.ts+51 16 modified
    @@ -21,7 +21,7 @@ import { WriteObject, WriteFlags } from './call-interface';
     import { Channel } from './channel';
     import { ChannelOptions } from './channel-options';
     import { CompressionAlgorithms } from './compression-algorithms';
    -import { LogVerbosity } from './constants';
    +import { DEFAULT_MAX_RECEIVE_MESSAGE_LENGTH, LogVerbosity, Status } from './constants';
     import { BaseFilter, Filter, FilterFactory } from './filter';
     import * as logging from './logging';
     import { Metadata, MetadataValue } from './metadata';
    @@ -98,6 +98,10 @@ class IdentityHandler extends CompressionHandler {
     }
     
     class DeflateHandler extends CompressionHandler {
    +  constructor(private maxRecvMessageLength: number) {
    +    super();
    +  }
    +
       compressMessage(message: Buffer) {
         return new Promise<Buffer>((resolve, reject) => {
           zlib.deflate(message, (err, output) => {
    @@ -112,18 +116,34 @@ class DeflateHandler extends CompressionHandler {
     
       decompressMessage(message: Buffer) {
         return new Promise<Buffer>((resolve, reject) => {
    -      zlib.inflate(message, (err, output) => {
    -        if (err) {
    -          reject(err);
    -        } else {
    -          resolve(output);
    +      let totalLength = 0;
    +      const messageParts: Buffer[] = [];
    +      const decompresser = zlib.createInflate();
    +      decompresser.on('data', (chunk: Buffer) => {
    +        messageParts.push(chunk);
    +        totalLength += chunk.byteLength;
    +        if (this.maxRecvMessageLength !== -1 && totalLength > this.maxRecvMessageLength) {
    +          decompresser.destroy();
    +          reject({
    +            code: Status.RESOURCE_EXHAUSTED,
    +            details: `Received message that decompresses to a size larger than ${this.maxRecvMessageLength}`
    +          });
             }
           });
    +      decompresser.on('end', () => {
    +        resolve(Buffer.concat(messageParts));
    +      });
    +      decompresser.write(message);
    +      decompresser.end();
         });
       }
     }
     
     class GzipHandler extends CompressionHandler {
    +  constructor(private maxRecvMessageLength: number) {
    +    super();
    +  }
    +
       compressMessage(message: Buffer) {
         return new Promise<Buffer>((resolve, reject) => {
           zlib.gzip(message, (err, output) => {
    @@ -138,13 +158,25 @@ class GzipHandler extends CompressionHandler {
     
       decompressMessage(message: Buffer) {
         return new Promise<Buffer>((resolve, reject) => {
    -      zlib.unzip(message, (err, output) => {
    -        if (err) {
    -          reject(err);
    -        } else {
    -          resolve(output);
    +      let totalLength = 0;
    +      const messageParts: Buffer[] = [];
    +      const decompresser = zlib.createGunzip();
    +      decompresser.on('data', (chunk: Buffer) => {
    +        messageParts.push(chunk);
    +        totalLength += chunk.byteLength;
    +        if (this.maxRecvMessageLength !== -1 && totalLength > this.maxRecvMessageLength) {
    +          decompresser.destroy();
    +          reject({
    +            code: Status.RESOURCE_EXHAUSTED,
    +            details: `Received message that decompresses to a size larger than ${this.maxRecvMessageLength}`
    +          });
             }
           });
    +      decompresser.on('end', () => {
    +        resolve(Buffer.concat(messageParts));
    +      });
    +      decompresser.write(message);
    +      decompresser.end();
         });
       }
     }
    @@ -169,14 +201,14 @@ class UnknownHandler extends CompressionHandler {
       }
     }
     
    -function getCompressionHandler(compressionName: string): CompressionHandler {
    +function getCompressionHandler(compressionName: string, maxReceiveMessageSize: number): CompressionHandler {
       switch (compressionName) {
         case 'identity':
           return new IdentityHandler();
         case 'deflate':
    -      return new DeflateHandler();
    +      return new DeflateHandler(maxReceiveMessageSize);
         case 'gzip':
    -      return new GzipHandler();
    +      return new GzipHandler(maxReceiveMessageSize);
         default:
           return new UnknownHandler(compressionName);
       }
    @@ -186,6 +218,7 @@ export class CompressionFilter extends BaseFilter implements Filter {
       private sendCompression: CompressionHandler = new IdentityHandler();
       private receiveCompression: CompressionHandler = new IdentityHandler();
       private currentCompressionAlgorithm: CompressionAlgorithm = 'identity';
    +  private maxReceiveMessageLength: number;
     
       constructor(
         channelOptions: ChannelOptions,
    @@ -195,6 +228,7 @@ export class CompressionFilter extends BaseFilter implements Filter {
     
         const compressionAlgorithmKey =
           channelOptions['grpc.default_compression_algorithm'];
    +    this.maxReceiveMessageLength = channelOptions['grpc.max_receive_message_length'] ?? DEFAULT_MAX_RECEIVE_MESSAGE_LENGTH
         if (compressionAlgorithmKey !== undefined) {
           if (isCompressionAlgorithmKey(compressionAlgorithmKey)) {
             const clientSelectedEncoding = CompressionAlgorithms[
    @@ -215,7 +249,8 @@ export class CompressionFilter extends BaseFilter implements Filter {
             ) {
               this.currentCompressionAlgorithm = clientSelectedEncoding;
               this.sendCompression = getCompressionHandler(
    -            this.currentCompressionAlgorithm
    +            this.currentCompressionAlgorithm,
    +            -1
               );
             }
           } else {
    @@ -247,7 +282,7 @@ export class CompressionFilter extends BaseFilter implements Filter {
         if (receiveEncoding.length > 0) {
           const encoding: MetadataValue = receiveEncoding[0];
           if (typeof encoding === 'string') {
    -        this.receiveCompression = getCompressionHandler(encoding);
    +        this.receiveCompression = getCompressionHandler(encoding, this.maxReceiveMessageLength);
           }
         }
         metadata.remove('grpc-encoding');
    
  • packages/grpc-js/src/internal-channel.ts+0 2 modified
    @@ -33,7 +33,6 @@ import {
     } from './resolver';
     import { trace } from './logging';
     import { SubchannelAddress } from './subchannel-address';
    -import { MaxMessageSizeFilterFactory } from './max-message-size-filter';
     import { mapProxyName } from './http_proxy';
     import { GrpcUri, parseUri, uriToString } from './uri-parser';
     import { ServerSurfaceCall } from './server-call';
    @@ -402,7 +401,6 @@ export class InternalChannel {
           }
         );
         this.filterStackFactory = new FilterStackFactory([
    -      new MaxMessageSizeFilterFactory(this.options),
           new CompressionFilterFactory(this, this.options),
         ]);
         this.trace(
    
  • packages/grpc-js/src/max-message-size-filter.ts+0 88 removed
    @@ -1,88 +0,0 @@
    -/*
    - * Copyright 2020 gRPC authors.
    - *
    - * Licensed under the Apache License, Version 2.0 (the "License");
    - * you may not use this file except in compliance with the License.
    - * You may obtain a copy of the License at
    - *
    - *     http://www.apache.org/licenses/LICENSE-2.0
    - *
    - * Unless required by applicable law or agreed to in writing, software
    - * distributed under the License is distributed on an "AS IS" BASIS,
    - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    - * See the License for the specific language governing permissions and
    - * limitations under the License.
    - *
    - */
    -
    -import { BaseFilter, Filter, FilterFactory } from './filter';
    -import { WriteObject } from './call-interface';
    -import {
    -  Status,
    -  DEFAULT_MAX_SEND_MESSAGE_LENGTH,
    -  DEFAULT_MAX_RECEIVE_MESSAGE_LENGTH,
    -} from './constants';
    -import { ChannelOptions } from './channel-options';
    -import { Metadata } from './metadata';
    -
    -export class MaxMessageSizeFilter extends BaseFilter implements Filter {
    -  private maxSendMessageSize: number = DEFAULT_MAX_SEND_MESSAGE_LENGTH;
    -  private maxReceiveMessageSize: number = DEFAULT_MAX_RECEIVE_MESSAGE_LENGTH;
    -  constructor(options: ChannelOptions) {
    -    super();
    -    if ('grpc.max_send_message_length' in options) {
    -      this.maxSendMessageSize = options['grpc.max_send_message_length']!;
    -    }
    -    if ('grpc.max_receive_message_length' in options) {
    -      this.maxReceiveMessageSize = options['grpc.max_receive_message_length']!;
    -    }
    -  }
    -
    -  async sendMessage(message: Promise<WriteObject>): Promise<WriteObject> {
    -    /* A configured size of -1 means that there is no limit, so skip the check
    -     * entirely */
    -    if (this.maxSendMessageSize === -1) {
    -      return message;
    -    } else {
    -      const concreteMessage = await message;
    -      if (concreteMessage.message.length > this.maxSendMessageSize) {
    -        throw {
    -          code: Status.RESOURCE_EXHAUSTED,
    -          details: `Sent message larger than max (${concreteMessage.message.length} vs. ${this.maxSendMessageSize})`,
    -          metadata: new Metadata(),
    -        };
    -      } else {
    -        return concreteMessage;
    -      }
    -    }
    -  }
    -
    -  async receiveMessage(message: Promise<Buffer>): Promise<Buffer> {
    -    /* A configured size of -1 means that there is no limit, so skip the check
    -     * entirely */
    -    if (this.maxReceiveMessageSize === -1) {
    -      return message;
    -    } else {
    -      const concreteMessage = await message;
    -      if (concreteMessage.length > this.maxReceiveMessageSize) {
    -        throw {
    -          code: Status.RESOURCE_EXHAUSTED,
    -          details: `Received message larger than max (${concreteMessage.length} vs. ${this.maxReceiveMessageSize})`,
    -          metadata: new Metadata(),
    -        };
    -      } else {
    -        return concreteMessage;
    -      }
    -    }
    -  }
    -}
    -
    -export class MaxMessageSizeFilterFactory
    -  implements FilterFactory<MaxMessageSizeFilter>
    -{
    -  constructor(private readonly options: ChannelOptions) {}
    -
    -  createFilter(): MaxMessageSizeFilter {
    -    return new MaxMessageSizeFilter(this.options);
    -  }
    -}
    
  • packages/grpc-js/src/server-interceptors.ts+54 34 modified
    @@ -30,14 +30,10 @@ import {
     import * as http2 from 'http2';
     import { getErrorMessage } from './error';
     import * as zlib from 'zlib';
    -import { promisify } from 'util';
     import { StreamDecoder } from './stream-decoder';
     import { CallEventTracker } from './transport';
     import * as logging from './logging';
     
    -const unzip = promisify(zlib.unzip);
    -const inflate = promisify(zlib.inflate);
    -
     const TRACER_NAME = 'server_call';
     
     function trace(text: string) {
    @@ -496,7 +492,7 @@ export class BaseServerInterceptingCall
       private wantTrailers = false;
       private cancelNotified = false;
       private incomingEncoding = 'identity';
    -  private decoder = new StreamDecoder();
    +  private decoder: StreamDecoder;
       private readQueue: ReadQueueEntry[] = [];
       private isReadPending = false;
       private receivedHalfClose = false;
    @@ -554,6 +550,8 @@ export class BaseServerInterceptingCall
           this.maxReceiveMessageSize = options['grpc.max_receive_message_length']!;
         }
     
    +    this.decoder = new StreamDecoder(this.maxReceiveMessageSize);
    +
         const metadata = Metadata.fromHttp2Headers(headers);
     
         if (logging.isTracerEnabled(TRACER_NAME)) {
    @@ -674,18 +672,41 @@ export class BaseServerInterceptingCall
         message: Buffer,
         encoding: string
       ): Buffer | Promise<Buffer> {
    -    switch (encoding) {
    -      case 'deflate':
    -        return inflate(message.subarray(5));
    -      case 'gzip':
    -        return unzip(message.subarray(5));
    -      case 'identity':
    -        return message.subarray(5);
    -      default:
    -        return Promise.reject({
    -          code: Status.UNIMPLEMENTED,
    -          details: `Received message compressed with unsupported encoding "${encoding}"`,
    +    const messageContents = message.subarray(5);
    +    if (encoding === 'identity') {
    +      return messageContents;
    +    } else if (encoding === 'deflate' || encoding === 'gzip') {
    +      let decompresser: zlib.Gunzip | zlib.Deflate;
    +      if (encoding === 'deflate') {
    +        decompresser = zlib.createInflate();
    +      } else {
    +        decompresser = zlib.createGunzip();
    +      }
    +      return new Promise((resolve, reject) => {
    +        let totalLength = 0
    +        const messageParts: Buffer[] = [];
    +        decompresser.on('data', (chunk: Buffer) => {
    +          messageParts.push(chunk);
    +          totalLength += chunk.byteLength;
    +          if (this.maxReceiveMessageSize !== -1 && totalLength > this.maxReceiveMessageSize) {
    +            decompresser.destroy();
    +            reject({
    +              code: Status.RESOURCE_EXHAUSTED,
    +              details: `Received message that decompresses to a size larger than ${this.maxReceiveMessageSize}`
    +            });
    +          }
    +        });
    +        decompresser.on('end', () => {
    +          resolve(Buffer.concat(messageParts));
             });
    +        decompresser.write(messageContents);
    +        decompresser.end();
    +      });
    +    } else {
    +      return Promise.reject({
    +        code: Status.UNIMPLEMENTED,
    +        details: `Received message compressed with unsupported encoding "${encoding}"`,
    +      });
         }
       }
     
    @@ -698,10 +719,16 @@ export class BaseServerInterceptingCall
         const compressedMessageEncoding = compressed
           ? this.incomingEncoding
           : 'identity';
    -    const decompressedMessage = await this.decompressMessage(
    -      queueEntry.compressedMessage!,
    -      compressedMessageEncoding
    -    );
    +    let decompressedMessage: Buffer;
    +    try {
    +      decompressedMessage = await this.decompressMessage(
    +        queueEntry.compressedMessage!,
    +        compressedMessageEncoding
    +      );
    +    } catch (err) {
    +      this.sendStatus(err as PartialStatusObject);
    +      return;
    +    }
         try {
           queueEntry.parsedMessage = this.handler.deserialize(decompressedMessage);
         } catch (err) {
    @@ -743,23 +770,16 @@ export class BaseServerInterceptingCall
             ' received data frame of size ' +
             data.length
         );
    -    const rawMessages = this.decoder.write(data);
    +    let rawMessages: Buffer[];
    +    try {
    +      rawMessages = this.decoder.write(data);
    +    } catch (e) {
    +      this.sendStatus({ code: Status.RESOURCE_EXHAUSTED, details: (e as Error).message });
    +      return;
    +    }
     
         for (const messageBytes of rawMessages) {
           this.stream.pause();
    -      if (
    -        this.maxReceiveMessageSize !== -1 &&
    -        messageBytes.length - 5 > this.maxReceiveMessageSize
    -      ) {
    -        this.sendStatus({
    -          code: Status.RESOURCE_EXHAUSTED,
    -          details: `Received message larger than max (${
    -            messageBytes.length - 5
    -          } vs. ${this.maxReceiveMessageSize})`,
    -          metadata: null,
    -        });
    -        return;
    -      }
           const queueEntry: ReadQueueEntry = {
             type: 'COMPRESSED',
             compressedMessage: messageBytes,
    
  • packages/grpc-js/src/stream-decoder.ts+5 0 modified
    @@ -30,6 +30,8 @@ export class StreamDecoder {
       private readPartialMessage: Buffer[] = [];
       private readMessageRemaining = 0;
     
    +  constructor(private maxReadMessageLength: number) {}
    +
       write(data: Buffer): Buffer[] {
         let readHead = 0;
         let toRead: number;
    @@ -60,6 +62,9 @@ export class StreamDecoder {
               // readSizeRemaining >=0 here
               if (this.readSizeRemaining === 0) {
                 this.readMessageSize = this.readPartialSize.readUInt32BE(0);
    +            if (this.maxReadMessageLength !== -1 && this.readMessageSize > this.maxReadMessageLength) {
    +              throw new Error(`Received message larger than max (${this.readMessageSize} vs ${this.maxReadMessageLength})`);
    +            }
                 this.readMessageRemaining = this.readMessageSize;
                 if (this.readMessageRemaining > 0) {
                   this.readState = ReadState.READING_MESSAGE;
    
  • packages/grpc-js/src/subchannel-call.ts+11 3 modified
    @@ -18,7 +18,7 @@
     import * as http2 from 'http2';
     import * as os from 'os';
     
    -import { Status } from './constants';
    +import { DEFAULT_MAX_RECEIVE_MESSAGE_LENGTH, Status } from './constants';
     import { Metadata } from './metadata';
     import { StreamDecoder } from './stream-decoder';
     import * as logging from './logging';
    @@ -116,7 +116,7 @@ function mapHttpStatusCode(code: number): StatusObject {
     }
     
     export class Http2SubchannelCall implements SubchannelCall {
    -  private decoder = new StreamDecoder();
    +  private decoder: StreamDecoder;
     
       private isReadFilterPending = false;
       private isPushPending = false;
    @@ -147,6 +147,8 @@ export class Http2SubchannelCall implements SubchannelCall {
         private readonly transport: Transport,
         private readonly callId: number
       ) {
    +    const maxReceiveMessageLength = transport.getOptions()['grpc.max_receive_message_length'] ?? DEFAULT_MAX_RECEIVE_MESSAGE_LENGTH;
    +    this.decoder = new StreamDecoder(maxReceiveMessageLength);
         http2Stream.on('response', (headers, flags) => {
           let headersString = '';
           for (const header of Object.keys(headers)) {
    @@ -182,7 +184,13 @@ export class Http2SubchannelCall implements SubchannelCall {
             return;
           }
           this.trace('receive HTTP/2 data frame of length ' + data.length);
    -      const messages = this.decoder.write(data);
    +      let messages: Buffer[];
    +      try {
    +        messages = this.decoder.write(data);
    +      } catch (e) {
    +        this.cancelWithStatus(Status.RESOURCE_EXHAUSTED, (e as Error).message);
    +        return;
    +      }
     
           for (const message of messages) {
             this.trace('parsed message of length ' + message.length);
    
  • packages/grpc-js/src/transport.ts+6 1 modified
    @@ -84,6 +84,7 @@ export interface TransportDisconnectListener {
     export interface Transport {
       getChannelzRef(): SocketRef;
       getPeerName(): string;
    +  getOptions(): ChannelOptions;
       createCall(
         metadata: Metadata,
         host: string,
    @@ -147,7 +148,7 @@ class Http2Transport implements Transport {
       constructor(
         private session: http2.ClientHttp2Session,
         subchannelAddress: SubchannelAddress,
    -    options: ChannelOptions,
    +    private options: ChannelOptions,
         /**
          * Name of the remote server, if it is not the same as the subchannel
          * address, i.e. if connecting through an HTTP CONNECT proxy.
    @@ -617,6 +618,10 @@ class Http2Transport implements Transport {
         return this.subchannelAddressString;
       }
     
    +  getOptions() {
    +    return this.options;
    +  }
    +
       shutdown() {
         this.session.close();
         unregisterChannelzRef(this.channelzRef);
    
  • packages/grpc-js/test/fixtures/test_service.proto+1 0 modified
    @@ -21,6 +21,7 @@ message Request {
       bool error = 1;
       string message = 2;
       int32 errorAfter = 3;
    +  int32 responseLength = 4;
     }
     
     message Response {
    
  • packages/grpc-js/test/test-server-errors.ts+45 4 modified
    @@ -33,6 +33,7 @@ import {
     } from '../src/server-call';
     
     import { loadProtoFile } from './common';
    +import { CompressionAlgorithms } from '../src/compression-algorithms';
     
     const protoFile = join(__dirname, 'fixtures', 'test_service.proto');
     const testServiceDef = loadProtoFile(protoFile);
    @@ -310,7 +311,7 @@ describe('Other conditions', () => {
                 trailerMetadata
               );
             } else {
    -          cb(null, { count: 1 }, trailerMetadata);
    +          cb(null, { count: 1, message: 'a'.repeat(req.responseLength) }, trailerMetadata);
             }
           },
     
    @@ -320,20 +321,22 @@ describe('Other conditions', () => {
           ) {
             let count = 0;
             let errored = false;
    +        let responseLength = 0;
     
             stream.on('data', (data: any) => {
               if (data.error) {
                 const message = data.message || 'Requested error';
                 errored = true;
                 cb(new Error(message) as ServiceError, null, trailerMetadata);
               } else {
    +            responseLength += data.responseLength;
                 count++;
               }
             });
     
             stream.on('end', () => {
               if (!errored) {
    -            cb(null, { count }, trailerMetadata);
    +            cb(null, { count, message: 'a'.repeat(responseLength) }, trailerMetadata);
               }
             });
           },
    @@ -349,7 +352,7 @@ describe('Other conditions', () => {
               });
             } else {
               for (let i = 1; i <= 5; i++) {
    -            stream.write({ count: i });
    +            stream.write({ count: i, message: 'a'.repeat(req.responseLength) });
                 if (req.errorAfter && req.errorAfter === i) {
                   stream.emit('error', {
                     code: grpc.status.UNKNOWN,
    @@ -376,7 +379,7 @@ describe('Other conditions', () => {
                 err.metadata.add('count', '' + count);
                 stream.emit('error', err);
               } else {
    -            stream.write({ count });
    +            stream.write({ count, message: 'a'.repeat(data.responseLength) });
                 count++;
               }
             });
    @@ -740,6 +743,44 @@ describe('Other conditions', () => {
           });
         });
       });
    +
    +  describe('Max message size', () => {
    +    const largeMessage = 'a'.repeat(10_000_000);
    +    it('Should be enforced on the server', done => {
    +      client.unary({ message: largeMessage }, (error?: ServiceError) => {
    +        assert(error);
    +        assert.strictEqual(error.code, grpc.status.RESOURCE_EXHAUSTED);
    +        done();
    +      });
    +    });
    +    it('Should be enforced on the client', done => {
    +      client.unary({ responseLength: 10_000_000 }, (error?: ServiceError) => {
    +        assert(error);
    +        assert.strictEqual(error.code, grpc.status.RESOURCE_EXHAUSTED);
    +        done();
    +      });
    +    });
    +    describe('Compressed messages', () => {
    +      it('Should be enforced with gzip', done => {
    +        const compressingClient = new testServiceClient(`localhost:${port}`, clientInsecureCreds, {'grpc.default_compression_algorithm': CompressionAlgorithms.gzip});
    +        compressingClient.unary({ message: largeMessage }, (error?: ServiceError) => {
    +          assert(error);
    +          assert.strictEqual(error.code, grpc.status.RESOURCE_EXHAUSTED);
    +          assert.match(error.details, /Received message that decompresses to a size larger/);
    +          done();
    +        });
    +      });
    +      it('Should be enforced with deflate', done => {
    +        const compressingClient = new testServiceClient(`localhost:${port}`, clientInsecureCreds, {'grpc.default_compression_algorithm': CompressionAlgorithms.deflate});
    +        compressingClient.unary({ message: largeMessage }, (error?: ServiceError) => {
    +          assert(error);
    +          assert.strictEqual(error.code, grpc.status.RESOURCE_EXHAUSTED);
    +          assert.match(error.details, /Received message that decompresses to a size larger/);
    +          done();
    +        });
    +      });
    +    });
    +  });
     });
     
     function identity(arg: any): any {
    
a8a020339c7e

Merge pull request from GHSA-7v5v-9h63-cj86

https://github.com/grpc/grpc-node · Michael Lumish · Jun 10, 2024 · via ghsa
10 files changed · +187 −149
  • packages/grpc-js/package.json+1 1 modified
    @@ -1,6 +1,6 @@
     {
       "name": "@grpc/grpc-js",
    -  "version": "1.8.21",
    +  "version": "1.8.22",
       "description": "gRPC Library for Node - pure JS implementation",
       "homepage": "https://grpc.io/",
       "repository": "https://github.com/grpc/grpc-node/tree/master/packages/grpc-js",
    
  • packages/grpc-js/src/compression-filter.ts+55 17 modified
    @@ -21,7 +21,7 @@ import { WriteObject, WriteFlags } from './call-interface';
     import { Channel } from './channel';
     import { ChannelOptions } from './channel-options';
     import { CompressionAlgorithms } from './compression-algorithms';
    -import { LogVerbosity } from './constants';
    +import { DEFAULT_MAX_RECEIVE_MESSAGE_LENGTH, LogVerbosity, Status } from './constants';
     import { BaseFilter, Filter, FilterFactory } from './filter';
     import * as logging from './logging';
     import { Metadata, MetadataValue } from './metadata';
    @@ -94,6 +94,10 @@ class IdentityHandler extends CompressionHandler {
     }
     
     class DeflateHandler extends CompressionHandler {
    +  constructor(private maxRecvMessageLength: number) {
    +    super();
    +  }
    +
       compressMessage(message: Buffer) {
         return new Promise<Buffer>((resolve, reject) => {
           zlib.deflate(message, (err, output) => {
    @@ -108,18 +112,34 @@ class DeflateHandler extends CompressionHandler {
     
       decompressMessage(message: Buffer) {
         return new Promise<Buffer>((resolve, reject) => {
    -      zlib.inflate(message, (err, output) => {
    -        if (err) {
    -          reject(err);
    -        } else {
    -          resolve(output);
    +      let totalLength = 0;
    +      const messageParts: Buffer[] = [];
    +      const decompresser = zlib.createInflate();
    +      decompresser.on('data', (chunk: Buffer) => {
    +        messageParts.push(chunk);
    +        totalLength += chunk.byteLength;
    +        if (this.maxRecvMessageLength !== -1 && totalLength > this.maxRecvMessageLength) {
    +          decompresser.destroy();
    +          reject({
    +            code: Status.RESOURCE_EXHAUSTED,
    +            details: `Received message that decompresses to a size larger than ${this.maxRecvMessageLength}`
    +          });
             }
           });
    +      decompresser.on('end', () => {
    +        resolve(Buffer.concat(messageParts));
    +      });
    +      decompresser.write(message);
    +      decompresser.end();
         });
       }
     }
     
     class GzipHandler extends CompressionHandler {
    +  constructor(private maxRecvMessageLength: number) {
    +    super();
    +  }
    +
       compressMessage(message: Buffer) {
         return new Promise<Buffer>((resolve, reject) => {
           zlib.gzip(message, (err, output) => {
    @@ -134,13 +154,25 @@ class GzipHandler extends CompressionHandler {
     
       decompressMessage(message: Buffer) {
         return new Promise<Buffer>((resolve, reject) => {
    -      zlib.unzip(message, (err, output) => {
    -        if (err) {
    -          reject(err);
    -        } else {
    -          resolve(output);
    +      let totalLength = 0;
    +      const messageParts: Buffer[] = [];
    +      const decompresser = zlib.createGunzip();
    +      decompresser.on('data', (chunk: Buffer) => {
    +        messageParts.push(chunk);
    +        totalLength += chunk.byteLength;
    +        if (this.maxRecvMessageLength !== -1 && totalLength > this.maxRecvMessageLength) {
    +          decompresser.destroy();
    +          reject({
    +            code: Status.RESOURCE_EXHAUSTED,
    +            details: `Received message that decompresses to a size larger than ${this.maxRecvMessageLength}`
    +          });
             }
           });
    +      decompresser.on('end', () => {
    +        resolve(Buffer.concat(messageParts));
    +      });
    +      decompresser.write(message);
    +      decompresser.end();
         });
       }
     }
    @@ -165,14 +197,14 @@ class UnknownHandler extends CompressionHandler {
       }
     }
     
    -function getCompressionHandler(compressionName: string): CompressionHandler {
    +function getCompressionHandler(compressionName: string, maxReceiveMessageSize: number): CompressionHandler {
       switch (compressionName) {
         case 'identity':
           return new IdentityHandler();
         case 'deflate':
    -      return new DeflateHandler();
    +      return new DeflateHandler(maxReceiveMessageSize);
         case 'gzip':
    -      return new GzipHandler();
    +      return new GzipHandler(maxReceiveMessageSize);
         default:
           return new UnknownHandler(compressionName);
       }
    @@ -182,11 +214,14 @@ export class CompressionFilter extends BaseFilter implements Filter {
       private sendCompression: CompressionHandler = new IdentityHandler();
       private receiveCompression: CompressionHandler = new IdentityHandler();
       private currentCompressionAlgorithm: CompressionAlgorithm = 'identity';
    +  private maxReceiveMessageLength: number;
     
       constructor(channelOptions: ChannelOptions, private sharedFilterConfig: SharedCompressionFilterConfig) {
         super();
     
    -    const compressionAlgorithmKey = channelOptions['grpc.default_compression_algorithm'];
    +    const compressionAlgorithmKey =
    +      channelOptions['grpc.default_compression_algorithm'];
    +    this.maxReceiveMessageLength = channelOptions['grpc.max_receive_message_length'] ?? DEFAULT_MAX_RECEIVE_MESSAGE_LENGTH
         if (compressionAlgorithmKey !== undefined) {
           if (isCompressionAlgorithmKey(compressionAlgorithmKey)) {
             const clientSelectedEncoding = CompressionAlgorithms[compressionAlgorithmKey] as CompressionAlgorithm;
    @@ -200,7 +235,10 @@ export class CompressionFilter extends BaseFilter implements Filter {
              */
             if (!serverSupportedEncodings || serverSupportedEncodings.includes(clientSelectedEncoding)) {
               this.currentCompressionAlgorithm = clientSelectedEncoding;
    -          this.sendCompression = getCompressionHandler(this.currentCompressionAlgorithm);
    +          this.sendCompression = getCompressionHandler(
    +            this.currentCompressionAlgorithm,
    +            -1
    +          );
             }
           } else {
             logging.log(LogVerbosity.ERROR, `Invalid value provided for grpc.default_compression_algorithm option: ${compressionAlgorithmKey}`);
    @@ -228,7 +266,7 @@ export class CompressionFilter extends BaseFilter implements Filter {
         if (receiveEncoding.length > 0) {
           const encoding: MetadataValue = receiveEncoding[0];
           if (typeof encoding === 'string') {
    -        this.receiveCompression = getCompressionHandler(encoding);
    +        this.receiveCompression = getCompressionHandler(encoding, this.maxReceiveMessageLength);
           }
         }
         metadata.remove('grpc-encoding');
    
  • packages/grpc-js/src/internal-channel.ts · +0 -2 · modified
    @@ -33,7 +33,6 @@ import {
     } from './resolver';
     import { trace } from './logging';
     import { SubchannelAddress } from './subchannel-address';
    -import { MaxMessageSizeFilterFactory } from './max-message-size-filter';
     import { mapProxyName } from './http_proxy';
     import { GrpcUri, parseUri, uriToString } from './uri-parser';
     import { ServerSurfaceCall } from './server-call';
    @@ -310,7 +309,6 @@ export class InternalChannel {
           }
         );
         this.filterStackFactory = new FilterStackFactory([
    -      new MaxMessageSizeFilterFactory(this.options),
           new CompressionFilterFactory(this, this.options),
         ]);
         this.trace('Channel constructed with options ' + JSON.stringify(options, undefined, 2));
    
  • packages/grpc-js/src/max-message-size-filter.ts · +0 -89 · removed
    @@ -1,89 +0,0 @@
    -/*
    - * Copyright 2020 gRPC authors.
    - *
    - * Licensed under the Apache License, Version 2.0 (the "License");
    - * you may not use this file except in compliance with the License.
    - * You may obtain a copy of the License at
    - *
    - *     http://www.apache.org/licenses/LICENSE-2.0
    - *
    - * Unless required by applicable law or agreed to in writing, software
    - * distributed under the License is distributed on an "AS IS" BASIS,
    - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    - * See the License for the specific language governing permissions and
    - * limitations under the License.
    - *
    - */
    -
    -import { BaseFilter, Filter, FilterFactory } from './filter';
    -import { WriteObject } from './call-interface';
    -import {
    -  Status,
    -  DEFAULT_MAX_SEND_MESSAGE_LENGTH,
    -  DEFAULT_MAX_RECEIVE_MESSAGE_LENGTH,
    -} from './constants';
    -import { ChannelOptions } from './channel-options';
    -import { Metadata } from './metadata';
    -
    -export class MaxMessageSizeFilter extends BaseFilter implements Filter {
    -  private maxSendMessageSize: number = DEFAULT_MAX_SEND_MESSAGE_LENGTH;
    -  private maxReceiveMessageSize: number = DEFAULT_MAX_RECEIVE_MESSAGE_LENGTH;
    -  constructor(
    -    options: ChannelOptions
    -  ) {
    -    super();
    -    if ('grpc.max_send_message_length' in options) {
    -      this.maxSendMessageSize = options['grpc.max_send_message_length']!;
    -    }
    -    if ('grpc.max_receive_message_length' in options) {
    -      this.maxReceiveMessageSize = options['grpc.max_receive_message_length']!;
    -    }
    -  }
    -
    -  async sendMessage(message: Promise<WriteObject>): Promise<WriteObject> {
    -    /* A configured size of -1 means that there is no limit, so skip the check
    -     * entirely */
    -    if (this.maxSendMessageSize === -1) {
    -      return message;
    -    } else {
    -      const concreteMessage = await message;
    -      if (concreteMessage.message.length > this.maxSendMessageSize) {
    -        throw {
    -          code: Status.RESOURCE_EXHAUSTED,
    -          details: `Sent message larger than max (${concreteMessage.message.length} vs. ${this.maxSendMessageSize})`,
    -          metadata: new Metadata()
    -        };
    -      } else {
    -        return concreteMessage;
    -      }
    -    }
    -  }
    -
    -  async receiveMessage(message: Promise<Buffer>): Promise<Buffer> {
    -    /* A configured size of -1 means that there is no limit, so skip the check
    -     * entirely */
    -    if (this.maxReceiveMessageSize === -1) {
    -      return message;
    -    } else {
    -      const concreteMessage = await message;
    -      if (concreteMessage.length > this.maxReceiveMessageSize) {
    -        throw {
    -          code: Status.RESOURCE_EXHAUSTED,
    -          details: `Received message larger than max (${concreteMessage.length} vs. ${this.maxReceiveMessageSize})`,
    -          metadata: new Metadata()
    -        };
    -      } else {
    -        return concreteMessage;
    -      }
    -    }
    -  }
    -}
    -
    -export class MaxMessageSizeFilterFactory
    -  implements FilterFactory<MaxMessageSizeFilter> {
    -  constructor(private readonly options: ChannelOptions) {}
    -
    -  createFilter(): MaxMessageSizeFilter {
    -    return new MaxMessageSizeFilter(this.options);
    -  }
    -}
    
  • packages/grpc-js/src/server-call.ts · +56 -31 · modified
    @@ -19,7 +19,6 @@ import { EventEmitter } from 'events';
     import * as http2 from 'http2';
     import { Duplex, Readable, Writable } from 'stream';
     import * as zlib from 'zlib';
    -import { promisify } from 'util';
     
     import {
       Status,
    @@ -38,8 +37,6 @@ import { Deadline } from './deadline';
     import { getErrorCode, getErrorMessage } from './error';
     
     const TRACER_NAME = 'server_call';
    -const unzip = promisify(zlib.unzip);
    -const inflate = promisify(zlib.inflate);
     
     function trace(text: string): void {
       logging.trace(LogVerbosity.DEBUG, TRACER_NAME, text);
    @@ -478,19 +475,42 @@ export class Http2ServerCallStream<
       private getDecompressedMessage(
         message: Buffer,
         encoding: string
    -  ): Buffer | Promise<Buffer> {
    -    if (encoding === 'deflate') {
    -      return inflate(message.subarray(5));
    -    } else if (encoding === 'gzip') {
    -      return unzip(message.subarray(5));
    -    } else if (encoding === 'identity') {
    -      return message.subarray(5);
    +  ): Buffer | Promise<Buffer> {    const messageContents = message.subarray(5);
    +    if (encoding === 'identity') {
    +      return messageContents;
    +    } else if (encoding === 'deflate' || encoding === 'gzip') {
    +      let decompresser: zlib.Gunzip | zlib.Deflate;
    +      if (encoding === 'deflate') {
    +        decompresser = zlib.createInflate();
    +      } else {
    +        decompresser = zlib.createGunzip();
    +      }
    +      return new Promise((resolve, reject) => {
    +        let totalLength = 0
    +        const messageParts: Buffer[] = [];
    +        decompresser.on('data', (chunk: Buffer) => {
    +          messageParts.push(chunk);
    +          totalLength += chunk.byteLength;
    +          if (this.maxReceiveMessageSize !== -1 && totalLength > this.maxReceiveMessageSize) {
    +            decompresser.destroy();
    +            reject({
    +              code: Status.RESOURCE_EXHAUSTED,
    +              details: `Received message that decompresses to a size larger than ${this.maxReceiveMessageSize}`
    +            });
    +          }
    +        });
    +        decompresser.on('end', () => {
    +          resolve(Buffer.concat(messageParts));
    +        });
    +        decompresser.write(messageContents);
    +        decompresser.end();
    +      });
    +    } else {
    +      return Promise.reject({
    +        code: Status.UNIMPLEMENTED,
    +        details: `Received message compressed with unsupported encoding "${encoding}"`,
    +      });
         }
    -
    -    return Promise.reject({
    -      code: Status.UNIMPLEMENTED,
    -      details: `Received message compressed with unsupported encoding "${encoding}"`,
    -    });
       }
     
       sendMetadata(customMetadata?: Metadata) {
    @@ -807,7 +827,7 @@ export class Http2ServerCallStream<
           | ServerDuplexStream<RequestType, ResponseType>,
         encoding: string
       ) {
    -    const decoder = new StreamDecoder();
    +    const decoder = new StreamDecoder(this.maxReceiveMessageSize);
     
         let readsDone = false;
     
    @@ -823,29 +843,34 @@ export class Http2ServerCallStream<
         };
     
         this.stream.on('data', async (data: Buffer) => {
    -      const messages = decoder.write(data);
    +      let messages: Buffer[];
    +      try {
    +        messages = decoder.write(data);
    +      } catch (e) {
    +        this.sendError({
    +          code: Status.RESOURCE_EXHAUSTED,
    +          details: (e as Error).message
    +        });
    +        return;
    +      }
     
           pendingMessageProcessing = true;
           this.stream.pause();
           for (const message of messages) {
    -        if (
    -          this.maxReceiveMessageSize !== -1 &&
    -          message.length > this.maxReceiveMessageSize
    -        ) {
    -          this.sendError({
    -            code: Status.RESOURCE_EXHAUSTED,
    -            details: `Received message larger than max (${message.length} vs. ${this.maxReceiveMessageSize})`,
    -          });
    -          return;
    -        }
             this.emit('receiveMessage');
     
             const compressed = message.readUInt8(0) === 1;
             const compressedMessageEncoding = compressed ? encoding : 'identity';
    -        const decompressedMessage = await this.getDecompressedMessage(
    -          message,
    -          compressedMessageEncoding
    -        );
    +        let decompressedMessage: Buffer;
    +        try {
    +          decompressedMessage = await this.getDecompressedMessage(
    +            message,
    +            compressedMessageEncoding
    +          );
    +        } catch (e) {
    +          this.sendError(e as Partial<StatusObject>);
    +          return;
    +        }
     
             // Encountered an error with decompression; it'll already have been propogated back
             // Just return early
    
  • packages/grpc-js/src/stream-decoder.ts · +5 -0 · modified
    @@ -30,6 +30,8 @@ export class StreamDecoder {
       private readPartialMessage: Buffer[] = [];
       private readMessageRemaining = 0;
     
    +  constructor(private maxReadMessageLength: number) {}
    +
       write(data: Buffer): Buffer[] {
         let readHead = 0;
         let toRead: number;
    @@ -60,6 +62,9 @@ export class StreamDecoder {
               // readSizeRemaining >=0 here
               if (this.readSizeRemaining === 0) {
                 this.readMessageSize = this.readPartialSize.readUInt32BE(0);
    +            if (this.maxReadMessageLength !== -1 && this.readMessageSize > this.maxReadMessageLength) {
    +              throw new Error(`Received message larger than max (${this.readMessageSize} vs ${this.maxReadMessageLength})`);
    +            }
                 this.readMessageRemaining = this.readMessageSize;
                 if (this.readMessageRemaining > 0) {
                   this.readState = ReadState.READING_MESSAGE;
    
  • packages/grpc-js/src/subchannel-call.ts · +11 -3 · modified
    @@ -18,7 +18,7 @@
     import * as http2 from 'http2';
     import * as os from 'os';
     
    -import { Status } from './constants';
    +import { DEFAULT_MAX_RECEIVE_MESSAGE_LENGTH, Status } from './constants';
     import { Metadata } from './metadata';
     import { StreamDecoder } from './stream-decoder';
     import * as logging from './logging';
    @@ -76,7 +76,7 @@ export interface SubchannelCallInterceptingListener extends InterceptingListener
     }
     
     export class Http2SubchannelCall implements SubchannelCall {
    -  private decoder = new StreamDecoder();
    +  private decoder: StreamDecoder;
     
       private isReadFilterPending = false;
       private isPushPending = false;
    @@ -106,6 +106,8 @@ export class Http2SubchannelCall implements SubchannelCall {
         private readonly transport: Transport,
         private readonly callId: number
       ) {
    +    const maxReceiveMessageLength = transport.getOptions()['grpc.max_receive_message_length'] ?? DEFAULT_MAX_RECEIVE_MESSAGE_LENGTH;
    +    this.decoder = new StreamDecoder(maxReceiveMessageLength);
         http2Stream.on('response', (headers, flags) => {
           let headersString = '';
           for (const header of Object.keys(headers)) {
    @@ -163,7 +165,13 @@ export class Http2SubchannelCall implements SubchannelCall {
             return;
           }
           this.trace('receive HTTP/2 data frame of length ' + data.length);
    -      const messages = this.decoder.write(data);
    +      let messages: Buffer[];
    +      try {
    +        messages = this.decoder.write(data);
    +      } catch (e) {
    +        this.cancelWithStatus(Status.RESOURCE_EXHAUSTED, (e as Error).message);
    +        return;
    +      }
     
           for (const message of messages) {
             this.trace('parsed message of length ' + message.length);
    
  • packages/grpc-js/src/transport.ts · +13 -2 · modified
    @@ -62,7 +62,14 @@ export interface TransportDisconnectListener {
     export interface Transport {
       getChannelzRef(): SocketRef;
       getPeerName(): string;
    -  createCall(metadata: Metadata, host: string, method: string, listener: SubchannelCallInterceptingListener, subchannelCallStatsTracker: Partial<CallEventTracker>): SubchannelCall;
    +  getOptions(): ChannelOptions;
    +  createCall(
    +    metadata: Metadata,
    +    host: string,
    +    method: string,
    +    listener: SubchannelCallInterceptingListener,
    +    subchannelCallStatsTracker: Partial<CallEventTracker>
    +  ): SubchannelCall;
       addDisconnectListener(listener: TransportDisconnectListener): void;
       shutdown(): void;
     }
    @@ -119,7 +126,7 @@ class Http2Transport implements Transport {
       constructor(
         private session: http2.ClientHttp2Session,
         subchannelAddress: SubchannelAddress,
    -    options: ChannelOptions,
    +    private options: ChannelOptions,
         /**
          * Name of the remote server, if it is not the same as the subchannel
          * address, i.e. if connecting through an HTTP CONNECT proxy.
    @@ -495,6 +502,10 @@ class Http2Transport implements Transport {
         return this.subchannelAddressString;
       }
     
    +  getOptions() {
    +    return this.options;
    +  }
    +
       shutdown() {
         this.session.close();
         unregisterChannelzRef(this.channelzRef);
    
  • packages/grpc-js/test/fixtures/test_service.proto · +1 -0 · modified
    @@ -21,6 +21,7 @@ message Request {
       bool error = 1;
       string message = 2;
       int32 errorAfter = 3;
    +  int32 responseLength = 4;
     }
     
     message Response {
    
  • packages/grpc-js/test/test-server-errors.ts · +45 -4 · modified
    @@ -33,6 +33,7 @@ import {
     } from '../src/server-call';
     
     import { loadProtoFile } from './common';
    +import { CompressionAlgorithms } from '../src/compression-algorithms';
     
     const protoFile = join(__dirname, 'fixtures', 'test_service.proto');
     const testServiceDef = loadProtoFile(protoFile);
    @@ -309,7 +310,7 @@ describe('Other conditions', () => {
                 trailerMetadata
               );
             } else {
    -          cb(null, { count: 1 }, trailerMetadata);
    +          cb(null, { count: 1, message: 'a'.repeat(req.responseLength) }, trailerMetadata);
             }
           },
     
    @@ -319,20 +320,22 @@ describe('Other conditions', () => {
           ) {
             let count = 0;
             let errored = false;
    +        let responseLength = 0;
     
             stream.on('data', (data: any) => {
               if (data.error) {
                 const message = data.message || 'Requested error';
                 errored = true;
                 cb(new Error(message) as ServiceError, null, trailerMetadata);
               } else {
    +            responseLength += data.responseLength;
                 count++;
               }
             });
     
             stream.on('end', () => {
               if (!errored) {
    -            cb(null, { count }, trailerMetadata);
    +            cb(null, { count, message: 'a'.repeat(responseLength) }, trailerMetadata);
               }
             });
           },
    @@ -348,7 +351,7 @@ describe('Other conditions', () => {
               });
             } else {
               for (let i = 1; i <= 5; i++) {
    -            stream.write({ count: i });
    +            stream.write({ count: i, message: 'a'.repeat(req.responseLength) });
                 if (req.errorAfter && req.errorAfter === i) {
                   stream.emit('error', {
                     code: grpc.status.UNKNOWN,
    @@ -375,7 +378,7 @@ describe('Other conditions', () => {
                 err.metadata.add('count', '' + count);
                 stream.emit('error', err);
               } else {
    -            stream.write({ count });
    +            stream.write({ count, message: 'a'.repeat(data.responseLength) });
                 count++;
               }
             });
    @@ -739,6 +742,44 @@ describe('Other conditions', () => {
           });
         });
       });
    +
    +  describe('Max message size', () => {
    +    const largeMessage = 'a'.repeat(10_000_000);
    +    it('Should be enforced on the server', done => {
    +      client.unary({ message: largeMessage }, (error?: ServiceError) => {
    +        assert(error);
    +        assert.strictEqual(error.code, grpc.status.RESOURCE_EXHAUSTED);
    +        done();
    +      });
    +    });
    +    it('Should be enforced on the client', done => {
    +      client.unary({ responseLength: 10_000_000 }, (error?: ServiceError) => {
    +        assert(error);
    +        assert.strictEqual(error.code, grpc.status.RESOURCE_EXHAUSTED);
    +        done();
    +      });
    +    });
    +    describe('Compressed messages', () => {
    +      it('Should be enforced with gzip', done => {
    +        const compressingClient = new testServiceClient(`localhost:${port}`, clientInsecureCreds, {'grpc.default_compression_algorithm': CompressionAlgorithms.gzip});
    +        compressingClient.unary({ message: largeMessage }, (error?: ServiceError) => {
    +          assert(error);
    +          assert.strictEqual(error.code, grpc.status.RESOURCE_EXHAUSTED);
    +          assert.match(error.details, /Received message that decompresses to a size larger/);
    +          done();
    +        });
    +      });
    +      it('Should be enforced with deflate', done => {
    +        const compressingClient = new testServiceClient(`localhost:${port}`, clientInsecureCreds, {'grpc.default_compression_algorithm': CompressionAlgorithms.deflate});
    +        compressingClient.unary({ message: largeMessage }, (error?: ServiceError) => {
    +          assert(error);
    +          assert.strictEqual(error.code, grpc.status.RESOURCE_EXHAUSTED);
    +          assert.match(error.details, /Received message that decompresses to a size larger/);
    +          done();
    +        });
    +      });
    +    });
    +  });
     });
     
     function identity(arg: any): any {
    
08b0422dae56

Merge pull request from GHSA-7v5v-9h63-cj86

https://github.com/grpc/grpc-node · Michael Lumish · Jun 10, 2024 · via ghsa
10 files changed · +176 -146
  • packages/grpc-js/package.json · +1 -1 · modified
    @@ -1,6 +1,6 @@
     {
       "name": "@grpc/grpc-js",
    -  "version": "1.9.14",
    +  "version": "1.9.15",
       "description": "gRPC Library for Node - pure JS implementation",
       "homepage": "https://grpc.io/",
       "repository": "https://github.com/grpc/grpc-node/tree/master/packages/grpc-js",
    
  • packages/grpc-js/src/compression-filter.ts · +51 -16 · modified
    @@ -21,7 +21,7 @@ import { WriteObject, WriteFlags } from './call-interface';
     import { Channel } from './channel';
     import { ChannelOptions } from './channel-options';
     import { CompressionAlgorithms } from './compression-algorithms';
    -import { LogVerbosity } from './constants';
    +import { DEFAULT_MAX_RECEIVE_MESSAGE_LENGTH, LogVerbosity, Status } from './constants';
     import { BaseFilter, Filter, FilterFactory } from './filter';
     import * as logging from './logging';
     import { Metadata, MetadataValue } from './metadata';
    @@ -98,6 +98,10 @@ class IdentityHandler extends CompressionHandler {
     }
     
     class DeflateHandler extends CompressionHandler {
    +  constructor(private maxRecvMessageLength: number) {
    +    super();
    +  }
    +
       compressMessage(message: Buffer) {
         return new Promise<Buffer>((resolve, reject) => {
           zlib.deflate(message, (err, output) => {
    @@ -112,18 +116,34 @@ class DeflateHandler extends CompressionHandler {
     
       decompressMessage(message: Buffer) {
         return new Promise<Buffer>((resolve, reject) => {
    -      zlib.inflate(message, (err, output) => {
    -        if (err) {
    -          reject(err);
    -        } else {
    -          resolve(output);
    +      let totalLength = 0;
    +      const messageParts: Buffer[] = [];
    +      const decompresser = zlib.createInflate();
    +      decompresser.on('data', (chunk: Buffer) => {
    +        messageParts.push(chunk);
    +        totalLength += chunk.byteLength;
    +        if (this.maxRecvMessageLength !== -1 && totalLength > this.maxRecvMessageLength) {
    +          decompresser.destroy();
    +          reject({
    +            code: Status.RESOURCE_EXHAUSTED,
    +            details: `Received message that decompresses to a size larger than ${this.maxRecvMessageLength}`
    +          });
             }
           });
    +      decompresser.on('end', () => {
    +        resolve(Buffer.concat(messageParts));
    +      });
    +      decompresser.write(message);
    +      decompresser.end();
         });
       }
     }
     
     class GzipHandler extends CompressionHandler {
    +  constructor(private maxRecvMessageLength: number) {
    +    super();
    +  }
    +
       compressMessage(message: Buffer) {
         return new Promise<Buffer>((resolve, reject) => {
           zlib.gzip(message, (err, output) => {
    @@ -138,13 +158,25 @@ class GzipHandler extends CompressionHandler {
     
       decompressMessage(message: Buffer) {
         return new Promise<Buffer>((resolve, reject) => {
    -      zlib.unzip(message, (err, output) => {
    -        if (err) {
    -          reject(err);
    -        } else {
    -          resolve(output);
    +      let totalLength = 0;
    +      const messageParts: Buffer[] = [];
    +      const decompresser = zlib.createGunzip();
    +      decompresser.on('data', (chunk: Buffer) => {
    +        messageParts.push(chunk);
    +        totalLength += chunk.byteLength;
    +        if (this.maxRecvMessageLength !== -1 && totalLength > this.maxRecvMessageLength) {
    +          decompresser.destroy();
    +          reject({
    +            code: Status.RESOURCE_EXHAUSTED,
    +            details: `Received message that decompresses to a size larger than ${this.maxRecvMessageLength}`
    +          });
             }
           });
    +      decompresser.on('end', () => {
    +        resolve(Buffer.concat(messageParts));
    +      });
    +      decompresser.write(message);
    +      decompresser.end();
         });
       }
     }
    @@ -169,14 +201,14 @@ class UnknownHandler extends CompressionHandler {
       }
     }
     
    -function getCompressionHandler(compressionName: string): CompressionHandler {
    +function getCompressionHandler(compressionName: string, maxReceiveMessageSize: number): CompressionHandler {
       switch (compressionName) {
         case 'identity':
           return new IdentityHandler();
         case 'deflate':
    -      return new DeflateHandler();
    +      return new DeflateHandler(maxReceiveMessageSize);
         case 'gzip':
    -      return new GzipHandler();
    +      return new GzipHandler(maxReceiveMessageSize);
         default:
           return new UnknownHandler(compressionName);
       }
    @@ -186,6 +218,7 @@ export class CompressionFilter extends BaseFilter implements Filter {
       private sendCompression: CompressionHandler = new IdentityHandler();
       private receiveCompression: CompressionHandler = new IdentityHandler();
       private currentCompressionAlgorithm: CompressionAlgorithm = 'identity';
    +  private maxReceiveMessageLength: number;
     
       constructor(
         channelOptions: ChannelOptions,
    @@ -195,6 +228,7 @@ export class CompressionFilter extends BaseFilter implements Filter {
     
         const compressionAlgorithmKey =
           channelOptions['grpc.default_compression_algorithm'];
    +    this.maxReceiveMessageLength = channelOptions['grpc.max_receive_message_length'] ?? DEFAULT_MAX_RECEIVE_MESSAGE_LENGTH
         if (compressionAlgorithmKey !== undefined) {
           if (isCompressionAlgorithmKey(compressionAlgorithmKey)) {
             const clientSelectedEncoding = CompressionAlgorithms[
    @@ -215,7 +249,8 @@ export class CompressionFilter extends BaseFilter implements Filter {
             ) {
               this.currentCompressionAlgorithm = clientSelectedEncoding;
               this.sendCompression = getCompressionHandler(
    -            this.currentCompressionAlgorithm
    +            this.currentCompressionAlgorithm,
    +            -1
               );
             }
           } else {
    @@ -247,7 +282,7 @@ export class CompressionFilter extends BaseFilter implements Filter {
         if (receiveEncoding.length > 0) {
           const encoding: MetadataValue = receiveEncoding[0];
           if (typeof encoding === 'string') {
    -        this.receiveCompression = getCompressionHandler(encoding);
    +        this.receiveCompression = getCompressionHandler(encoding, this.maxReceiveMessageLength);
           }
         }
         metadata.remove('grpc-encoding');
    
  • packages/grpc-js/src/internal-channel.ts · +0 -2 · modified
    @@ -33,7 +33,6 @@ import {
     } from './resolver';
     import { trace } from './logging';
     import { SubchannelAddress } from './subchannel-address';
    -import { MaxMessageSizeFilterFactory } from './max-message-size-filter';
     import { mapProxyName } from './http_proxy';
     import { GrpcUri, parseUri, uriToString } from './uri-parser';
     import { ServerSurfaceCall } from './server-call';
    @@ -393,7 +392,6 @@ export class InternalChannel {
           }
         );
         this.filterStackFactory = new FilterStackFactory([
    -      new MaxMessageSizeFilterFactory(this.options),
           new CompressionFilterFactory(this, this.options),
         ]);
         this.trace(
    
  • packages/grpc-js/src/max-message-size-filter.ts · +0 -88 · removed
    @@ -1,88 +0,0 @@
    -/*
    - * Copyright 2020 gRPC authors.
    - *
    - * Licensed under the Apache License, Version 2.0 (the "License");
    - * you may not use this file except in compliance with the License.
    - * You may obtain a copy of the License at
    - *
    - *     http://www.apache.org/licenses/LICENSE-2.0
    - *
    - * Unless required by applicable law or agreed to in writing, software
    - * distributed under the License is distributed on an "AS IS" BASIS,
    - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    - * See the License for the specific language governing permissions and
    - * limitations under the License.
    - *
    - */
    -
    -import { BaseFilter, Filter, FilterFactory } from './filter';
    -import { WriteObject } from './call-interface';
    -import {
    -  Status,
    -  DEFAULT_MAX_SEND_MESSAGE_LENGTH,
    -  DEFAULT_MAX_RECEIVE_MESSAGE_LENGTH,
    -} from './constants';
    -import { ChannelOptions } from './channel-options';
    -import { Metadata } from './metadata';
    -
    -export class MaxMessageSizeFilter extends BaseFilter implements Filter {
    -  private maxSendMessageSize: number = DEFAULT_MAX_SEND_MESSAGE_LENGTH;
    -  private maxReceiveMessageSize: number = DEFAULT_MAX_RECEIVE_MESSAGE_LENGTH;
    -  constructor(options: ChannelOptions) {
    -    super();
    -    if ('grpc.max_send_message_length' in options) {
    -      this.maxSendMessageSize = options['grpc.max_send_message_length']!;
    -    }
    -    if ('grpc.max_receive_message_length' in options) {
    -      this.maxReceiveMessageSize = options['grpc.max_receive_message_length']!;
    -    }
    -  }
    -
    -  async sendMessage(message: Promise<WriteObject>): Promise<WriteObject> {
    -    /* A configured size of -1 means that there is no limit, so skip the check
    -     * entirely */
    -    if (this.maxSendMessageSize === -1) {
    -      return message;
    -    } else {
    -      const concreteMessage = await message;
    -      if (concreteMessage.message.length > this.maxSendMessageSize) {
    -        throw {
    -          code: Status.RESOURCE_EXHAUSTED,
    -          details: `Sent message larger than max (${concreteMessage.message.length} vs. ${this.maxSendMessageSize})`,
    -          metadata: new Metadata(),
    -        };
    -      } else {
    -        return concreteMessage;
    -      }
    -    }
    -  }
    -
    -  async receiveMessage(message: Promise<Buffer>): Promise<Buffer> {
    -    /* A configured size of -1 means that there is no limit, so skip the check
    -     * entirely */
    -    if (this.maxReceiveMessageSize === -1) {
    -      return message;
    -    } else {
    -      const concreteMessage = await message;
    -      if (concreteMessage.length > this.maxReceiveMessageSize) {
    -        throw {
    -          code: Status.RESOURCE_EXHAUSTED,
    -          details: `Received message larger than max (${concreteMessage.length} vs. ${this.maxReceiveMessageSize})`,
    -          metadata: new Metadata(),
    -        };
    -      } else {
    -        return concreteMessage;
    -      }
    -    }
    -  }
    -}
    -
    -export class MaxMessageSizeFilterFactory
    -  implements FilterFactory<MaxMessageSizeFilter>
    -{
    -  constructor(private readonly options: ChannelOptions) {}
    -
    -  createFilter(): MaxMessageSizeFilter {
    -    return new MaxMessageSizeFilter(this.options);
    -  }
    -}
    
  • packages/grpc-js/src/server-call.ts · +56 -31 · modified
    @@ -19,7 +19,6 @@ import { EventEmitter } from 'events';
     import * as http2 from 'http2';
     import { Duplex, Readable, Writable } from 'stream';
     import * as zlib from 'zlib';
    -import { promisify } from 'util';
     
     import {
       Status,
    @@ -38,8 +37,6 @@ import { Deadline } from './deadline';
     import { getErrorCode, getErrorMessage } from './error';
     
     const TRACER_NAME = 'server_call';
    -const unzip = promisify(zlib.unzip);
    -const inflate = promisify(zlib.inflate);
     
     function trace(text: string): void {
       logging.trace(LogVerbosity.DEBUG, TRACER_NAME, text);
    @@ -480,19 +477,42 @@ export class Http2ServerCallStream<
       private getDecompressedMessage(
         message: Buffer,
         encoding: string
    -  ): Buffer | Promise<Buffer> {
    -    if (encoding === 'deflate') {
    -      return inflate(message.subarray(5));
    -    } else if (encoding === 'gzip') {
    -      return unzip(message.subarray(5));
    -    } else if (encoding === 'identity') {
    -      return message.subarray(5);
    +  ): Buffer | Promise<Buffer> {    const messageContents = message.subarray(5);
    +    if (encoding === 'identity') {
    +      return messageContents;
    +    } else if (encoding === 'deflate' || encoding === 'gzip') {
    +      let decompresser: zlib.Gunzip | zlib.Deflate;
    +      if (encoding === 'deflate') {
    +        decompresser = zlib.createInflate();
    +      } else {
    +        decompresser = zlib.createGunzip();
    +      }
    +      return new Promise((resolve, reject) => {
    +        let totalLength = 0
    +        const messageParts: Buffer[] = [];
    +        decompresser.on('data', (chunk: Buffer) => {
    +          messageParts.push(chunk);
    +          totalLength += chunk.byteLength;
    +          if (this.maxReceiveMessageSize !== -1 && totalLength > this.maxReceiveMessageSize) {
    +            decompresser.destroy();
    +            reject({
    +              code: Status.RESOURCE_EXHAUSTED,
    +              details: `Received message that decompresses to a size larger than ${this.maxReceiveMessageSize}`
    +            });
    +          }
    +        });
    +        decompresser.on('end', () => {
    +          resolve(Buffer.concat(messageParts));
    +        });
    +        decompresser.write(messageContents);
    +        decompresser.end();
    +      });
    +    } else {
    +      return Promise.reject({
    +        code: Status.UNIMPLEMENTED,
    +        details: `Received message compressed with unsupported encoding "${encoding}"`,
    +      });
         }
    -
    -    return Promise.reject({
    -      code: Status.UNIMPLEMENTED,
    -      details: `Received message compressed with unsupported encoding "${encoding}"`,
    -    });
       }
     
       sendMetadata(customMetadata?: Metadata) {
    @@ -816,7 +836,7 @@ export class Http2ServerCallStream<
           | ServerDuplexStream<RequestType, ResponseType>,
         encoding: string
       ) {
    -    const decoder = new StreamDecoder();
    +    const decoder = new StreamDecoder(this.maxReceiveMessageSize);
     
         let readsDone = false;
     
    @@ -832,29 +852,34 @@ export class Http2ServerCallStream<
         };
     
         this.stream.on('data', async (data: Buffer) => {
    -      const messages = decoder.write(data);
    +      let messages: Buffer[];
    +      try {
    +        messages = decoder.write(data);
    +      } catch (e) {
    +        this.sendError({
    +          code: Status.RESOURCE_EXHAUSTED,
    +          details: (e as Error).message
    +        });
    +        return;
    +      }
     
           pendingMessageProcessing = true;
           this.stream.pause();
           for (const message of messages) {
    -        if (
    -          this.maxReceiveMessageSize !== -1 &&
    -          message.length > this.maxReceiveMessageSize
    -        ) {
    -          this.sendError({
    -            code: Status.RESOURCE_EXHAUSTED,
    -            details: `Received message larger than max (${message.length} vs. ${this.maxReceiveMessageSize})`,
    -          });
    -          return;
    -        }
             this.emit('receiveMessage');
     
             const compressed = message.readUInt8(0) === 1;
             const compressedMessageEncoding = compressed ? encoding : 'identity';
    -        const decompressedMessage = await this.getDecompressedMessage(
    -          message,
    -          compressedMessageEncoding
    -        );
    +        let decompressedMessage: Buffer;
    +        try {
    +          decompressedMessage = await this.getDecompressedMessage(
    +            message,
    +            compressedMessageEncoding
    +          );
    +        } catch (e) {
    +          this.sendError(e as Partial<StatusObject>);
    +          return;
    +        }
     
             // Encountered an error with decompression; it'll already have been propogated back
             // Just return early
    
  • packages/grpc-js/src/stream-decoder.ts+5 0 modified
    @@ -30,6 +30,8 @@ export class StreamDecoder {
       private readPartialMessage: Buffer[] = [];
       private readMessageRemaining = 0;
     
    +  constructor(private maxReadMessageLength: number) {}
    +
       write(data: Buffer): Buffer[] {
         let readHead = 0;
         let toRead: number;
    @@ -60,6 +62,9 @@ export class StreamDecoder {
               // readSizeRemaining >=0 here
               if (this.readSizeRemaining === 0) {
                 this.readMessageSize = this.readPartialSize.readUInt32BE(0);
    +            if (this.maxReadMessageLength !== -1 && this.readMessageSize > this.maxReadMessageLength) {
    +              throw new Error(`Received message larger than max (${this.readMessageSize} vs ${this.maxReadMessageLength})`);
    +            }
                 this.readMessageRemaining = this.readMessageSize;
                 if (this.readMessageRemaining > 0) {
                   this.readState = ReadState.READING_MESSAGE;
    
  • packages/grpc-js/src/subchannel-call.ts+11 3 modified
    @@ -18,7 +18,7 @@
     import * as http2 from 'http2';
     import * as os from 'os';
     
    -import { Status } from './constants';
    +import { DEFAULT_MAX_RECEIVE_MESSAGE_LENGTH, Status } from './constants';
     import { Metadata } from './metadata';
     import { StreamDecoder } from './stream-decoder';
     import * as logging from './logging';
    @@ -82,7 +82,7 @@ export interface SubchannelCallInterceptingListener
     }
     
     export class Http2SubchannelCall implements SubchannelCall {
    -  private decoder = new StreamDecoder();
    +  private decoder: StreamDecoder;
     
       private isReadFilterPending = false;
       private isPushPending = false;
    @@ -112,6 +112,8 @@ export class Http2SubchannelCall implements SubchannelCall {
         private readonly transport: Transport,
         private readonly callId: number
       ) {
    +    const maxReceiveMessageLength = transport.getOptions()['grpc.max_receive_message_length'] ?? DEFAULT_MAX_RECEIVE_MESSAGE_LENGTH;
    +    this.decoder = new StreamDecoder(maxReceiveMessageLength);
         http2Stream.on('response', (headers, flags) => {
           let headersString = '';
           for (const header of Object.keys(headers)) {
    @@ -169,7 +171,13 @@ export class Http2SubchannelCall implements SubchannelCall {
             return;
           }
           this.trace('receive HTTP/2 data frame of length ' + data.length);
    -      const messages = this.decoder.write(data);
    +      let messages: Buffer[];
    +      try {
    +        messages = this.decoder.write(data);
    +      } catch (e) {
    +        this.cancelWithStatus(Status.RESOURCE_EXHAUSTED, (e as Error).message);
    +        return;
    +      }
     
           for (const message of messages) {
             this.trace('parsed message of length ' + message.length);
    
  • packages/grpc-js/src/transport.ts+6 1 modified
    @@ -83,6 +83,7 @@ export interface TransportDisconnectListener {
     export interface Transport {
       getChannelzRef(): SocketRef;
       getPeerName(): string;
    +  getOptions(): ChannelOptions;
       createCall(
         metadata: Metadata,
         host: string,
    @@ -146,7 +147,7 @@ class Http2Transport implements Transport {
       constructor(
         private session: http2.ClientHttp2Session,
         subchannelAddress: SubchannelAddress,
    -    options: ChannelOptions,
    +    private options: ChannelOptions,
         /**
          * Name of the remote server, if it is not the same as the subchannel
          * address, i.e. if connecting through an HTTP CONNECT proxy.
    @@ -601,6 +602,10 @@ class Http2Transport implements Transport {
         return this.subchannelAddressString;
       }
     
    +  getOptions() {
    +    return this.options;
    +  }
    +
       shutdown() {
         this.session.close();
         unregisterChannelzRef(this.channelzRef);
    
  • packages/grpc-js/test/fixtures/test_service.proto+1 0 modified
    @@ -21,6 +21,7 @@ message Request {
       bool error = 1;
       string message = 2;
       int32 errorAfter = 3;
    +  int32 responseLength = 4;
     }
     
     message Response {
    
  • packages/grpc-js/test/test-server-errors.ts+45 4 modified
    @@ -33,6 +33,7 @@ import {
     } from '../src/server-call';
     
     import { loadProtoFile } from './common';
    +import { CompressionAlgorithms } from '../src/compression-algorithms';
     
     const protoFile = join(__dirname, 'fixtures', 'test_service.proto');
     const testServiceDef = loadProtoFile(protoFile);
    @@ -310,7 +311,7 @@ describe('Other conditions', () => {
                 trailerMetadata
               );
             } else {
    -          cb(null, { count: 1 }, trailerMetadata);
    +          cb(null, { count: 1, message: 'a'.repeat(req.responseLength) }, trailerMetadata);
             }
           },
     
    @@ -320,20 +321,22 @@ describe('Other conditions', () => {
           ) {
             let count = 0;
             let errored = false;
    +        let responseLength = 0;
     
             stream.on('data', (data: any) => {
               if (data.error) {
                 const message = data.message || 'Requested error';
                 errored = true;
                 cb(new Error(message) as ServiceError, null, trailerMetadata);
               } else {
    +            responseLength += data.responseLength;
                 count++;
               }
             });
     
             stream.on('end', () => {
               if (!errored) {
    -            cb(null, { count }, trailerMetadata);
    +            cb(null, { count, message: 'a'.repeat(responseLength) }, trailerMetadata);
               }
             });
           },
    @@ -349,7 +352,7 @@ describe('Other conditions', () => {
               });
             } else {
               for (let i = 1; i <= 5; i++) {
    -            stream.write({ count: i });
    +            stream.write({ count: i, message: 'a'.repeat(req.responseLength) });
                 if (req.errorAfter && req.errorAfter === i) {
                   stream.emit('error', {
                     code: grpc.status.UNKNOWN,
    @@ -376,7 +379,7 @@ describe('Other conditions', () => {
                 err.metadata.add('count', '' + count);
                 stream.emit('error', err);
               } else {
    -            stream.write({ count });
    +            stream.write({ count, message: 'a'.repeat(data.responseLength) });
                 count++;
               }
             });
    @@ -740,6 +743,44 @@ describe('Other conditions', () => {
           });
         });
       });
    +
    +  describe('Max message size', () => {
    +    const largeMessage = 'a'.repeat(10_000_000);
    +    it('Should be enforced on the server', done => {
    +      client.unary({ message: largeMessage }, (error?: ServiceError) => {
    +        assert(error);
    +        assert.strictEqual(error.code, grpc.status.RESOURCE_EXHAUSTED);
    +        done();
    +      });
    +    });
    +    it('Should be enforced on the client', done => {
    +      client.unary({ responseLength: 10_000_000 }, (error?: ServiceError) => {
    +        assert(error);
    +        assert.strictEqual(error.code, grpc.status.RESOURCE_EXHAUSTED);
    +        done();
    +      });
    +    });
    +    describe('Compressed messages', () => {
    +      it('Should be enforced with gzip', done => {
    +        const compressingClient = new testServiceClient(`localhost:${port}`, clientInsecureCreds, {'grpc.default_compression_algorithm': CompressionAlgorithms.gzip});
    +        compressingClient.unary({ message: largeMessage }, (error?: ServiceError) => {
    +          assert(error);
    +          assert.strictEqual(error.code, grpc.status.RESOURCE_EXHAUSTED);
    +          assert.match(error.details, /Received message that decompresses to a size larger/);
    +          done();
    +        });
    +      });
    +      it('Should be enforced with deflate', done => {
    +        const compressingClient = new testServiceClient(`localhost:${port}`, clientInsecureCreds, {'grpc.default_compression_algorithm': CompressionAlgorithms.deflate});
    +        compressingClient.unary({ message: largeMessage }, (error?: ServiceError) => {
    +          assert(error);
    +          assert.strictEqual(error.code, grpc.status.RESOURCE_EXHAUSTED);
    +          assert.match(error.details, /Received message that decompresses to a size larger/);
    +          done();
    +        });
    +      });
    +    });
    +  });
     });
     
     function identity(arg: any): any {
    

Vulnerability mechanics

Generated by null/stub on May 9, 2026. Inputs: CWE entries + fix-commit diffs from this CVE's patches. Citations validated against bundle.

References

6

News mentions

0

No linked articles in our index yet.