From 37099980127e05a83c9f3d609879c0af6ca4e66b Mon Sep 17 00:00:00 2001 From: Michael Lumish Date: Thu, 13 Apr 2023 09:25:38 -0700 Subject: [PATCH 01/30] grpc-js: Fix a couple of errors from a previous PR --- packages/grpc-js/src/subchannel.ts | 2 -- packages/grpc-js/test/test-global-subchannel-pool.ts | 2 +- 2 files changed, 1 insertion(+), 3 deletions(-) diff --git a/packages/grpc-js/src/subchannel.ts b/packages/grpc-js/src/subchannel.ts index de420cc97..307f6b81e 100644 --- a/packages/grpc-js/src/subchannel.ts +++ b/packages/grpc-js/src/subchannel.ts @@ -227,8 +227,6 @@ export class Subchannel { } const previousState = this.connectivityState; this.connectivityState = newState; - process.nextTick(() => { - }); switch (newState) { case ConnectivityState.READY: this.stopBackoff(); diff --git a/packages/grpc-js/test/test-global-subchannel-pool.ts b/packages/grpc-js/test/test-global-subchannel-pool.ts index c19125687..999a11bf7 100644 --- a/packages/grpc-js/test/test-global-subchannel-pool.ts +++ b/packages/grpc-js/test/test-global-subchannel-pool.ts @@ -27,7 +27,7 @@ const protoFile = path.join(__dirname, 'fixtures', 'echo_service.proto'); const echoService = loadProtoFile(protoFile).EchoService as ServiceClientConstructor; -describe.only('Global subchannel pool', () => { +describe('Global subchannel pool', () => { let server: Server; let serverPort: number; From 2cb6ef86d4debde64e440d53ddb0f5ae10f228c7 Mon Sep 17 00:00:00 2001 From: Sergii Tkachenko Date: Fri, 7 Apr 2023 14:45:21 -0700 Subject: [PATCH 02/30] PSM Interop: experiment with qps affect on circuit_breaking ref b/232859415 --- packages/grpc-js-xds/scripts/xds.sh | 1 + 1 file changed, 1 insertion(+) diff --git a/packages/grpc-js-xds/scripts/xds.sh b/packages/grpc-js-xds/scripts/xds.sh index d4490c5ef..af22f584f 100755 --- a/packages/grpc-js-xds/scripts/xds.sh +++ b/packages/grpc-js-xds/scripts/xds.sh @@ -60,6 +60,7 @@ GRPC_NODE_TRACE=xds_client,xds_resolver,cds_balancer,eds_balancer,priority,weigh --path_to_server_binary=/java_server/grpc-java/interop-testing/build/install/grpc-interop-testing/bin/xds-test-server \ --gcp_suffix=$(date '+%s') \ --verbose \ + --qps=50 \ ${XDS_V3_OPT-} \ --client_cmd="$(which node) --enable-source-maps --prof --logfile=${KOKORO_ARTIFACTS_DIR}/github/grpc/reports/prof.log grpc-node/packages/grpc-js-xds/build/interop/xds-interop-client \ --server=xds:///{server_uri} \ From 856559cce1d27771d50dc97d130f5dc423dd4723 Mon Sep 17 00:00:00 2001 From: Michael Lumish Date: Thu, 20 Apr 2023 14:34:06 -0700 Subject: [PATCH 03/30] grpc-js-xds: Fix handling of resource validation errors --- .../src/xds-stream-state/xds-stream-state.ts | 46 +++--- packages/grpc-js-xds/test/test-nack.ts | 156 ++++++++++++++++++ 2 files changed, 182 insertions(+), 20 deletions(-) create mode 100644 packages/grpc-js-xds/test/test-nack.ts diff --git a/packages/grpc-js-xds/src/xds-stream-state/xds-stream-state.ts b/packages/grpc-js-xds/src/xds-stream-state/xds-stream-state.ts index e20bc7e9b..b2d7b2279 100644 --- a/packages/grpc-js-xds/src/xds-stream-state/xds-stream-state.ts +++ b/packages/grpc-js-xds/src/xds-stream-state/xds-stream-state.ts @@ -15,7 +15,7 @@ * */ -import { experimental, logVerbosity, StatusObject } from "@grpc/grpc-js"; +import { experimental, logVerbosity, Metadata, status, StatusObject } from "@grpc/grpc-js"; import { Any__Output } from "../generated/google/protobuf/Any"; const TRACER_NAME = 'xds_client'; @@ -157,19 +157,32 @@ export abstract class BaseXdsStreamState implements XdsStreamState return Array.from(this.subscriptions.keys()); } handleResponses(responses: ResourcePair[]): HandleResponseResult { - const validResponses: ResponseType[] = []; let result: HandleResponseResult = { accepted: [], rejected: [], missing: [] } + const allResourceNames = new Set(); for (const {resource, raw} of responses) { const resourceName = this.getResourceName(resource); + allResourceNames.add(resourceName); + const subscriptionEntry = this.subscriptions.get(resourceName); if (this.validateResponse(resource)) { - validResponses.push(resource); result.accepted.push({ name: resourceName, raw: raw}); + if (subscriptionEntry) { + const watchers = subscriptionEntry.watchers; + for (const watcher of watchers) { + watcher.onValidUpdate(resource); + } + clearTimeout(subscriptionEntry.resourceTimer); + subscriptionEntry.cachedResponse = resource; + if (subscriptionEntry.deletionIgnored) { + experimental.log(logVerbosity.INFO, 'Received resource with previously ignored deletion: ' + resourceName); + subscriptionEntry.deletionIgnored = false; + } + } } else { this.trace('Validation failed for message ' + JSON.stringify(resource)); result.rejected.push({ @@ -177,23 +190,16 @@ export abstract class BaseXdsStreamState implements XdsStreamState raw: raw, error: `Validation failed for resource ${resourceName}` }); - } - } - const allResourceNames = new Set(); - for (const resource of validResponses) { - const resourceName = this.getResourceName(resource); - allResourceNames.add(resourceName); - const subscriptionEntry = this.subscriptions.get(resourceName); - if (subscriptionEntry) { - const watchers = subscriptionEntry.watchers; - for (const watcher of watchers) { - watcher.onValidUpdate(resource); - } - clearTimeout(subscriptionEntry.resourceTimer); - subscriptionEntry.cachedResponse = resource; - if (subscriptionEntry.deletionIgnored) { - experimental.log(logVerbosity.INFO, 'Received resource with previously ignored deletion: ' + resourceName); - subscriptionEntry.deletionIgnored = false; + if (subscriptionEntry) { + const watchers = subscriptionEntry.watchers; + for (const watcher of watchers) { + watcher.onTransientError({ + code: status.UNAVAILABLE, + details: `Validation failed for resource ${resourceName}`, + metadata: new Metadata() + }); + } + clearTimeout(subscriptionEntry.resourceTimer); } } } diff --git a/packages/grpc-js-xds/test/test-nack.ts b/packages/grpc-js-xds/test/test-nack.ts new file mode 100644 index 000000000..ad1aad448 --- /dev/null +++ b/packages/grpc-js-xds/test/test-nack.ts @@ -0,0 +1,156 @@ +/* + * Copyright 2023 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +import * as assert from 'assert'; +import { register } from "../src"; +import { Backend } from "./backend"; +import { XdsTestClient } from "./client"; +import { FakeCluster, FakeRouteGroup } from "./framework"; +import { XdsServer } from "./xds-server"; + +register(); + +describe('Validation errors', () => { + let xdsServer: XdsServer; + let client: XdsTestClient; + beforeEach(done => { + xdsServer = new XdsServer(); + xdsServer.startServer(error => { + done(error); + }); + }); + afterEach(() => { + client?.close(); + xdsServer?.shutdownServer(); + }); + it('Should continue to use a valid resource after receiving an invalid EDS update', done => { + const cluster = new FakeCluster('cluster1', [{backends: [new Backend()], locality:{region: 'region1'}}]); + const routeGroup = new FakeRouteGroup('route1', [{cluster: cluster}]); + routeGroup.startAllBackends().then(() => { + xdsServer.setEdsResource(cluster.getEndpointConfig()); + xdsServer.setCdsResource(cluster.getClusterConfig()); + xdsServer.setRdsResource(routeGroup.getRouteConfiguration()); + xdsServer.setLdsResource(routeGroup.getListener()); + client = new XdsTestClient('route1', xdsServer); + client.startCalls(100); + routeGroup.waitForAllBackendsToReceiveTraffic().then(() => { + // After backends receive calls, set invalid EDS resource + xdsServer.setEdsResource({cluster_name: cluster.getEndpointConfig().cluster_name, endpoints: [{}]}); + let seenNack = false; + xdsServer.addResponseListener((typeUrl, responseState) => { + if (responseState.state === 'NACKED') { + if (seenNack) { + return; + } + seenNack = true; + routeGroup.waitForAllBackendsToReceiveTraffic().then(() => { + client.stopCalls(); + done(); + }); + } + }); + }, reason => done(reason)); + }, reason => done(reason)); + }); + it('Should continue to use a valid resource after receiving an invalid CDS update', done => { + const cluster = new FakeCluster('cluster1', [{backends: [new Backend()], locality:{region: 'region1'}}]); + const routeGroup = new FakeRouteGroup('route1', [{cluster: cluster}]); + routeGroup.startAllBackends().then(() => { + xdsServer.setEdsResource(cluster.getEndpointConfig()); + xdsServer.setCdsResource(cluster.getClusterConfig()); + xdsServer.setRdsResource(routeGroup.getRouteConfiguration()); + xdsServer.setLdsResource(routeGroup.getListener()); + client = new XdsTestClient('route1', xdsServer); + client.startCalls(100); + routeGroup.waitForAllBackendsToReceiveTraffic().then(() => { + // After backends receive calls, set invalid CDS resource + xdsServer.setCdsResource({name: cluster.getClusterConfig().name}); + let seenNack = false; + xdsServer.addResponseListener((typeUrl, responseState) => { + if (responseState.state === 'NACKED') { + if (seenNack) { + return; + } + seenNack = true; + routeGroup.waitForAllBackendsToReceiveTraffic().then(() => { + client.stopCalls(); + done(); + }); + } + }); + }, reason => done(reason)); + }, reason => done(reason)); + }); + it('Should continue to use a valid resource after receiving an invalid RDS update', done => { + const cluster = new FakeCluster('cluster1', [{backends: [new Backend()], locality:{region: 'region1'}}]); + const routeGroup = new FakeRouteGroup('route1', [{cluster: cluster}]); + routeGroup.startAllBackends().then(() => { + xdsServer.setEdsResource(cluster.getEndpointConfig()); + xdsServer.setCdsResource(cluster.getClusterConfig()); + xdsServer.setRdsResource(routeGroup.getRouteConfiguration()); + xdsServer.setLdsResource(routeGroup.getListener()); + client = new XdsTestClient('route1', xdsServer); + client.startCalls(100); + routeGroup.waitForAllBackendsToReceiveTraffic().then(() => { + // After backends receive calls, set invalid RDS resource + xdsServer.setRdsResource({name: routeGroup.getRouteConfiguration().name, virtual_hosts: [{domains: ['**']}]}); + let seenNack = false; + xdsServer.addResponseListener((typeUrl, responseState) => { + if (responseState.state === 'NACKED') { + if (seenNack) { + return; + } + seenNack = true; + routeGroup.waitForAllBackendsToReceiveTraffic().then(() => { + client.stopCalls(); + done(); + }); + } + }); + }, reason => done(reason)); + }, reason => done(reason)); + }); + it('Should continue to use a valid resource after receiving an invalid LDS update', done => { + const cluster = new FakeCluster('cluster1', [{backends: [new Backend()], locality:{region: 'region1'}}]); + const routeGroup = new FakeRouteGroup('route1', [{cluster: cluster}]); + routeGroup.startAllBackends().then(() => { + xdsServer.setEdsResource(cluster.getEndpointConfig()); + xdsServer.setCdsResource(cluster.getClusterConfig()); + xdsServer.setRdsResource(routeGroup.getRouteConfiguration()); + xdsServer.setLdsResource(routeGroup.getListener()); + client = new XdsTestClient('route1', xdsServer); + client.startCalls(100); + routeGroup.waitForAllBackendsToReceiveTraffic().then(() => { + // After backends receive calls, set invalid LDS resource + xdsServer.setLdsResource({name: routeGroup.getListener().name}); + let seenNack = false; + xdsServer.addResponseListener((typeUrl, responseState) => { + if (responseState.state === 'NACKED') { + if (seenNack) { + return; + } + seenNack = true; + routeGroup.waitForAllBackendsToReceiveTraffic().then(() => { + client.stopCalls(); + done(); + }); + } + }); + }, reason => done(reason)); + }, reason => done(reason)); + }); +}); \ No newline at end of file From 48ef1ed202e19c36e934015b6b1ed039cf7d2f00 Mon Sep 17 00:00:00 2001 From: Michael Lumish Date: Thu, 20 Apr 2023 14:35:39 -0700 Subject: [PATCH 04/30] grpc-js-xds: Bump version to 1.8.2 --- packages/grpc-js-xds/package.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/grpc-js-xds/package.json b/packages/grpc-js-xds/package.json index 9511776c3..7c735b652 100644 --- a/packages/grpc-js-xds/package.json +++ b/packages/grpc-js-xds/package.json @@ -1,6 +1,6 @@ { "name": "@grpc/grpc-js-xds", - "version": "1.8.1", + "version": "1.8.2", "description": "Plugin for @grpc/grpc-js. Adds the xds:// URL scheme and associated features.", "main": "build/src/index.js", "scripts": { From dfccd687f0bc1df7d8d7dd599249b098a772cf53 Mon Sep 17 00:00:00 2001 From: Michael Lumish Date: Mon, 24 Apr 2023 16:21:12 -0700 Subject: [PATCH 05/30] Address review comments --- .../src/xds-stream-state/xds-stream-state.ts | 8 +++----- packages/grpc-js-xds/test/test-nack.ts | 20 +++++++++++-------- 2 files changed, 15 insertions(+), 13 deletions(-) diff --git a/packages/grpc-js-xds/src/xds-stream-state/xds-stream-state.ts b/packages/grpc-js-xds/src/xds-stream-state/xds-stream-state.ts index b2d7b2279..b04adb79a 100644 --- a/packages/grpc-js-xds/src/xds-stream-state/xds-stream-state.ts +++ b/packages/grpc-js-xds/src/xds-stream-state/xds-stream-state.ts @@ -172,14 +172,13 @@ export abstract class BaseXdsStreamState implements XdsStreamState name: resourceName, raw: raw}); if (subscriptionEntry) { - const watchers = subscriptionEntry.watchers; - for (const watcher of watchers) { + for (const watcher of subscriptionEntry.watchers) { watcher.onValidUpdate(resource); } clearTimeout(subscriptionEntry.resourceTimer); subscriptionEntry.cachedResponse = resource; if (subscriptionEntry.deletionIgnored) { - experimental.log(logVerbosity.INFO, 'Received resource with previously ignored deletion: ' + resourceName); + experimental.log(logVerbosity.INFO, `Received resource with previously ignored deletion: ${resourceName}`); subscriptionEntry.deletionIgnored = false; } } @@ -191,8 +190,7 @@ export abstract class BaseXdsStreamState implements XdsStreamState error: `Validation failed for resource ${resourceName}` }); if (subscriptionEntry) { - const watchers = subscriptionEntry.watchers; - for (const watcher of watchers) { + for (const watcher of subscriptionEntry.watchers) { watcher.onTransientError({ code: status.UNAVAILABLE, details: `Validation failed for resource ${resourceName}`, diff --git a/packages/grpc-js-xds/test/test-nack.ts b/packages/grpc-js-xds/test/test-nack.ts index ad1aad448..b5bfb773e 100644 --- a/packages/grpc-js-xds/test/test-nack.ts +++ b/packages/grpc-js-xds/test/test-nack.ts @@ -38,7 +38,7 @@ describe('Validation errors', () => { xdsServer?.shutdownServer(); }); it('Should continue to use a valid resource after receiving an invalid EDS update', done => { - const cluster = new FakeCluster('cluster1', [{backends: [new Backend()], locality:{region: 'region1'}}]); + const cluster = new FakeCluster('cluster1', [{backends: [new Backend()], locality: {region: 'region1'}}]); const routeGroup = new FakeRouteGroup('route1', [{cluster: cluster}]); routeGroup.startAllBackends().then(() => { xdsServer.setEdsResource(cluster.getEndpointConfig()); @@ -49,7 +49,8 @@ describe('Validation errors', () => { client.startCalls(100); routeGroup.waitForAllBackendsToReceiveTraffic().then(() => { // After backends receive calls, set invalid EDS resource - xdsServer.setEdsResource({cluster_name: cluster.getEndpointConfig().cluster_name, endpoints: [{}]}); + const invalidEdsResource = {cluster_name: cluster.getEndpointConfig().cluster_name, endpoints: [{}]}; + xdsServer.setEdsResource(invalidEdsResource); let seenNack = false; xdsServer.addResponseListener((typeUrl, responseState) => { if (responseState.state === 'NACKED') { @@ -67,7 +68,7 @@ describe('Validation errors', () => { }, reason => done(reason)); }); it('Should continue to use a valid resource after receiving an invalid CDS update', done => { - const cluster = new FakeCluster('cluster1', [{backends: [new Backend()], locality:{region: 'region1'}}]); + const cluster = new FakeCluster('cluster1', [{backends: [new Backend()], locality: {region: 'region1'}}]); const routeGroup = new FakeRouteGroup('route1', [{cluster: cluster}]); routeGroup.startAllBackends().then(() => { xdsServer.setEdsResource(cluster.getEndpointConfig()); @@ -78,7 +79,8 @@ describe('Validation errors', () => { client.startCalls(100); routeGroup.waitForAllBackendsToReceiveTraffic().then(() => { // After backends receive calls, set invalid CDS resource - xdsServer.setCdsResource({name: cluster.getClusterConfig().name}); + const invalidCdsResource = {name: cluster.getClusterConfig().name}; + xdsServer.setCdsResource(invalidCdsResource); let seenNack = false; xdsServer.addResponseListener((typeUrl, responseState) => { if (responseState.state === 'NACKED') { @@ -96,7 +98,7 @@ describe('Validation errors', () => { }, reason => done(reason)); }); it('Should continue to use a valid resource after receiving an invalid RDS update', done => { - const cluster = new FakeCluster('cluster1', [{backends: [new Backend()], locality:{region: 'region1'}}]); + const cluster = new FakeCluster('cluster1', [{backends: [new Backend()], locality: {region: 'region1'}}]); const routeGroup = new FakeRouteGroup('route1', [{cluster: cluster}]); routeGroup.startAllBackends().then(() => { xdsServer.setEdsResource(cluster.getEndpointConfig()); @@ -107,7 +109,8 @@ describe('Validation errors', () => { client.startCalls(100); routeGroup.waitForAllBackendsToReceiveTraffic().then(() => { // After backends receive calls, set invalid RDS resource - xdsServer.setRdsResource({name: routeGroup.getRouteConfiguration().name, virtual_hosts: [{domains: ['**']}]}); + const invalidRdsResource = {name: routeGroup.getRouteConfiguration().name, virtual_hosts: [{domains: ['**']}]}; + xdsServer.setRdsResource(invalidRdsResource); let seenNack = false; xdsServer.addResponseListener((typeUrl, responseState) => { if (responseState.state === 'NACKED') { @@ -125,7 +128,7 @@ describe('Validation errors', () => { }, reason => done(reason)); }); it('Should continue to use a valid resource after receiving an invalid LDS update', done => { - const cluster = new FakeCluster('cluster1', [{backends: [new Backend()], locality:{region: 'region1'}}]); + const cluster = new FakeCluster('cluster1', [{backends: [new Backend()], locality: {region: 'region1'}}]); const routeGroup = new FakeRouteGroup('route1', [{cluster: cluster}]); routeGroup.startAllBackends().then(() => { xdsServer.setEdsResource(cluster.getEndpointConfig()); @@ -136,7 +139,8 @@ describe('Validation errors', () => { client.startCalls(100); routeGroup.waitForAllBackendsToReceiveTraffic().then(() => { // After backends receive calls, set invalid LDS resource - xdsServer.setLdsResource({name: routeGroup.getListener().name}); + const invalidLdsResource = {name: routeGroup.getListener().name}; + xdsServer.setLdsResource(invalidLdsResource); let seenNack = false; xdsServer.addResponseListener((typeUrl, responseState) => { if (responseState.state === 'NACKED') { From edeeda6424d568b80eb4478b63afba05c51904a5 Mon Sep 17 00:00:00 2001 From: Michael Lumish Date: Mon, 24 Apr 2023 16:22:49 -0700 Subject: [PATCH 06/30] Add trailing newline in packages/grpc-js-xds/test/test-nack.ts Co-authored-by: Sergii Tkachenko --- packages/grpc-js-xds/test/test-nack.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/grpc-js-xds/test/test-nack.ts b/packages/grpc-js-xds/test/test-nack.ts index b5bfb773e..9395628a6 100644 --- a/packages/grpc-js-xds/test/test-nack.ts +++ b/packages/grpc-js-xds/test/test-nack.ts @@ -157,4 +157,4 @@ describe('Validation errors', () => { }, reason => done(reason)); }, reason => done(reason)); }); -}); \ No newline at end of file +}); From 0933633424b5eeec56d047d5e9fd2dd09dff4ac9 Mon Sep 17 00:00:00 2001 From: Michael Lumish Date: Fri, 28 Apr 2023 15:00:05 -0700 Subject: [PATCH 07/30] PSM Interop: Increase old driver QPS to 75 --- packages/grpc-js-xds/scripts/xds.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/grpc-js-xds/scripts/xds.sh b/packages/grpc-js-xds/scripts/xds.sh index af22f584f..7e6794a31 100755 --- a/packages/grpc-js-xds/scripts/xds.sh +++ b/packages/grpc-js-xds/scripts/xds.sh @@ -60,7 +60,7 @@ GRPC_NODE_TRACE=xds_client,xds_resolver,cds_balancer,eds_balancer,priority,weigh --path_to_server_binary=/java_server/grpc-java/interop-testing/build/install/grpc-interop-testing/bin/xds-test-server \ --gcp_suffix=$(date '+%s') \ --verbose \ - --qps=50 \ + --qps=75 \ ${XDS_V3_OPT-} \ --client_cmd="$(which node) --enable-source-maps --prof --logfile=${KOKORO_ARTIFACTS_DIR}/github/grpc/reports/prof.log grpc-node/packages/grpc-js-xds/build/interop/xds-interop-client \ --server=xds:///{server_uri} \ From 2b455e7d18d1c108bd1ca901678c757434c054b7 Mon Sep 17 00:00:00 2001 From: Michael Lumish Date: Wed, 31 May 2023 14:05:10 -0700 Subject: [PATCH 08/30] grpc-js: Fix a couple of minor issues --- packages/grpc-js/package.json | 2 +- packages/grpc-js/src/client.ts | 36 ++++++++++++++------- packages/grpc-js/src/load-balancing-call.ts | 8 +++-- 3 files changed, 31 insertions(+), 15 deletions(-) diff --git a/packages/grpc-js/package.json b/packages/grpc-js/package.json index dd341ef03..7d8c1a597 100644 --- a/packages/grpc-js/package.json +++ b/packages/grpc-js/package.json @@ -1,6 +1,6 @@ { "name": "@grpc/grpc-js", - "version": "1.8.14", + "version": "1.8.15", "description": "gRPC Library for Node - pure JS implementation", "homepage": "https://grpc.io/", "repository": "https://github.com/grpc/grpc-node/tree/master/packages/grpc-js", diff --git a/packages/grpc-js/src/client.ts b/packages/grpc-js/src/client.ts index f96f8fdf0..bdbdc6e97 100644 --- a/packages/grpc-js/src/client.ts +++ b/packages/grpc-js/src/client.ts @@ -323,7 +323,7 @@ export class Client { emitter.call = call; let responseMessage: ResponseType | null = null; let receivedStatus = false; - const callerStackError = new Error(); + let callerStackError: Error | null = new Error(); call.start(callProperties.metadata, { onReceiveMetadata: (metadata) => { emitter.emit('metadata', metadata); @@ -342,19 +342,22 @@ export class Client { receivedStatus = true; if (status.code === Status.OK) { if (responseMessage === null) { - const callerStack = getErrorStackString(callerStackError); + const callerStack = getErrorStackString(callerStackError!); callProperties.callback!(callErrorFromStatus({ code: Status.INTERNAL, details: 'No message received', metadata: status.metadata - }, callerStack)); + }, /*callerStack*/'')); } else { callProperties.callback!(null, responseMessage); } } else { - const callerStack = getErrorStackString(callerStackError); - callProperties.callback!(callErrorFromStatus(status, callerStack)); + const callerStack = getErrorStackString(callerStackError!); + callProperties.callback!(callErrorFromStatus(status, /*callerStack*/'')); } + /* Avoid retaining the callerStackError object in the call context of + * the status event handler. */ + callerStackError = null; emitter.emit('status', status); }, }); @@ -448,7 +451,7 @@ export class Client { emitter.call = call; let responseMessage: ResponseType | null = null; let receivedStatus = false; - const callerStackError = new Error(); + let callerStackError: Error | null = new Error(); call.start(callProperties.metadata, { onReceiveMetadata: (metadata) => { emitter.emit('metadata', metadata); @@ -467,7 +470,7 @@ export class Client { receivedStatus = true; if (status.code === Status.OK) { if (responseMessage === null) { - const callerStack = getErrorStackString(callerStackError); + const callerStack = getErrorStackString(callerStackError!); callProperties.callback!(callErrorFromStatus({ code: Status.INTERNAL, details: 'No message received', @@ -477,9 +480,12 @@ export class Client { callProperties.callback!(null, responseMessage); } } else { - const callerStack = getErrorStackString(callerStackError); + const callerStack = getErrorStackString(callerStackError!); callProperties.callback!(callErrorFromStatus(status, callerStack)); } + /* Avoid retaining the callerStackError object in the call context of + * the status event handler. */ + callerStackError = null; emitter.emit('status', status); }, }); @@ -577,7 +583,7 @@ export class Client { * call after that. */ stream.call = call; let receivedStatus = false; - const callerStackError = new Error(); + let callerStackError: Error | null = new Error(); call.start(callProperties.metadata, { onReceiveMetadata(metadata: Metadata) { stream.emit('metadata', metadata); @@ -593,9 +599,12 @@ export class Client { receivedStatus = true; stream.push(null); if (status.code !== Status.OK) { - const callerStack = getErrorStackString(callerStackError); + const callerStack = getErrorStackString(callerStackError!); stream.emit('error', callErrorFromStatus(status, callerStack)); } + /* Avoid retaining the callerStackError object in the call context of + * the status event handler. */ + callerStackError = null; stream.emit('status', status); }, }); @@ -673,7 +682,7 @@ export class Client { * call after that. */ stream.call = call; let receivedStatus = false; - const callerStackError = new Error(); + let callerStackError: Error | null = new Error(); call.start(callProperties.metadata, { onReceiveMetadata(metadata: Metadata) { stream.emit('metadata', metadata); @@ -688,9 +697,12 @@ export class Client { receivedStatus = true; stream.push(null); if (status.code !== Status.OK) { - const callerStack = getErrorStackString(callerStackError); + const callerStack = getErrorStackString(callerStackError!); stream.emit('error', callErrorFromStatus(status, callerStack)); } + /* Avoid retaining the callerStackError object in the call context of + * the status event handler. */ + callerStackError = null; stream.emit('status', status); }, }); diff --git a/packages/grpc-js/src/load-balancing-call.ts b/packages/grpc-js/src/load-balancing-call.ts index f74933983..d88bdb809 100644 --- a/packages/grpc-js/src/load-balancing-call.ts +++ b/packages/grpc-js/src/load-balancing-call.ts @@ -216,14 +216,18 @@ export class LoadBalancingCall implements Call { break; case PickResultType.DROP: const {code, details} = restrictControlPlaneStatusCode(pickResult.status!.code, pickResult.status!.details); - this.outputStatus({code, details, metadata: pickResult.status!.metadata}, 'DROP'); + setImmediate(() => { + this.outputStatus({code, details, metadata: pickResult.status!.metadata}, 'DROP'); + }); break; case PickResultType.TRANSIENT_FAILURE: if (this.metadata.getOptions().waitForReady) { this.channel.queueCallForPick(this); } else { const {code, details} = restrictControlPlaneStatusCode(pickResult.status!.code, pickResult.status!.details); - this.outputStatus({code, details, metadata: pickResult.status!.metadata}, 'PROCESSED'); + setImmediate(() => { + this.outputStatus({code, details, metadata: pickResult.status!.metadata}, 'PROCESSED'); + }); } break; case PickResultType.QUEUE: From 039032cdfb87b455d883e813002b4ed52fb472c9 Mon Sep 17 00:00:00 2001 From: Michael Lumish Date: Thu, 1 Jun 2023 09:43:16 -0700 Subject: [PATCH 09/30] Merge pull request #2457 from XuanWang-Amos/xds_duplicate_bugs PSM Interop: Don't fail target if sub-target already failed --- packages/grpc-js-xds/scripts/xds_k8s_lb.sh | 3 --- 1 file changed, 3 deletions(-) diff --git a/packages/grpc-js-xds/scripts/xds_k8s_lb.sh b/packages/grpc-js-xds/scripts/xds_k8s_lb.sh index b87e3306c..4a83d44d5 100755 --- a/packages/grpc-js-xds/scripts/xds_k8s_lb.sh +++ b/packages/grpc-js-xds/scripts/xds_k8s_lb.sh @@ -170,9 +170,6 @@ main() { run_test $test || (( ++failed_tests )) done echo "Failed test suites: ${failed_tests}" - if (( failed_tests > 0 )); then - exit 1 - fi } main "$@" From 87b5466b1b5e6d269a9750305c5842c9e30e56e8 Mon Sep 17 00:00:00 2001 From: Michael Lumish Date: Tue, 20 Jun 2023 10:25:59 -0700 Subject: [PATCH 10/30] grpc-js: Implement trace function in Http2SubchannelConnector --- packages/grpc-js/package.json | 2 +- packages/grpc-js/src/transport.ts | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/packages/grpc-js/package.json b/packages/grpc-js/package.json index 7d8c1a597..f0331f1fc 100644 --- a/packages/grpc-js/package.json +++ b/packages/grpc-js/package.json @@ -1,6 +1,6 @@ { "name": "@grpc/grpc-js", - "version": "1.8.15", + "version": "1.8.16", "description": "gRPC Library for Node - pure JS implementation", "homepage": "https://grpc.io/", "repository": "https://github.com/grpc/grpc-node/tree/master/packages/grpc-js", diff --git a/packages/grpc-js/src/transport.ts b/packages/grpc-js/src/transport.ts index 8abc13aba..2f89d8f28 100644 --- a/packages/grpc-js/src/transport.ts +++ b/packages/grpc-js/src/transport.ts @@ -485,7 +485,7 @@ export class Http2SubchannelConnector implements SubchannelConnector { private isShutdown = false; constructor(private channelTarget: GrpcUri) {} private trace(text: string) { - + logging.trace(LogVerbosity.DEBUG, TRACER_NAME, this.channelTarget + ' ' + text); } private createSession(address: SubchannelAddress, credentials: ChannelCredentials, options: ChannelOptions, proxyConnectionResult: ProxyConnectionResult): Promise { if (this.isShutdown) { From b53f5882f13a5cb3c599804e96304bf5b8407ea6 Mon Sep 17 00:00:00 2001 From: Michael Lumish Date: Thu, 22 Jun 2023 14:26:31 -0700 Subject: [PATCH 11/30] grpc-js: Disallow pick_first as child of outlier_detection --- packages/grpc-js/package.json | 2 +- .../grpc-js/src/load-balancer-outlier-detection.ts | 11 +++++++---- packages/grpc-js/src/resolver-dns.ts | 4 ++-- packages/grpc-js/test/test-outlier-detection.ts | 12 +++++++++++- 4 files changed, 21 insertions(+), 8 deletions(-) diff --git a/packages/grpc-js/package.json b/packages/grpc-js/package.json index f0331f1fc..d4b822600 100644 --- a/packages/grpc-js/package.json +++ b/packages/grpc-js/package.json @@ -1,6 +1,6 @@ { "name": "@grpc/grpc-js", - "version": "1.8.16", + "version": "1.8.17", "description": "gRPC Library for Node - pure JS implementation", "homepage": "https://grpc.io/", "repository": "https://github.com/grpc/grpc-node/tree/master/packages/grpc-js", diff --git a/packages/grpc-js/src/load-balancer-outlier-detection.ts b/packages/grpc-js/src/load-balancer-outlier-detection.ts index 2f72a9625..885c4feb9 100644 --- a/packages/grpc-js/src/load-balancer-outlier-detection.ts +++ b/packages/grpc-js/src/load-balancer-outlier-detection.ts @@ -113,6 +113,9 @@ export class OutlierDetectionLoadBalancingConfig implements LoadBalancingConfig failurePercentageEjection: Partial | null, private readonly childPolicy: LoadBalancingConfig[] ) { + if (childPolicy[0].getLoadBalancerName() === 'pick_first') { + throw new Error('outlier_detection LB policy cannot have a pick_first child policy'); + } this.intervalMs = intervalMs ?? 10_000; this.baseEjectionTimeMs = baseEjectionTimeMs ?? 30_000; this.maxEjectionTimeMs = maxEjectionTimeMs ?? 300_000; @@ -395,8 +398,8 @@ export class OutlierDetectionLoadBalancer implements LoadBalancer { } private isCountingEnabled(): boolean { - return this.latestConfig !== null && - (this.latestConfig.getSuccessRateEjectionConfig() !== null || + return this.latestConfig !== null && + (this.latestConfig.getSuccessRateEjectionConfig() !== null || this.latestConfig.getFailurePercentageEjectionConfig() !== null); } @@ -496,7 +499,7 @@ export class OutlierDetectionLoadBalancer implements LoadBalancer { if (addressesWithTargetVolume < failurePercentageConfig.minimum_hosts) { return; } - + // Step 2 for (const [address, mapEntry] of this.addressMap.entries()) { // Step 2.i @@ -656,4 +659,4 @@ export function setup() { if (OUTLIER_DETECTION_ENABLED) { registerLoadBalancerType(TYPE_NAME, OutlierDetectionLoadBalancer, OutlierDetectionLoadBalancingConfig); } -} \ No newline at end of file +} diff --git a/packages/grpc-js/src/resolver-dns.ts b/packages/grpc-js/src/resolver-dns.ts index 355ce2dfd..b20b7a543 100644 --- a/packages/grpc-js/src/resolver-dns.ts +++ b/packages/grpc-js/src/resolver-dns.ts @@ -137,7 +137,7 @@ class DnsResolver implements Resolver { details: `Name resolution failed for target ${uriToString(this.target)}`, metadata: new Metadata(), }; - + const backoffOptions: BackoffOptions = { initialDelay: channelOptions['grpc.initial_reconnect_backoff_ms'], maxDelay: channelOptions['grpc.max_reconnect_backoff_ms'], @@ -276,7 +276,7 @@ class DnsResolver implements Resolver { } catch (err) { this.latestServiceConfigError = { code: Status.UNAVAILABLE, - details: 'Parsing service config failed', + details: `Parsing service config failed with error ${(err as Error).message}`, metadata: new Metadata(), }; } diff --git a/packages/grpc-js/test/test-outlier-detection.ts b/packages/grpc-js/test/test-outlier-detection.ts index c9021e605..d51ccf3fd 100644 --- a/packages/grpc-js/test/test-outlier-detection.ts +++ b/packages/grpc-js/test/test-outlier-detection.ts @@ -360,6 +360,16 @@ describe('Outlier detection config validation', () => { }, /failure_percentage_ejection\.enforcement_percentage parse error: value out of range for percentage/); }); }); + describe('child_policy', () => { + it('Should reject a pick_first child_policy', () => { + const loadBalancingConfig = { + child_policy: [{pick_first: {}}] + }; + assert.throws(() => { + OutlierDetectionLoadBalancingConfig.createFromJson(loadBalancingConfig); + }, /outlier_detection LB policy cannot have a pick_first child policy/); + }); + }); }); describe('Outlier detection', () => { @@ -533,4 +543,4 @@ describe('Outlier detection', () => { }) }); }); -}); \ No newline at end of file +}); From 9441de78f655ada34ada0dc1a8057122eb21f229 Mon Sep 17 00:00:00 2001 From: Michael Lumish Date: Thu, 22 Jun 2023 16:52:53 -0700 Subject: [PATCH 12/30] grpc-js-xds: Use distroless Node image for interop Dockerfile --- packages/grpc-js-xds/interop/Dockerfile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/grpc-js-xds/interop/Dockerfile b/packages/grpc-js-xds/interop/Dockerfile index 5987f1ee4..2986eb3e5 100644 --- a/packages/grpc-js-xds/interop/Dockerfile +++ b/packages/grpc-js-xds/interop/Dockerfile @@ -27,7 +27,7 @@ RUN npm install WORKDIR /node/src/grpc-node/packages/grpc-js-xds RUN npm install -FROM node:18-slim +FROM gcr.io/distroless/nodejs18-debian11:latest WORKDIR /node/src/grpc-node COPY --from=build /node/src/grpc-node/packages/grpc-js ./packages/grpc-js/ COPY --from=build /node/src/grpc-node/packages/grpc-js-xds ./packages/grpc-js-xds/ From a62d2b027bf91e5084c9134305e88a645dc5f1c1 Mon Sep 17 00:00:00 2001 From: Michael Lumish Date: Fri, 23 Jun 2023 09:34:29 -0700 Subject: [PATCH 13/30] Use entrypoint /nodejs/bin/node --- packages/grpc-js-xds/interop/Dockerfile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/grpc-js-xds/interop/Dockerfile b/packages/grpc-js-xds/interop/Dockerfile index 2986eb3e5..5ff12a433 100644 --- a/packages/grpc-js-xds/interop/Dockerfile +++ b/packages/grpc-js-xds/interop/Dockerfile @@ -35,4 +35,4 @@ COPY --from=build /node/src/grpc-node/packages/grpc-js-xds ./packages/grpc-js-xd ENV GRPC_VERBOSITY="DEBUG" ENV GRPC_TRACE=xds_client,xds_resolver,cds_balancer,eds_balancer,priority,weighted_target,round_robin,resolving_load_balancer,subchannel,keepalive,dns_resolver,fault_injection,http_filter,csds,outlier_detection -ENTRYPOINT [ "node", "/node/src/grpc-node/packages/grpc-js-xds/build/interop/xds-interop-client" ] +ENTRYPOINT [ "/nodejs/bin/node", "/node/src/grpc-node/packages/grpc-js-xds/build/interop/xds-interop-client" ] From a6aa7ea43e1458a578cf9ab81ed492a760c43a78 Mon Sep 17 00:00:00 2001 From: Michael Lumish Date: Fri, 23 Jun 2023 10:37:01 -0700 Subject: [PATCH 14/30] Merge pull request #2475 from XuanWang-Amos/file_multiple_url_map [PSM interop] Don't fail target if sub-target already failed --- packages/grpc-js-xds/scripts/xds_k8s_url_map.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/grpc-js-xds/scripts/xds_k8s_url_map.sh b/packages/grpc-js-xds/scripts/xds_k8s_url_map.sh index 69126c72b..fc74718f2 100644 --- a/packages/grpc-js-xds/scripts/xds_k8s_url_map.sh +++ b/packages/grpc-js-xds/scripts/xds_k8s_url_map.sh @@ -156,7 +156,7 @@ main() { build_docker_images_if_needed # Run tests cd "${TEST_DRIVER_FULL_DIR}" - run_test url_map + run_test url_map || echo "Failed url_map test" } main "$@" From ed70a0b381144b387698f2d57001f5a7bc82cbe9 Mon Sep 17 00:00:00 2001 From: Michael Lumish Date: Tue, 27 Jun 2023 10:11:45 -0700 Subject: [PATCH 15/30] Fix handling of OD policy with no child --- packages/grpc-js/src/load-balancer-outlier-detection.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/grpc-js/src/load-balancer-outlier-detection.ts b/packages/grpc-js/src/load-balancer-outlier-detection.ts index 885c4feb9..ce8668f18 100644 --- a/packages/grpc-js/src/load-balancer-outlier-detection.ts +++ b/packages/grpc-js/src/load-balancer-outlier-detection.ts @@ -113,7 +113,7 @@ export class OutlierDetectionLoadBalancingConfig implements LoadBalancingConfig failurePercentageEjection: Partial | null, private readonly childPolicy: LoadBalancingConfig[] ) { - if (childPolicy[0].getLoadBalancerName() === 'pick_first') { + if (childPolicy.length > 0 && childPolicy[0].getLoadBalancerName() === 'pick_first') { throw new Error('outlier_detection LB policy cannot have a pick_first child policy'); } this.intervalMs = intervalMs ?? 10_000; From 3cef1ba5472d5127b7346ef1533ffb403e6dcef3 Mon Sep 17 00:00:00 2001 From: Michael Lumish Date: Wed, 28 Jun 2023 16:00:22 -0700 Subject: [PATCH 16/30] Merge pull request #2488 from grpc/psm-interop-server-bump grpc-js-xds: Bump the canonical server from v1.46.x to v1.56.0 --- packages/grpc-js-xds/scripts/xds_k8s_lb.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/grpc-js-xds/scripts/xds_k8s_lb.sh b/packages/grpc-js-xds/scripts/xds_k8s_lb.sh index 4a83d44d5..729fb9293 100755 --- a/packages/grpc-js-xds/scripts/xds_k8s_lb.sh +++ b/packages/grpc-js-xds/scripts/xds_k8s_lb.sh @@ -19,7 +19,7 @@ set -eo pipefail readonly GITHUB_REPOSITORY_NAME="grpc-node" readonly TEST_DRIVER_INSTALL_SCRIPT_URL="https://raw.githubusercontent.com/${TEST_DRIVER_REPO_OWNER:-grpc}/grpc/${TEST_DRIVER_BRANCH:-master}/tools/internal_ci/linux/grpc_xds_k8s_install_test_driver.sh" ## xDS test client Docker images -readonly SERVER_IMAGE_NAME="gcr.io/grpc-testing/xds-interop/java-server:v1.46.x" +readonly SERVER_IMAGE_NAME="gcr.io/grpc-testing/xds-interop/java-server:558b5b0bfac8e21755c223063274a779b3898afe" readonly CLIENT_IMAGE_NAME="gcr.io/grpc-testing/xds-interop/node-client" readonly FORCE_IMAGE_BUILD="${FORCE_IMAGE_BUILD:-0}" readonly BUILD_APP_PATH="packages/grpc-js-xds/interop/Dockerfile" From 45e277547f4ad535a6c83d887992f6707f7f816a Mon Sep 17 00:00:00 2001 From: Michael Lumish Date: Wed, 12 Jul 2023 14:55:49 -0700 Subject: [PATCH 17/30] grpc-js: Fix mistakenly committed testing changes --- packages/grpc-js/src/client.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/packages/grpc-js/src/client.ts b/packages/grpc-js/src/client.ts index bdbdc6e97..7c856ee5b 100644 --- a/packages/grpc-js/src/client.ts +++ b/packages/grpc-js/src/client.ts @@ -347,13 +347,13 @@ export class Client { code: Status.INTERNAL, details: 'No message received', metadata: status.metadata - }, /*callerStack*/'')); + }, callerStack)); } else { callProperties.callback!(null, responseMessage); } } else { const callerStack = getErrorStackString(callerStackError!); - callProperties.callback!(callErrorFromStatus(status, /*callerStack*/'')); + callProperties.callback!(callErrorFromStatus(status, callerStack)); } /* Avoid retaining the callerStackError object in the call context of * the status event handler. */ From 713a2c9bd1f30ee7f5bab9aabbd5712b3578ee14 Mon Sep 17 00:00:00 2001 From: Michael Lumish Date: Wed, 12 Jul 2023 15:22:15 -0700 Subject: [PATCH 18/30] grpc-js: Enable the noUnusedLocals TypeScript compiler option --- packages/grpc-js/src/channel-credentials.ts | 12 +----- packages/grpc-js/src/client-interceptors.ts | 1 - packages/grpc-js/src/compression-filter.ts | 2 +- packages/grpc-js/src/index.ts | 2 - packages/grpc-js/src/internal-channel.ts | 19 ++++------ .../src/load-balancer-outlier-detection.ts | 4 +- packages/grpc-js/src/load-balancer.ts | 5 +-- packages/grpc-js/src/load-balancing-call.ts | 10 ++--- .../grpc-js/src/max-message-size-filter.ts | 2 +- packages/grpc-js/src/metadata.ts | 5 --- packages/grpc-js/src/object-stream.ts | 2 +- packages/grpc-js/src/resolver-ip.ts | 2 +- packages/grpc-js/src/resolving-call.ts | 4 +- .../grpc-js/src/resolving-load-balancer.ts | 5 +-- packages/grpc-js/src/server-call.ts | 7 +++- packages/grpc-js/src/server.ts | 28 ++++++-------- packages/grpc-js/src/subchannel-call.ts | 8 ---- packages/grpc-js/src/transport.ts | 38 +++++++++---------- .../grpc-js/test/test-call-propagation.ts | 8 ++-- .../grpc-js/test/test-channel-credentials.ts | 7 +--- packages/grpc-js/test/test-channelz.ts | 4 +- packages/grpc-js/test/test-deadline.ts | 8 +--- .../grpc-js/test/test-outlier-detection.ts | 1 - packages/grpc-js/test/test-server.ts | 4 +- packages/grpc-js/tsconfig.json | 3 +- 25 files changed, 70 insertions(+), 121 deletions(-) diff --git a/packages/grpc-js/src/channel-credentials.ts b/packages/grpc-js/src/channel-credentials.ts index fd9d7b571..64db3e9eb 100644 --- a/packages/grpc-js/src/channel-credentials.ts +++ b/packages/grpc-js/src/channel-credentials.ts @@ -38,14 +38,6 @@ export type CheckServerIdentityCallback = ( cert: PeerCertificate ) => Error | undefined; -function bufferOrNullEqual(buf1: Buffer | null, buf2: Buffer | null) { - if (buf1 === null && buf2 === null) { - return true; - } else { - return buf1 !== null && buf2 !== null && buf1.equals(buf2); - } -} - /** * Additional peer verification options that can be set when creating * SSL credentials. @@ -196,7 +188,7 @@ class SecureChannelCredentialsImpl extends ChannelCredentials { private verifyOptions: VerifyOptions ) { super(); - this.connectionOptions = { + this.connectionOptions = { secureContext }; // Node asserts that this option is a function, so we cannot pass undefined @@ -225,7 +217,7 @@ class SecureChannelCredentialsImpl extends ChannelCredentials { } if (other instanceof SecureChannelCredentialsImpl) { return ( - this.secureContext === other.secureContext && + this.secureContext === other.secureContext && this.verifyOptions.checkServerIdentity === other.verifyOptions.checkServerIdentity ); } else { diff --git a/packages/grpc-js/src/client-interceptors.ts b/packages/grpc-js/src/client-interceptors.ts index d95828550..277a14f59 100644 --- a/packages/grpc-js/src/client-interceptors.ts +++ b/packages/grpc-js/src/client-interceptors.ts @@ -32,7 +32,6 @@ import { import { Status } from './constants'; import { Channel } from './channel'; import { CallOptions } from './client'; -import { CallCredentials } from './call-credentials'; import { ClientMethodDefinition } from './make-client'; import { getErrorMessage } from './error'; diff --git a/packages/grpc-js/src/compression-filter.ts b/packages/grpc-js/src/compression-filter.ts index f87614114..b8c2e6d1e 100644 --- a/packages/grpc-js/src/compression-filter.ts +++ b/packages/grpc-js/src/compression-filter.ts @@ -282,7 +282,7 @@ export class CompressionFilter extends BaseFilter implements Filter { export class CompressionFilterFactory implements FilterFactory { private sharedFilterConfig: SharedCompressionFilterConfig = {}; - constructor(private readonly channel: Channel, private readonly options: ChannelOptions) {} + constructor(channel: Channel, private readonly options: ChannelOptions) {} createFilter(): CompressionFilter { return new CompressionFilter(this.options, this.sharedFilterConfig); } diff --git a/packages/grpc-js/src/index.ts b/packages/grpc-js/src/index.ts index 51f394785..70178a89e 100644 --- a/packages/grpc-js/src/index.ts +++ b/packages/grpc-js/src/index.ts @@ -43,9 +43,7 @@ import { loadPackageDefinition, makeClientConstructor, MethodDefinition, - ProtobufTypeDefinition, Serialize, - ServiceClientConstructor, ServiceDefinition, } from './make-client'; import { Metadata, MetadataOptions, MetadataValue } from './metadata'; diff --git a/packages/grpc-js/src/internal-channel.ts b/packages/grpc-js/src/internal-channel.ts index 14038bd3f..3d2624895 100644 --- a/packages/grpc-js/src/internal-channel.ts +++ b/packages/grpc-js/src/internal-channel.ts @@ -20,7 +20,7 @@ import { ChannelOptions } from './channel-options'; import { ResolvingLoadBalancer } from './resolving-load-balancer'; import { SubchannelPool, getSubchannelPool } from './subchannel-pool'; import { ChannelControlHelper } from './load-balancer'; -import { UnavailablePicker, Picker, PickResultType } from './picker'; +import { UnavailablePicker, Picker } from './picker'; import { Metadata } from './metadata'; import { Status, LogVerbosity, Propagate } from './constants'; import { FilterStackFactory } from './filter-stack'; @@ -31,22 +31,19 @@ import { getDefaultAuthority, mapUriDefaultScheme, } from './resolver'; -import { trace, log } from './logging'; +import { trace } from './logging'; import { SubchannelAddress } from './subchannel-address'; import { MaxMessageSizeFilterFactory } from './max-message-size-filter'; import { mapProxyName } from './http_proxy'; -import { GrpcUri, parseUri, splitHostPort, uriToString } from './uri-parser'; +import { GrpcUri, parseUri, uriToString } from './uri-parser'; import { ServerSurfaceCall } from './server-call'; -import { Filter } from './filter'; import { ConnectivityState } from './connectivity-state'; import { ChannelInfo, ChannelRef, ChannelzCallTracker, ChannelzChildrenTracker, ChannelzTrace, registerChannelzChannel, SubchannelRef, unregisterChannelzRef } from './channelz'; -import { Subchannel } from './subchannel'; import { LoadBalancingCall } from './load-balancing-call'; import { CallCredentials } from './call-credentials'; -import { Call, CallStreamOptions, InterceptingListener, MessageContext, StatusObject } from './call-interface'; -import { SubchannelCall } from './subchannel-call'; -import { Deadline, deadlineToString, getDeadlineTimeoutString } from './deadline'; +import { Call, CallStreamOptions, StatusObject } from './call-interface'; +import { Deadline, deadlineToString } from './deadline'; import { ResolvingCall } from './resolving-call'; import { getNextCallNumber } from './call-number'; import { restrictControlPlaneStatusCode } from './control-plane-status'; @@ -112,7 +109,7 @@ class ChannelSubchannelWrapper extends BaseSubchannelWrapper implements Subchann } export class InternalChannel { - + private resolvingLoadBalancer: ResolvingLoadBalancer; private subchannelPool: SubchannelPool; private connectivityState: ConnectivityState = ConnectivityState.IDLE; @@ -376,7 +373,7 @@ export class InternalChannel { trace( LogVerbosity.DEBUG, 'connectivity_state', - '(' + this.channelzRef.id + ') ' + + '(' + this.channelzRef.id + ') ' + uriToString(this.target) + ' ' + ConnectivityState[this.connectivityState] + @@ -601,7 +598,7 @@ export class InternalChannel { /** * Get the channelz reference object for this channel. The returned value is * garbage if channelz is disabled for this channel. - * @returns + * @returns */ getChannelzRef() { return this.channelzRef; diff --git a/packages/grpc-js/src/load-balancer-outlier-detection.ts b/packages/grpc-js/src/load-balancer-outlier-detection.ts index ce8668f18..7297000d0 100644 --- a/packages/grpc-js/src/load-balancer-outlier-detection.ts +++ b/packages/grpc-js/src/load-balancer-outlier-detection.ts @@ -20,11 +20,9 @@ import { ConnectivityState } from "./connectivity-state"; import { LogVerbosity, Status } from "./constants"; import { durationToMs, isDuration, msToDuration } from "./duration"; import { ChannelControlHelper, createChildChannelControlHelper, registerLoadBalancerType } from "./experimental"; -import { BaseFilter, Filter, FilterFactory } from "./filter"; import { getFirstUsableConfig, LoadBalancer, LoadBalancingConfig, validateLoadBalancingConfig } from "./load-balancer"; import { ChildLoadBalancerHandler } from "./load-balancer-child-handler"; -import { PickArgs, Picker, PickResult, PickResultType, QueuePicker, UnavailablePicker } from "./picker"; -import { Subchannel } from "./subchannel"; +import { PickArgs, Picker, PickResult, PickResultType } from "./picker"; import { SubchannelAddress, subchannelAddressToString } from "./subchannel-address"; import { BaseSubchannelWrapper, ConnectivityStateListener, SubchannelInterface } from "./subchannel-interface"; import * as logging from './logging'; diff --git a/packages/grpc-js/src/load-balancer.ts b/packages/grpc-js/src/load-balancer.ts index 48930c7db..8d1c96981 100644 --- a/packages/grpc-js/src/load-balancer.ts +++ b/packages/grpc-js/src/load-balancer.ts @@ -16,7 +16,6 @@ */ import { ChannelOptions } from './channel-options'; -import { Subchannel } from './subchannel'; import { SubchannelAddress } from './subchannel-address'; import { ConnectivityState } from './connectivity-state'; import { Picker } from './picker'; @@ -58,8 +57,8 @@ export interface ChannelControlHelper { * parent while letting others pass through to the parent unmodified. This * allows other code to create these children without needing to know about * all of the methods to be passed through. - * @param parent - * @param overrides + * @param parent + * @param overrides */ export function createChildChannelControlHelper(parent: ChannelControlHelper, overrides: Partial): ChannelControlHelper { return { diff --git a/packages/grpc-js/src/load-balancing-call.ts b/packages/grpc-js/src/load-balancing-call.ts index d88bdb809..e9144cb93 100644 --- a/packages/grpc-js/src/load-balancing-call.ts +++ b/packages/grpc-js/src/load-balancing-call.ts @@ -21,7 +21,6 @@ import { SubchannelCall } from "./subchannel-call"; import { ConnectivityState } from "./connectivity-state"; import { LogVerbosity, Status } from "./constants"; import { Deadline, getDeadlineTimeoutString } from "./deadline"; -import { FilterStack, FilterStackFactory } from "./filter-stack"; import { InternalChannel } from "./internal-channel"; import { Metadata } from "./metadata"; import { PickResultType } from "./picker"; @@ -48,7 +47,6 @@ export class LoadBalancingCall implements Call { private readPending = false; private pendingMessage: {context: MessageContext, message: Buffer} | null = null; private pendingHalfClose = false; - private pendingChildStatus: StatusObject | null = null; private ended = false; private serviceUrl: string; private metadata: Metadata | null = null; @@ -104,9 +102,9 @@ export class LoadBalancingCall implements Call { } this.trace('Pick called') const pickResult = this.channel.doPick(this.metadata, this.callConfig.pickInformation); - const subchannelString = pickResult.subchannel ? - '(' + pickResult.subchannel.getChannelzRef().id + ') ' + pickResult.subchannel.getAddress() : - '' + pickResult.subchannel; + const subchannelString = pickResult.subchannel ? + '(' + pickResult.subchannel.getChannelzRef().id + ') ' + pickResult.subchannel.getAddress() : + '' + pickResult.subchannel; this.trace( 'Pick result: ' + PickResultType[pickResult.pickResultType] + @@ -280,4 +278,4 @@ export class LoadBalancingCall implements Call { getCallNumber(): number { return this.callNumber; } -} \ No newline at end of file +} diff --git a/packages/grpc-js/src/max-message-size-filter.ts b/packages/grpc-js/src/max-message-size-filter.ts index 62d01077c..25e4fdc03 100644 --- a/packages/grpc-js/src/max-message-size-filter.ts +++ b/packages/grpc-js/src/max-message-size-filter.ts @@ -29,7 +29,7 @@ export class MaxMessageSizeFilter extends BaseFilter implements Filter { private maxSendMessageSize: number = DEFAULT_MAX_SEND_MESSAGE_LENGTH; private maxReceiveMessageSize: number = DEFAULT_MAX_RECEIVE_MESSAGE_LENGTH; constructor( - private readonly options: ChannelOptions + options: ChannelOptions ) { super(); if ('grpc.max_send_message_length' in options) { diff --git a/packages/grpc-js/src/metadata.ts b/packages/grpc-js/src/metadata.ts index 0dddd9465..bfef51b0d 100644 --- a/packages/grpc-js/src/metadata.ts +++ b/packages/grpc-js/src/metadata.ts @@ -229,11 +229,6 @@ export class Metadata { return result; } - // For compatibility with the other Metadata implementation - private _getCoreRepresentation() { - return this.internalRepr; - } - /** * This modifies the behavior of JSON.stringify to show an object * representation of the metadata map. diff --git a/packages/grpc-js/src/object-stream.ts b/packages/grpc-js/src/object-stream.ts index 22ab8a41f..9289b9d84 100644 --- a/packages/grpc-js/src/object-stream.ts +++ b/packages/grpc-js/src/object-stream.ts @@ -15,7 +15,7 @@ * */ -import { Duplex, Readable, Writable } from 'stream'; +import { Readable, Writable } from 'stream'; import { EmitterAugmentation1 } from './events'; /* eslint-disable @typescript-eslint/no-explicit-any */ diff --git a/packages/grpc-js/src/resolver-ip.ts b/packages/grpc-js/src/resolver-ip.ts index efb0b8dcb..0704131e1 100644 --- a/packages/grpc-js/src/resolver-ip.ts +++ b/packages/grpc-js/src/resolver-ip.ts @@ -42,7 +42,7 @@ class IpResolver implements Resolver { private addresses: SubchannelAddress[] = []; private error: StatusObject | null = null; constructor( - private target: GrpcUri, + target: GrpcUri, private listener: ResolverListener, channelOptions: ChannelOptions ) { diff --git a/packages/grpc-js/src/resolving-call.ts b/packages/grpc-js/src/resolving-call.ts index f29fb7fd7..683fed5bc 100644 --- a/packages/grpc-js/src/resolving-call.ts +++ b/packages/grpc-js/src/resolving-call.ts @@ -18,7 +18,7 @@ import { CallCredentials } from "./call-credentials"; import { Call, CallStreamOptions, InterceptingListener, MessageContext, StatusObject } from "./call-interface"; import { LogVerbosity, Propagate, Status } from "./constants"; -import { Deadline, deadlineToString, getDeadlineTimeoutString, getRelativeTimeout, minDeadline } from "./deadline"; +import { Deadline, deadlineToString, getRelativeTimeout, minDeadline } from "./deadline"; import { FilterStack, FilterStackFactory } from "./filter-stack"; import { InternalChannel } from "./internal-channel"; import { Metadata } from "./metadata"; @@ -276,4 +276,4 @@ export class ResolvingCall implements Call { getCallNumber(): number { return this.callNumber; } -} \ No newline at end of file +} diff --git a/packages/grpc-js/src/resolving-load-balancer.ts b/packages/grpc-js/src/resolving-load-balancer.ts index a39606f2c..5194bef46 100644 --- a/packages/grpc-js/src/resolving-load-balancer.ts +++ b/packages/grpc-js/src/resolving-load-balancer.ts @@ -36,7 +36,6 @@ import { SubchannelAddress } from './subchannel-address'; import { GrpcUri, uriToString } from './uri-parser'; import { ChildLoadBalancerHandler } from './load-balancer-child-handler'; import { ChannelOptions } from './channel-options'; -import { PickFirstLoadBalancingConfig } from './load-balancer-pick-first'; const TRACER_NAME = 'resolving_load_balancer'; @@ -44,8 +43,6 @@ function trace(text: string): void { logging.trace(LogVerbosity.DEBUG, TRACER_NAME, text); } -const DEFAULT_LOAD_BALANCER_NAME = 'pick_first'; - function getDefaultConfigSelector( serviceConfig: ServiceConfig | null ): ConfigSelector { @@ -137,7 +134,7 @@ export class ResolvingLoadBalancer implements LoadBalancer { constructor( private readonly target: GrpcUri, private readonly channelControlHelper: ChannelControlHelper, - private readonly channelOptions: ChannelOptions, + channelOptions: ChannelOptions, private readonly onSuccessfulResolution: ResolutionCallback, private readonly onFailedResolution: ResolutionFailureCallback ) { diff --git a/packages/grpc-js/src/server-call.ts b/packages/grpc-js/src/server-call.ts index 48186bc29..fc840bdf9 100644 --- a/packages/grpc-js/src/server-call.ts +++ b/packages/grpc-js/src/server-call.ts @@ -268,6 +268,9 @@ export class ServerDuplexStreamImpl implements ServerDuplexStream { cancelled: boolean; + /* This field appears to be unsued, but it is actually used in _final, which is assiged from + * ServerWritableStreamImpl.prototype._final below. */ + // @ts-ignore noUnusedLocals private trailingMetadata: Metadata; constructor( @@ -419,7 +422,7 @@ export class Http2ServerCallStream< constructor( private stream: http2.ServerHttp2Stream, private handler: Handler, - private options: ChannelOptions + options: ChannelOptions ) { super(); @@ -720,7 +723,7 @@ export class Http2ServerCallStream< [GRPC_MESSAGE_HEADER]: encodeURI(statusObj.details), ...statusObj.metadata?.toHttp2Headers(), }; - + this.stream.sendTrailers(trailersToSend); this.statusSent = true; }); diff --git a/packages/grpc-js/src/server.ts b/packages/grpc-js/src/server.ts index d19186a75..1a01b30dc 100644 --- a/packages/grpc-js/src/server.ts +++ b/packages/grpc-js/src/server.ts @@ -141,10 +141,6 @@ interface ChannelzSessionInfo { lastMessageReceivedTimestamp: Date | null; } -interface ChannelzListenerInfo { - ref: SocketRef; -} - export class Server { private http2ServerList: { server: (http2.Http2Server | http2.Http2SecureServer), channelzRef: SocketRef }[] = []; @@ -242,7 +238,7 @@ export class Server { private trace(text: string): void { logging.trace(LogVerbosity.DEBUG, TRACER_NAME, '(' + this.channelzRef.id + ') ' + text); } - + addProtoService(): never { throw new Error('Not implemented. Use addService() instead'); @@ -743,7 +739,7 @@ export class Server { /** * Get the channelz reference object for this server. The returned value is * garbage if channelz is disabled for this server. - * @returns + * @returns */ getChannelzRef() { return this.channelzRef; @@ -792,14 +788,14 @@ export class Server { return handler } - + private _respondWithError>( - err: T, - stream: http2.ServerHttp2Stream, + err: T, + stream: http2.ServerHttp2Stream, channelzSessionInfo: ChannelzSessionInfo | null = null ) { const call = new Http2ServerCallStream(stream, null!, this.options); - + if (err.code === undefined) { err.code = Status.INTERNAL; } @@ -814,7 +810,7 @@ export class Server { private _channelzHandler(stream: http2.ServerHttp2Stream, headers: http2.IncomingHttpHeaders) { const channelzSessionInfo = this.sessions.get(stream.session as http2.ServerHttp2Session); - + this.callTracker.addCallStarted(); channelzSessionInfo?.streamTracker.addCallStarted(); @@ -834,9 +830,9 @@ export class Server { }, stream, channelzSessionInfo) return } - + const call = new Http2ServerCallStream(stream, handler, this.options); - + call.once('callEnd', (code: Status) => { if (code === Status.OK) { this.callTracker.addCallSucceeded(); @@ -844,7 +840,7 @@ export class Server { this.callTracker.addCallFailed(); } }); - + if (channelzSessionInfo) { call.once('streamEnd', (success: boolean) => { if (success) { @@ -954,8 +950,8 @@ export class Server { } this.serverAddressString = serverAddressString - const handler = this.channelzEnabled - ? this._channelzHandler + const handler = this.channelzEnabled + ? this._channelzHandler : this._streamHandler http2Server.on('stream', handler.bind(this)) diff --git a/packages/grpc-js/src/subchannel-call.ts b/packages/grpc-js/src/subchannel-call.ts index 969282e19..f9c24f6bd 100644 --- a/packages/grpc-js/src/subchannel-call.ts +++ b/packages/grpc-js/src/subchannel-call.ts @@ -23,19 +23,11 @@ import { Metadata } from './metadata'; import { StreamDecoder } from './stream-decoder'; import * as logging from './logging'; import { LogVerbosity } from './constants'; -import { ServerSurfaceCall } from './server-call'; -import { Deadline } from './deadline'; import { InterceptingListener, MessageContext, StatusObject, WriteCallback } from './call-interface'; import { CallEventTracker, Transport } from './transport'; const TRACER_NAME = 'subchannel_call'; -const { - HTTP2_HEADER_STATUS, - HTTP2_HEADER_CONTENT_TYPE, - NGHTTP2_CANCEL, -} = http2.constants; - /** * https://nodejs.org/api/errors.html#errors_class_systemerror */ diff --git a/packages/grpc-js/src/transport.ts b/packages/grpc-js/src/transport.ts index 2f89d8f28..36176d643 100644 --- a/packages/grpc-js/src/transport.ts +++ b/packages/grpc-js/src/transport.ts @@ -46,10 +46,6 @@ const { HTTP2_HEADER_USER_AGENT, } = http2.constants; -/* setInterval and setTimeout only accept signed 32 bit integers. JS doesn't - * have a constant for the max signed 32 bit integer, so this is a simple way - * to calculate it */ -const KEEPALIVE_MAX_TIME_MS = ~(1 << 31); const KEEPALIVE_TIMEOUT_MS = 20000; export interface CallEventTracker { @@ -108,11 +104,6 @@ class Http2Transport implements Transport { // Channelz info private channelzRef: SocketRef; private readonly channelzEnabled: boolean = true; - /** - * Name of the remote server, if it is not the same as the subchannel - * address, i.e. if connecting through an HTTP CONNECT proxy. - */ - private remoteName: string | null = null; private streamTracker = new ChannelzCallTracker(); private keepalivesSent = 0; private messagesSent = 0; @@ -123,7 +114,12 @@ class Http2Transport implements Transport { constructor( private session: http2.ClientHttp2Session, subchannelAddress: SubchannelAddress, - options: ChannelOptions + options: ChannelOptions, + /** + * Name of the remote server, if it is not the same as the subchannel + * address, i.e. if connecting through an HTTP CONNECT proxy. + */ + private remoteName: string | null ) { // Build user-agent string. this.userAgent = [ @@ -133,7 +129,7 @@ class Http2Transport implements Transport { ] .filter((e) => e) .join(' '); // remove falsey values first - + if ('grpc.keepalive_time_ms' in options) { this.keepaliveTimeMs = options['grpc.keepalive_time_ms']!; } @@ -271,7 +267,7 @@ class Http2Transport implements Transport { * @param tooManyPings If true, this was triggered by a GOAWAY with data * indicating that the session was closed becaues the client sent too many * pings. - * @returns + * @returns */ private reportDisconnectToOwner(tooManyPings: boolean) { if (this.disconnectHandled) { @@ -405,11 +401,11 @@ class Http2Transport implements Transport { this.session.state.remoteWindowSize ); this.internalsTrace( - 'session.closed=' + - this.session.closed + - ' session.destroyed=' + - this.session.destroyed + - ' session.socket.destroyed=' + + 'session.closed=' + + this.session.closed + + ' session.destroyed=' + + this.session.destroyed + + ' session.socket.destroyed=' + this.session.socket.destroyed); let eventTracker: CallEventTracker; let call: Http2SubchannelCall; @@ -565,12 +561,12 @@ export class Http2SubchannelConnector implements SubchannelConnector { } }; } - + connectionOptions = { ...connectionOptions, ...address, }; - + /* http2.connect uses the options here: * https://github.com/nodejs/node/blob/70c32a6d190e2b5d7b9ff9d5b6a459d14e8b7d59/lib/internal/http2/core.js#L3028-L3036 * The spread operator overides earlier values with later ones, so any port @@ -596,7 +592,7 @@ export class Http2SubchannelConnector implements SubchannelConnector { session.unref(); session.once('connect', () => { session.removeAllListeners(); - resolve(new Http2Transport(session, address, options)); + resolve(new Http2Transport(session, address, options, remoteName)); this.session = null; }); session.once('close', () => { @@ -666,4 +662,4 @@ export class Http2SubchannelConnector implements SubchannelConnector { this.session?.close(); this.session = null; } -} \ No newline at end of file +} diff --git a/packages/grpc-js/test/test-call-propagation.ts b/packages/grpc-js/test/test-call-propagation.ts index 3ce57be17..4a2619f1f 100644 --- a/packages/grpc-js/test/test-call-propagation.ts +++ b/packages/grpc-js/test/test-call-propagation.ts @@ -165,7 +165,6 @@ describe('Call propagation', () => { describe('Deadlines', () => { it('should work with unary requests', (done) => { done = multiDone(done, 2); - let call: grpc.ClientUnaryCall; proxyServer.addService(Client.service, { unary: (parent: grpc.ServerUnaryCall, callback: grpc.sendUnaryData) => { client.unary(parent.request, {parent: parent, propagate_flags: grpc.propagate.DEADLINE}, (error: grpc.ServiceError, value: unknown) => { @@ -178,7 +177,7 @@ describe('Call propagation', () => { }); const deadline = new Date(); deadline.setMilliseconds(deadline.getMilliseconds() + 100); - call = proxyClient.unary({}, {deadline}, (error: grpc.ServiceError, value: unknown) => { + proxyClient.unary({}, {deadline}, (error: grpc.ServiceError, value: unknown) => { assert(error); assert.strictEqual(error.code, grpc.status.DEADLINE_EXCEEDED); done(); @@ -186,7 +185,6 @@ describe('Call propagation', () => { }); it('Should work with client streaming requests', (done) => { done = multiDone(done, 2); - let call: grpc.ClientWritableStream; proxyServer.addService(Client.service, { clientStream: (parent: grpc.ServerReadableStream, callback: grpc.sendUnaryData) => { client.clientStream({parent: parent, propagate_flags: grpc.propagate.DEADLINE}, (error: grpc.ServiceError, value: unknown) => { @@ -199,7 +197,7 @@ describe('Call propagation', () => { }); const deadline = new Date(); deadline.setMilliseconds(deadline.getMilliseconds() + 100); - call = proxyClient.clientStream({deadline, propagate_flags: grpc.propagate.DEADLINE}, (error: grpc.ServiceError, value: unknown) => { + proxyClient.clientStream({deadline, propagate_flags: grpc.propagate.DEADLINE}, (error: grpc.ServiceError, value: unknown) => { assert(error); assert.strictEqual(error.code, grpc.status.DEADLINE_EXCEEDED); done(); @@ -250,4 +248,4 @@ describe('Call propagation', () => { }); }); }); -}); \ No newline at end of file +}); diff --git a/packages/grpc-js/test/test-channel-credentials.ts b/packages/grpc-js/test/test-channel-credentials.ts index 2b537ac97..62e4c19a1 100644 --- a/packages/grpc-js/test/test-channel-credentials.ts +++ b/packages/grpc-js/test/test-channel-credentials.ts @@ -19,14 +19,11 @@ import * as assert from 'assert'; import * as fs from 'fs'; import * as path from 'path'; import { promisify } from 'util'; -import * as protoLoader from '@grpc/proto-loader'; import { CallCredentials } from '../src/call-credentials'; import { ChannelCredentials } from '../src/channel-credentials'; import * as grpc from '../src'; import { ServiceClient, ServiceClientConstructor } from '../src/make-client'; -import { TestServiceClient, TestServiceHandlers } from './generated/TestService'; -import { ProtoGrpcType as TestServiceGrpcType } from './generated/test_service'; import { assert2, loadProtoFile, mockFunction } from './common'; import { sendUnaryData, ServerUnaryCall, ServiceError } from '../src'; @@ -171,7 +168,7 @@ describe('ChannelCredentials usage', () => { callback(null, call.request); }, }); - + server.bindAsync( 'localhost:0', serverCreds, @@ -209,4 +206,4 @@ describe('ChannelCredentials usage', () => { })); assert2.afterMustCallsSatisfied(done); }); -}); \ No newline at end of file +}); diff --git a/packages/grpc-js/test/test-channelz.ts b/packages/grpc-js/test/test-channelz.ts index f14145c37..26c15d47d 100644 --- a/packages/grpc-js/test/test-channelz.ts +++ b/packages/grpc-js/test/test-channelz.ts @@ -21,8 +21,6 @@ import * as grpc from '../src'; import { ProtoGrpcType } from '../src/generated/channelz' import { ChannelzClient } from '../src/generated/grpc/channelz/v1/Channelz'; -import { Channel__Output } from '../src/generated/grpc/channelz/v1/Channel'; -import { Server__Output } from '../src/generated/grpc/channelz/v1/Server'; import { ServiceClient, ServiceClientConstructor } from '../src/make-client'; import { loadProtoFile } from './common'; @@ -318,4 +316,4 @@ describe('Disabling channelz', () => { done(); }); }); -}); \ No newline at end of file +}); diff --git a/packages/grpc-js/test/test-deadline.ts b/packages/grpc-js/test/test-deadline.ts index bb6b3ba9b..d117fc52a 100644 --- a/packages/grpc-js/test/test-deadline.ts +++ b/packages/grpc-js/test/test-deadline.ts @@ -19,14 +19,10 @@ import * as assert from 'assert'; import * as grpc from '../src'; import { experimental } from '../src'; -import { ServerCredentials } from '../src'; import { ServiceClient, ServiceClientConstructor } from '../src/make-client'; import { loadProtoFile } from './common'; import ServiceConfig = experimental.ServiceConfig; -const clientInsecureCreds = grpc.credentials.createInsecure(); -const serverInsecureCreds = ServerCredentials.createInsecure(); - const TIMEOUT_SERVICE_CONFIG: ServiceConfig = { loadBalancingConfig: [], methodConfig: [{ @@ -44,7 +40,7 @@ describe('Client with configured timeout', () => { let server: grpc.Server; let Client: ServiceClientConstructor; let client: ServiceClient; - + before(done => { Client = loadProtoFile(__dirname + '/fixtures/test_service.proto').TestService as ServiceClientConstructor; server = new grpc.Server(); @@ -87,4 +83,4 @@ describe('Client with configured timeout', () => { done(); }); }); -}); \ No newline at end of file +}); diff --git a/packages/grpc-js/test/test-outlier-detection.ts b/packages/grpc-js/test/test-outlier-detection.ts index d51ccf3fd..a91350fd7 100644 --- a/packages/grpc-js/test/test-outlier-detection.ts +++ b/packages/grpc-js/test/test-outlier-detection.ts @@ -20,7 +20,6 @@ import * as path from 'path'; import * as grpc from '../src'; import { loadProtoFile } from './common'; import { OutlierDetectionLoadBalancingConfig } from '../src/load-balancer-outlier-detection' -import { ServiceClient } from '../src/make-client'; function multiDone(done: Mocha.Done, target: number) { let count = 0; diff --git a/packages/grpc-js/test/test-server.ts b/packages/grpc-js/test/test-server.ts index c67ebc4d6..d67307f61 100644 --- a/packages/grpc-js/test/test-server.ts +++ b/packages/grpc-js/test/test-server.ts @@ -686,7 +686,7 @@ describe('Compressed requests', () => { }, ServerStream(call) { - const { metadata, request } = call; + const { request } = call; for (let i = 0; i < 5; i++) { call.write({ count: request.message.length }); @@ -908,7 +908,7 @@ describe('Compressed requests', () => { done(); }) }) - + /* As of Node 16, Writable and Duplex streams validate the encoding * argument to write, and the flags values we are passing there are not * valid. We don't currently have an alternative way to pass that flag diff --git a/packages/grpc-js/tsconfig.json b/packages/grpc-js/tsconfig.json index 310b633c7..5f955a982 100644 --- a/packages/grpc-js/tsconfig.json +++ b/packages/grpc-js/tsconfig.json @@ -7,7 +7,8 @@ "module": "commonjs", "resolveJsonModule": true, "incremental": true, - "types": ["mocha"] + "types": ["mocha"], + "noUnusedLocals": true }, "include": [ "src/**/*.ts", From 493cbaaf45cf6bd47a1b877253c47e9148551058 Mon Sep 17 00:00:00 2001 From: Michael Lumish Date: Wed, 12 Jul 2023 15:23:34 -0700 Subject: [PATCH 19/30] grpc-js: Increment version to 1.8.18 --- packages/grpc-js/package.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/grpc-js/package.json b/packages/grpc-js/package.json index d4b822600..a001e617f 100644 --- a/packages/grpc-js/package.json +++ b/packages/grpc-js/package.json @@ -1,6 +1,6 @@ { "name": "@grpc/grpc-js", - "version": "1.8.17", + "version": "1.8.18", "description": "gRPC Library for Node - pure JS implementation", "homepage": "https://grpc.io/", "repository": "https://github.com/grpc/grpc-node/tree/master/packages/grpc-js", From 2e9060385c6578a8e69687c917015b9eaf475f25 Mon Sep 17 00:00:00 2001 From: Michael Lumish Date: Mon, 24 Jul 2023 11:20:00 -0700 Subject: [PATCH 20/30] grpc-js: Fix keepalive ping timing after inactivity --- packages/grpc-js/package.json | 2 +- packages/grpc-js/src/transport.ts | 77 +++++++++++++++++++++---------- 2 files changed, 53 insertions(+), 26 deletions(-) diff --git a/packages/grpc-js/package.json b/packages/grpc-js/package.json index a001e617f..33096012c 100644 --- a/packages/grpc-js/package.json +++ b/packages/grpc-js/package.json @@ -1,6 +1,6 @@ { "name": "@grpc/grpc-js", - "version": "1.8.18", + "version": "1.8.19", "description": "gRPC Library for Node - pure JS implementation", "homepage": "https://grpc.io/", "repository": "https://github.com/grpc/grpc-node/tree/master/packages/grpc-js", diff --git a/packages/grpc-js/src/transport.ts b/packages/grpc-js/src/transport.ts index 36176d643..a3408d836 100644 --- a/packages/grpc-js/src/transport.ts +++ b/packages/grpc-js/src/transport.ts @@ -81,7 +81,12 @@ class Http2Transport implements Transport { /** * Timer reference for timeout that indicates when to send the next ping */ - private keepaliveIntervalId: NodeJS.Timer; + private keepaliveTimerId: NodeJS.Timer | null = null; + /** + * Indicates that the keepalive timer ran out while there were no active + * calls, and a ping should be sent the next time a call starts. + */ + private pendingSendKeepalivePing = false; /** * Timer reference tracking when the most recent ping will be considered lost */ @@ -142,10 +147,8 @@ class Http2Transport implements Transport { } else { this.keepaliveWithoutCalls = false; } - this.keepaliveIntervalId = setTimeout(() => {}, 0); - clearTimeout(this.keepaliveIntervalId); if (this.keepaliveWithoutCalls) { - this.startKeepalivePings(); + this.maybeStartKeepalivePingTimer(); } this.subchannelAddressString = subchannelAddressToString(subchannelAddress); @@ -295,6 +298,14 @@ class Http2Transport implements Transport { this.disconnectListeners.push(listener); } + private clearKeepaliveTimer() { + if (!this.keepaliveTimerId) { + return; + } + clearTimeout(this.keepaliveTimerId); + this.keepaliveTimerId = null; + } + private clearKeepaliveTimeout() { if (!this.keepaliveTimeoutId) { return; @@ -303,7 +314,16 @@ class Http2Transport implements Transport { this.keepaliveTimeoutId = null; } - private sendPing() { + private canSendPing() { + return this.keepaliveTimeMs > 0 && (this.keepaliveWithoutCalls || this.activeCalls.size > 0); + } + + private maybeSendPing() { + this.clearKeepaliveTimer(); + if (!this.canSendPing()) { + this.pendingSendKeepalivePing = true; + return; + } if (this.channelzEnabled) { this.keepalivesSent += 1; } @@ -320,6 +340,7 @@ class Http2Transport implements Transport { (err: Error | null, duration: number, payload: Buffer) => { this.keepaliveTrace('Received ping response'); this.clearKeepaliveTimeout(); + this.maybeStartKeepalivePingTimer(); } ); } catch (e) { @@ -329,25 +350,34 @@ class Http2Transport implements Transport { } } - private startKeepalivePings() { - if (this.keepaliveTimeMs < 0) { + /** + * Starts the keepalive ping timer if appropriate. If the timer already ran + * out while there were no active requests, instead send a ping immediately. + * If the ping timer is already running or a ping is currently in flight, + * instead do nothing and wait for them to resolve. + */ + private maybeStartKeepalivePingTimer() { + if (!this.canSendPing()) { return; } - this.keepaliveIntervalId = setInterval(() => { - this.sendPing(); - }, this.keepaliveTimeMs); - this.keepaliveIntervalId.unref?.(); - /* Don't send a ping immediately because whatever caused us to start - * sending pings should also involve some network activity. */ + if (this.pendingSendKeepalivePing) { + this.pendingSendKeepalivePing = false; + this.maybeSendPing(); + } else if (!this.keepaliveTimerId && !this.keepaliveTimeoutId) { + this.keepaliveTrace('Starting keepalive timer for ' + this.keepaliveTimeMs + 'ms'); + this.keepaliveTimerId = setTimeout(() => { + this.maybeSendPing(); + }, this.keepaliveTimeMs).unref?.(); + } + /* Otherwise, there is already either a keepalive timer or a ping pending, + * wait for those to resolve. */ } - /** - * Stop keepalive pings when terminating a connection. This discards the - * outstanding ping timeout, so it should not be called if the same - * connection will still be used. - */ private stopKeepalivePings() { - clearInterval(this.keepaliveIntervalId); + if (this.keepaliveTimerId) { + clearTimeout(this.keepaliveTimerId); + this.keepaliveTimerId = null; + } this.clearKeepaliveTimeout(); } @@ -355,20 +385,17 @@ class Http2Transport implements Transport { this.activeCalls.delete(call); if (this.activeCalls.size === 0) { this.session.unref(); - if (!this.keepaliveWithoutCalls) { - this.stopKeepalivePings(); - } } } private addActiveCall(call: Http2SubchannelCall) { - if (this.activeCalls.size === 0) { + this.activeCalls.add(call); + if (this.activeCalls.size === 1) { this.session.ref(); if (!this.keepaliveWithoutCalls) { - this.startKeepalivePings(); + this.maybeStartKeepalivePingTimer(); } } - this.activeCalls.add(call); } createCall(metadata: Metadata, host: string, method: string, listener: SubchannelCallInterceptingListener, subchannelCallStatsTracker: Partial): Http2SubchannelCall { From 42a02749eb4a067dbdb84fc111ab10ef09bc2ec4 Mon Sep 17 00:00:00 2001 From: Michael Lumish Date: Mon, 24 Jul 2023 13:08:55 -0700 Subject: [PATCH 21/30] grpc-js: Fix compilation error from new @types/node version --- packages/grpc-js/src/server-call.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/packages/grpc-js/src/server-call.ts b/packages/grpc-js/src/server-call.ts index fc840bdf9..ff0a10336 100644 --- a/packages/grpc-js/src/server-call.ts +++ b/packages/grpc-js/src/server-call.ts @@ -954,8 +954,8 @@ export class Http2ServerCallStream< } getPeer(): string { - const socket = this.stream.session.socket; - if (socket.remoteAddress) { + const socket = this.stream.session?.socket; + if (socket?.remoteAddress) { if (socket.remotePort) { return `${socket.remoteAddress}:${socket.remotePort}`; } else { From 6d979565492916de8106b6d7f6bfbc1f1fb63cf4 Mon Sep 17 00:00:00 2001 From: Michael Lumish Date: Tue, 25 Jul 2023 09:36:58 -0700 Subject: [PATCH 22/30] grpc-js: Fix a crash when grpc.keepalive_permit_without_calls is set --- packages/grpc-js/package.json | 2 +- packages/grpc-js/src/transport.ts | 23 +++++++++++++---------- 2 files changed, 14 insertions(+), 11 deletions(-) diff --git a/packages/grpc-js/package.json b/packages/grpc-js/package.json index 33096012c..93b2f7826 100644 --- a/packages/grpc-js/package.json +++ b/packages/grpc-js/package.json @@ -1,6 +1,6 @@ { "name": "@grpc/grpc-js", - "version": "1.8.19", + "version": "1.8.20", "description": "gRPC Library for Node - pure JS implementation", "homepage": "https://grpc.io/", "repository": "https://github.com/grpc/grpc-node/tree/master/packages/grpc-js", diff --git a/packages/grpc-js/src/transport.ts b/packages/grpc-js/src/transport.ts index a3408d836..4f26e96e2 100644 --- a/packages/grpc-js/src/transport.ts +++ b/packages/grpc-js/src/transport.ts @@ -126,6 +126,14 @@ class Http2Transport implements Transport { */ private remoteName: string | null ) { + /* Populate subchannelAddressString and channelzRef before doing anything + * else, because they are used in the trace methods. */ + this.subchannelAddressString = subchannelAddressToString(subchannelAddress); + + if (options['grpc.enable_channelz'] === 0) { + this.channelzEnabled = false; + } + this.channelzRef = registerChannelzSocket(this.subchannelAddressString, () => this.getChannelzInfo(), this.channelzEnabled); // Build user-agent string. this.userAgent = [ options['grpc.primary_user_agent'], @@ -147,16 +155,6 @@ class Http2Transport implements Transport { } else { this.keepaliveWithoutCalls = false; } - if (this.keepaliveWithoutCalls) { - this.maybeStartKeepalivePingTimer(); - } - - this.subchannelAddressString = subchannelAddressToString(subchannelAddress); - - if (options['grpc.enable_channelz'] === 0) { - this.channelzEnabled = false; - } - this.channelzRef = registerChannelzSocket(this.subchannelAddressString, () => this.getChannelzInfo(), this.channelzEnabled); session.once('close', () => { this.trace('session closed'); @@ -205,6 +203,11 @@ class Http2Transport implements Transport { ); }); } + /* Start the keepalive timer last, because this can trigger trace logs, + * which should only happen after everything else is set up. */ + if (this.keepaliveWithoutCalls) { + this.maybeStartKeepalivePingTimer(); + } } private getChannelzInfo(): SocketInfo { From 4e111e77921096b1190873298f8f44cab1bd47a0 Mon Sep 17 00:00:00 2001 From: Michael Lumish Date: Fri, 28 Jul 2023 14:21:19 -0700 Subject: [PATCH 23/30] grpc-js: Fix propagation of UNIMPLEMENTED error messages --- packages/grpc-js/package.json | 2 +- packages/grpc-js/src/server.ts | 38 +++++------- packages/grpc-js/test/test-server.ts | 90 ++++++++++++++++++++++++++++ 3 files changed, 106 insertions(+), 24 deletions(-) diff --git a/packages/grpc-js/package.json b/packages/grpc-js/package.json index 93b2f7826..29c2a00ee 100644 --- a/packages/grpc-js/package.json +++ b/packages/grpc-js/package.json @@ -1,6 +1,6 @@ { "name": "@grpc/grpc-js", - "version": "1.8.20", + "version": "1.8.21", "description": "gRPC Library for Node - pure JS implementation", "homepage": "https://grpc.io/", "repository": "https://github.com/grpc/grpc-node/tree/master/packages/grpc-js", diff --git a/packages/grpc-js/src/server.ts b/packages/grpc-js/src/server.ts index 1a01b30dc..9853ddf56 100644 --- a/packages/grpc-js/src/server.ts +++ b/packages/grpc-js/src/server.ts @@ -61,7 +61,6 @@ import { import { parseUri } from './uri-parser'; import { ChannelzCallTracker, ChannelzChildrenTracker, ChannelzTrace, registerChannelzServer, registerChannelzSocket, ServerInfo, ServerRef, SocketInfo, SocketRef, TlsInfo, unregisterChannelzRef } from './channelz'; import { CipherNameAndProtocol, TLSSocket } from 'tls'; -import { getErrorCode, getErrorMessage } from './error'; const UNLIMITED_CONNECTION_AGE_MS = ~(1<<31); const KEEPALIVE_MAX_TIME_MS = ~(1<<31); @@ -765,9 +764,7 @@ export class Server { return true } - private _retrieveHandler(headers: http2.IncomingHttpHeaders): Handler { - const path = headers[HTTP2_HEADER_PATH] as string - + private _retrieveHandler(path: string): Handler | null { this.trace( 'Received call to method ' + path + @@ -783,7 +780,7 @@ export class Server { path + '. Sending UNIMPLEMENTED status.' ); - throw getUnimplementedStatusResponse(path); + return null; } return handler @@ -820,15 +817,12 @@ export class Server { return } - let handler: Handler - try { - handler = this._retrieveHandler(headers) - } catch (err) { - this._respondWithError({ - details: getErrorMessage(err), - code: getErrorCode(err) ?? undefined - }, stream, channelzSessionInfo) - return + const path = headers[HTTP2_HEADER_PATH] as string; + + const handler = this._retrieveHandler(path); + if (!handler) { + this._respondWithError(getUnimplementedStatusResponse(path), stream, channelzSessionInfo); + return; } const call = new Http2ServerCallStream(stream, handler, this.options); @@ -875,15 +869,13 @@ export class Server { return } - let handler: Handler - try { - handler = this._retrieveHandler(headers) - } catch (err) { - this._respondWithError({ - details: getErrorMessage(err), - code: getErrorCode(err) ?? undefined - }, stream, null) - return + + const path = headers[HTTP2_HEADER_PATH] as string; + + const handler = this._retrieveHandler(path); + if (!handler) { + this._respondWithError(getUnimplementedStatusResponse(path), stream, null); + return; } const call = new Http2ServerCallStream(stream, handler, this.options) diff --git a/packages/grpc-js/test/test-server.ts b/packages/grpc-js/test/test-server.ts index d67307f61..68890df55 100644 --- a/packages/grpc-js/test/test-server.ts +++ b/packages/grpc-js/test/test-server.ts @@ -408,6 +408,7 @@ describe('Server', () => { (error: ServiceError, response: any) => { assert(error); assert.strictEqual(error.code, grpc.status.UNIMPLEMENTED); + assert.match(error.details, /does not implement the method.*Div/); done(); } ); @@ -417,6 +418,7 @@ describe('Server', () => { const call = client.sum((error: ServiceError, response: any) => { assert(error); assert.strictEqual(error.code, grpc.status.UNIMPLEMENTED); + assert.match(error.details, /does not implement the method.*Sum/); done(); }); @@ -433,6 +435,7 @@ describe('Server', () => { call.on('error', (err: ServiceError) => { assert(err); assert.strictEqual(err.code, grpc.status.UNIMPLEMENTED); + assert.match(err.details, /does not implement the method.*Fib/); done(); }); }); @@ -447,6 +450,93 @@ describe('Server', () => { call.on('error', (err: ServiceError) => { assert(err); assert.strictEqual(err.code, grpc.status.UNIMPLEMENTED); + assert.match(err.details, /does not implement the method.*DivMany/); + done(); + }); + + call.end(); + }); + }); + + describe('Unregistered service', () => { + let server: Server; + let client: ServiceClient; + + const mathProtoFile = path.join(__dirname, 'fixtures', 'math.proto'); + const mathClient = (loadProtoFile(mathProtoFile).math as any).Math; + + before(done => { + server = new Server(); + // Don't register a service at all + server.bindAsync( + 'localhost:0', + ServerCredentials.createInsecure(), + (err, port) => { + assert.ifError(err); + client = new mathClient( + `localhost:${port}`, + grpc.credentials.createInsecure() + ); + server.start(); + done(); + } + ); + }); + + after(done => { + client.close(); + server.tryShutdown(done); + }); + + it('should respond to a unary call with UNIMPLEMENTED', done => { + client.div( + { divisor: 4, dividend: 3 }, + (error: ServiceError, response: any) => { + assert(error); + assert.strictEqual(error.code, grpc.status.UNIMPLEMENTED); + assert.match(error.details, /does not implement the method.*Div/); + done(); + } + ); + }); + + it('should respond to a client stream with UNIMPLEMENTED', done => { + const call = client.sum((error: ServiceError, response: any) => { + assert(error); + assert.strictEqual(error.code, grpc.status.UNIMPLEMENTED); + assert.match(error.details, /does not implement the method.*Sum/); + done(); + }); + + call.end(); + }); + + it('should respond to a server stream with UNIMPLEMENTED', done => { + const call = client.fib({ limit: 5 }); + + call.on('data', (value: any) => { + assert.fail('No messages expected'); + }); + + call.on('error', (err: ServiceError) => { + assert(err); + assert.strictEqual(err.code, grpc.status.UNIMPLEMENTED); + assert.match(err.details, /does not implement the method.*Fib/); + done(); + }); + }); + + it('should respond to a bidi call with UNIMPLEMENTED', done => { + const call = client.divMany(); + + call.on('data', (value: any) => { + assert.fail('No messages expected'); + }); + + call.on('error', (err: ServiceError) => { + assert(err); + assert.strictEqual(err.code, grpc.status.UNIMPLEMENTED); + assert.match(err.details, /does not implement the method.*DivMany/); done(); }); From ec687f6fb108a40cd7ee99a0caa947d4fbb49062 Mon Sep 17 00:00:00 2001 From: Michael Lumish Date: Mon, 14 Aug 2023 13:58:03 -0700 Subject: [PATCH 24/30] grpc-js: Switch Timer type to Timeout (1.8.x) --- packages/grpc-js/src/backoff-timeout.ts | 2 +- packages/grpc-js/src/internal-channel.ts | 2 +- packages/grpc-js/src/load-balancer-outlier-detection.ts | 2 +- packages/grpc-js/src/resolver-dns.ts | 2 +- packages/grpc-js/src/resolving-call.ts | 2 +- packages/grpc-js/src/retrying-call.ts | 6 +++--- packages/grpc-js/src/server-call.ts | 2 +- packages/grpc-js/src/server.ts | 6 +++--- packages/grpc-js/src/subchannel-pool.ts | 2 +- packages/grpc-js/src/transport.ts | 4 ++-- 10 files changed, 15 insertions(+), 15 deletions(-) diff --git a/packages/grpc-js/src/backoff-timeout.ts b/packages/grpc-js/src/backoff-timeout.ts index f523e259a..3ffd26064 100644 --- a/packages/grpc-js/src/backoff-timeout.ts +++ b/packages/grpc-js/src/backoff-timeout.ts @@ -63,7 +63,7 @@ export class BackoffTimeout { * to an object representing a timer that has ended, but it can still be * interacted with without error. */ - private timerId: NodeJS.Timer; + private timerId: NodeJS.Timeout; /** * Indicates whether the timer is currently running. */ diff --git a/packages/grpc-js/src/internal-channel.ts b/packages/grpc-js/src/internal-channel.ts index 3d2624895..38646a0cd 100644 --- a/packages/grpc-js/src/internal-channel.ts +++ b/packages/grpc-js/src/internal-channel.ts @@ -131,7 +131,7 @@ export class InternalChannel { * the invariant is that callRefTimer is reffed if and only if pickQueue * is non-empty. */ - private callRefTimer: NodeJS.Timer; + private callRefTimer: NodeJS.Timeout; private configSelector: ConfigSelector | null = null; /** * This is the error from the name resolver if it failed most recently. It diff --git a/packages/grpc-js/src/load-balancer-outlier-detection.ts b/packages/grpc-js/src/load-balancer-outlier-detection.ts index 7297000d0..2cba9d14a 100644 --- a/packages/grpc-js/src/load-balancer-outlier-detection.ts +++ b/packages/grpc-js/src/load-balancer-outlier-detection.ts @@ -367,7 +367,7 @@ export class OutlierDetectionLoadBalancer implements LoadBalancer { private childBalancer: ChildLoadBalancerHandler; private addressMap: Map = new Map(); private latestConfig: OutlierDetectionLoadBalancingConfig | null = null; - private ejectionTimer: NodeJS.Timer; + private ejectionTimer: NodeJS.Timeout; private timerStartTime: Date | null = null; constructor(channelControlHelper: ChannelControlHelper) { diff --git a/packages/grpc-js/src/resolver-dns.ts b/packages/grpc-js/src/resolver-dns.ts index b20b7a543..775a25c36 100644 --- a/packages/grpc-js/src/resolver-dns.ts +++ b/packages/grpc-js/src/resolver-dns.ts @@ -96,7 +96,7 @@ class DnsResolver implements Resolver { private defaultResolutionError: StatusObject; private backoff: BackoffTimeout; private continueResolving = false; - private nextResolutionTimer: NodeJS.Timer; + private nextResolutionTimer: NodeJS.Timeout; private isNextResolutionTimerRunning = false; private isServiceConfigEnabled = true; constructor( diff --git a/packages/grpc-js/src/resolving-call.ts b/packages/grpc-js/src/resolving-call.ts index 683fed5bc..5dbe83cd9 100644 --- a/packages/grpc-js/src/resolving-call.ts +++ b/packages/grpc-js/src/resolving-call.ts @@ -41,7 +41,7 @@ export class ResolvingCall implements Call { private deadline: Deadline; private host: string; private statusWatchers: ((status: StatusObject) => void)[] = []; - private deadlineTimer: NodeJS.Timer = setTimeout(() => {}, 0); + private deadlineTimer: NodeJS.Timeout = setTimeout(() => {}, 0); private filterStack: FilterStack | null = null; constructor( diff --git a/packages/grpc-js/src/retrying-call.ts b/packages/grpc-js/src/retrying-call.ts index 5ae585b9e..bf15c9143 100644 --- a/packages/grpc-js/src/retrying-call.ts +++ b/packages/grpc-js/src/retrying-call.ts @@ -169,7 +169,7 @@ export class RetryingCall implements Call { * Number of attempts so far */ private attempts: number = 0; - private hedgingTimer: NodeJS.Timer | null = null; + private hedgingTimer: NodeJS.Timeout | null = null; private committedCallIndex: number | null = null; private initialRetryBackoffSec = 0; private nextRetryBackoffSec = 0; @@ -625,7 +625,7 @@ export class RetryingCall implements Call { return; } const call = this.underlyingCalls[this.committedCallIndex]; - bufferEntry.callback = context.callback; + bufferEntry.callback = context.callback; if (call.state === 'ACTIVE' && call.nextMessageToSend === messageIndex) { call.call.sendMessageWithContext({ callback: (error) => { @@ -668,4 +668,4 @@ export class RetryingCall implements Call { getHost(): string { return this.host; } -} \ No newline at end of file +} diff --git a/packages/grpc-js/src/server-call.ts b/packages/grpc-js/src/server-call.ts index ff0a10336..be807b959 100644 --- a/packages/grpc-js/src/server-call.ts +++ b/packages/grpc-js/src/server-call.ts @@ -407,7 +407,7 @@ export class Http2ServerCallStream< ResponseType > extends EventEmitter { cancelled = false; - deadlineTimer: NodeJS.Timer | null = null; + deadlineTimer: NodeJS.Timeout | null = null; private statusSent = false; private deadline: Deadline = Infinity; private wantTrailers = false; diff --git a/packages/grpc-js/src/server.ts b/packages/grpc-js/src/server.ts index 9853ddf56..1b481bbf9 100644 --- a/packages/grpc-js/src/server.ts +++ b/packages/grpc-js/src/server.ts @@ -971,8 +971,8 @@ export class Server { this.channelzTrace.addTrace('CT_INFO', 'Connection established by client ' + clientAddress); this.sessionChildrenTracker.refChild(channelzRef); } - let connectionAgeTimer: NodeJS.Timer | null = null; - let connectionAgeGraceTimer: NodeJS.Timer | null = null; + let connectionAgeTimer: NodeJS.Timeout | null = null; + let connectionAgeGraceTimer: NodeJS.Timeout | null = null; let sessionClosedByServer = false; if (this.maxConnectionAgeMs !== UNLIMITED_CONNECTION_AGE_MS) { // Apply a random jitter within a +/-10% range @@ -1000,7 +1000,7 @@ export class Server { } }, this.maxConnectionAgeMs + jitter).unref?.(); } - const keeapliveTimeTimer: NodeJS.Timer | null = setInterval(() => { + const keeapliveTimeTimer: NodeJS.Timeout | null = setInterval(() => { const timeoutTImer = setTimeout(() => { sessionClosedByServer = true; if (this.channelzEnabled) { diff --git a/packages/grpc-js/src/subchannel-pool.ts b/packages/grpc-js/src/subchannel-pool.ts index bbfbea02b..f9400bede 100644 --- a/packages/grpc-js/src/subchannel-pool.ts +++ b/packages/grpc-js/src/subchannel-pool.ts @@ -45,7 +45,7 @@ export class SubchannelPool { /** * A timer of a task performing a periodic subchannel cleanup. */ - private cleanupTimer: NodeJS.Timer | null = null; + private cleanupTimer: NodeJS.Timeout | null = null; /** * A pool of subchannels use for making connections. Subchannels with the diff --git a/packages/grpc-js/src/transport.ts b/packages/grpc-js/src/transport.ts index 4f26e96e2..a9da5db4d 100644 --- a/packages/grpc-js/src/transport.ts +++ b/packages/grpc-js/src/transport.ts @@ -81,7 +81,7 @@ class Http2Transport implements Transport { /** * Timer reference for timeout that indicates when to send the next ping */ - private keepaliveTimerId: NodeJS.Timer | null = null; + private keepaliveTimerId: NodeJS.Timeout | null = null; /** * Indicates that the keepalive timer ran out while there were no active * calls, and a ping should be sent the next time a call starts. @@ -90,7 +90,7 @@ class Http2Transport implements Transport { /** * Timer reference tracking when the most recent ping will be considered lost */ - private keepaliveTimeoutId: NodeJS.Timer | null = null; + private keepaliveTimeoutId: NodeJS.Timeout | null = null; /** * Indicates whether keepalive pings should be sent without any active calls */ From c2434173c111db636e3de4b8d71d057a67f398ae Mon Sep 17 00:00:00 2001 From: Michael Lumish Date: Thu, 4 Jan 2024 13:11:54 -0800 Subject: [PATCH 25/30] Merge pull request #2635 from XuanWang-Amos/psm-interop-shared-build buildscripts: Use the Kokoro shared install lib from the new repo --- packages/grpc-js-xds/scripts/xds_k8s_lb.sh | 2 +- packages/grpc-js-xds/scripts/xds_k8s_url_map.sh | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/packages/grpc-js-xds/scripts/xds_k8s_lb.sh b/packages/grpc-js-xds/scripts/xds_k8s_lb.sh index 729fb9293..ed7c77fe2 100755 --- a/packages/grpc-js-xds/scripts/xds_k8s_lb.sh +++ b/packages/grpc-js-xds/scripts/xds_k8s_lb.sh @@ -17,7 +17,7 @@ set -eo pipefail # Constants readonly GITHUB_REPOSITORY_NAME="grpc-node" -readonly TEST_DRIVER_INSTALL_SCRIPT_URL="https://raw.githubusercontent.com/${TEST_DRIVER_REPO_OWNER:-grpc}/grpc/${TEST_DRIVER_BRANCH:-master}/tools/internal_ci/linux/grpc_xds_k8s_install_test_driver.sh" +readonly TEST_DRIVER_INSTALL_SCRIPT_URL="https://raw.githubusercontent.com/${TEST_DRIVER_REPO_OWNER:-grpc}/psm-interop/${TEST_DRIVER_BRANCH:-main}/.kokoro/psm_interop_kokoro_lib.sh" ## xDS test client Docker images readonly SERVER_IMAGE_NAME="gcr.io/grpc-testing/xds-interop/java-server:558b5b0bfac8e21755c223063274a779b3898afe" readonly CLIENT_IMAGE_NAME="gcr.io/grpc-testing/xds-interop/node-client" diff --git a/packages/grpc-js-xds/scripts/xds_k8s_url_map.sh b/packages/grpc-js-xds/scripts/xds_k8s_url_map.sh index fc74718f2..9344d054b 100644 --- a/packages/grpc-js-xds/scripts/xds_k8s_url_map.sh +++ b/packages/grpc-js-xds/scripts/xds_k8s_url_map.sh @@ -17,7 +17,7 @@ set -eo pipefail # Constants readonly GITHUB_REPOSITORY_NAME="grpc-node" -readonly TEST_DRIVER_INSTALL_SCRIPT_URL="https://raw.githubusercontent.com/${TEST_DRIVER_REPO_OWNER:-grpc}/grpc/${TEST_DRIVER_BRANCH:-master}/tools/internal_ci/linux/grpc_xds_k8s_install_test_driver.sh" +readonly TEST_DRIVER_INSTALL_SCRIPT_URL="https://raw.githubusercontent.com/${TEST_DRIVER_REPO_OWNER:-grpc}/psm-interop/${TEST_DRIVER_BRANCH:-main}/.kokoro/psm_interop_kokoro_lib.sh" ## xDS test client Docker images readonly CLIENT_IMAGE_NAME="gcr.io/grpc-testing/xds-interop/node-client" readonly FORCE_IMAGE_BUILD="${FORCE_IMAGE_BUILD:-0}" From f38966aab5cd7b4c94cb6f3e6a518375f11f5e52 Mon Sep 17 00:00:00 2001 From: Michael Lumish Date: Mon, 8 Apr 2024 17:47:26 -0700 Subject: [PATCH 26/30] Merge pull request #2712 from sergiitk/psm-interop-pkg-dev PSM Interop: Migrate to Artifact Registry --- packages/grpc-js-xds/scripts/xds_k8s_lb.sh | 7 ++++--- packages/grpc-js-xds/scripts/xds_k8s_url_map.sh | 5 +++-- 2 files changed, 7 insertions(+), 5 deletions(-) diff --git a/packages/grpc-js-xds/scripts/xds_k8s_lb.sh b/packages/grpc-js-xds/scripts/xds_k8s_lb.sh index ed7c77fe2..2c5684ea3 100755 --- a/packages/grpc-js-xds/scripts/xds_k8s_lb.sh +++ b/packages/grpc-js-xds/scripts/xds_k8s_lb.sh @@ -19,8 +19,9 @@ set -eo pipefail readonly GITHUB_REPOSITORY_NAME="grpc-node" readonly TEST_DRIVER_INSTALL_SCRIPT_URL="https://raw.githubusercontent.com/${TEST_DRIVER_REPO_OWNER:-grpc}/psm-interop/${TEST_DRIVER_BRANCH:-main}/.kokoro/psm_interop_kokoro_lib.sh" ## xDS test client Docker images -readonly SERVER_IMAGE_NAME="gcr.io/grpc-testing/xds-interop/java-server:558b5b0bfac8e21755c223063274a779b3898afe" -readonly CLIENT_IMAGE_NAME="gcr.io/grpc-testing/xds-interop/node-client" +readonly DOCKER_REGISTRY="us-docker.pkg.dev" +readonly SERVER_IMAGE_NAME="us-docker.pkg.dev/grpc-testing/psm-interop/java-server:canonical" +readonly CLIENT_IMAGE_NAME="us-docker.pkg.dev/grpc-testing/psm-interop/node-client" readonly FORCE_IMAGE_BUILD="${FORCE_IMAGE_BUILD:-0}" readonly BUILD_APP_PATH="packages/grpc-js-xds/interop/Dockerfile" readonly LANGUAGE_NAME="Node" @@ -46,7 +47,7 @@ build_test_app_docker_images() { -t "${CLIENT_IMAGE_NAME}:${GIT_COMMIT}" \ . - gcloud -q auth configure-docker + gcloud -q auth configure-docker "${DOCKER_REGISTRY}" docker push "${CLIENT_IMAGE_NAME}:${GIT_COMMIT}" if is_version_branch "${TESTING_VERSION}"; then tag_and_push_docker_image "${CLIENT_IMAGE_NAME}" "${GIT_COMMIT}" "${TESTING_VERSION}" diff --git a/packages/grpc-js-xds/scripts/xds_k8s_url_map.sh b/packages/grpc-js-xds/scripts/xds_k8s_url_map.sh index 9344d054b..d6e2c7ed4 100644 --- a/packages/grpc-js-xds/scripts/xds_k8s_url_map.sh +++ b/packages/grpc-js-xds/scripts/xds_k8s_url_map.sh @@ -19,7 +19,8 @@ set -eo pipefail readonly GITHUB_REPOSITORY_NAME="grpc-node" readonly TEST_DRIVER_INSTALL_SCRIPT_URL="https://raw.githubusercontent.com/${TEST_DRIVER_REPO_OWNER:-grpc}/psm-interop/${TEST_DRIVER_BRANCH:-main}/.kokoro/psm_interop_kokoro_lib.sh" ## xDS test client Docker images -readonly CLIENT_IMAGE_NAME="gcr.io/grpc-testing/xds-interop/node-client" +readonly DOCKER_REGISTRY="us-docker.pkg.dev" +readonly CLIENT_IMAGE_NAME="us-docker.pkg.dev/grpc-testing/psm-interop/node-client" readonly FORCE_IMAGE_BUILD="${FORCE_IMAGE_BUILD:-0}" readonly BUILD_APP_PATH="packages/grpc-js-xds/interop/Dockerfile" readonly LANGUAGE_NAME="Node" @@ -45,7 +46,7 @@ build_test_app_docker_images() { -t "${CLIENT_IMAGE_NAME}:${GIT_COMMIT}" \ . - gcloud -q auth configure-docker + gcloud -q auth configure-docker "${DOCKER_REGISTRY}" docker push "${CLIENT_IMAGE_NAME}:${GIT_COMMIT}" if is_version_branch "${TESTING_VERSION}"; then tag_and_push_docker_image "${CLIENT_IMAGE_NAME}" "${GIT_COMMIT}" "${TESTING_VERSION}" From 969e30502767f3d7036b975efbffcc97b1fff40d Mon Sep 17 00:00:00 2001 From: Michael Lumish Date: Fri, 3 May 2024 14:24:44 -0700 Subject: [PATCH 27/30] Merge pull request #2735 from murgatroid99/grpc-js_linkify-it_fix root: Update dependency on jsdoc to avoid linkify-it compilation error --- package.json | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/package.json b/package.json index 70a15fbbf..49087fb05 100644 --- a/package.json +++ b/package.json @@ -20,14 +20,13 @@ "del": "^3.0.0", "execa": "^0.8.0", "gulp": "^4.0.1", - "gulp-jsdoc3": "^1.0.1", "gulp-jshint": "^2.0.4", "gulp-mocha": "^4.3.1", "gulp-sourcemaps": "^2.6.1", "gulp-tslint": "^8.1.1", "gulp-typescript": "^3.2.2", "gulp-util": "^3.0.8", - "jsdoc": "^3.3.2", + "jsdoc": "^4.0.3", "jshint": "^2.9.5", "make-dir": "^1.1.0", "merge2": "^1.1.0", From 00f348c4861ab87aa3c18935a07babd0715227a6 Mon Sep 17 00:00:00 2001 From: Michael Lumish Date: Mon, 6 May 2024 15:20:11 -0700 Subject: [PATCH 28/30] Merge pull request #2729 from sergiitk/psm-interop-common-prod-tests PSM Interop: simplify Kokoro buildscripts --- .../scripts/psm-interop-build-node.sh | 38 ++++ .../scripts/psm-interop-test-node.sh | 40 ++++ packages/grpc-js-xds/scripts/xds_k8s_lb.sh | 176 ------------------ .../grpc-js-xds/scripts/xds_k8s_url_map.sh | 163 ---------------- test/kokoro/xds-interop.cfg | 24 --- test/kokoro/xds_k8s_lb.cfg | 6 +- test/kokoro/xds_k8s_url_map.cfg | 6 +- 7 files changed, 88 insertions(+), 365 deletions(-) create mode 100755 packages/grpc-js-xds/scripts/psm-interop-build-node.sh create mode 100755 packages/grpc-js-xds/scripts/psm-interop-test-node.sh delete mode 100755 packages/grpc-js-xds/scripts/xds_k8s_lb.sh delete mode 100644 packages/grpc-js-xds/scripts/xds_k8s_url_map.sh delete mode 100644 test/kokoro/xds-interop.cfg diff --git a/packages/grpc-js-xds/scripts/psm-interop-build-node.sh b/packages/grpc-js-xds/scripts/psm-interop-build-node.sh new file mode 100755 index 000000000..d52206f0e --- /dev/null +++ b/packages/grpc-js-xds/scripts/psm-interop-build-node.sh @@ -0,0 +1,38 @@ +#!/usr/bin/env bash +# Copyright 2024 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +set -eo pipefail + +####################################### +# Builds test app Docker images and pushes them to GCR. +# Called from psm_interop_kokoro_lib.sh. +# +# Globals: +# SRC_DIR: Absolute path to the source repo on Kokoro VM +# SERVER_IMAGE_NAME: Test server Docker image name +# CLIENT_IMAGE_NAME: Test client Docker image name +# GIT_COMMIT: SHA-1 of git commit being built +# DOCKER_REGISTRY: Docker registry to push to +# Outputs: +# Writes the output of docker image build stdout, stderr +####################################### +psm::lang::build_docker_images() { + local client_dockerfile="packages/grpc-js-xds/interop/Dockerfile" + + cd "${SRC_DIR}" + psm::tools::run_verbose git submodule update --init --recursive + psm::tools::run_verbose git submodule status + + psm::build::docker_images_generic "${client_dockerfile}" +} diff --git a/packages/grpc-js-xds/scripts/psm-interop-test-node.sh b/packages/grpc-js-xds/scripts/psm-interop-test-node.sh new file mode 100755 index 000000000..169cf06f2 --- /dev/null +++ b/packages/grpc-js-xds/scripts/psm-interop-test-node.sh @@ -0,0 +1,40 @@ +#!/usr/bin/env bash +# Copyright 2024 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +set -eo pipefail + +# Input parameters to psm:: methods of the install script. +readonly GRPC_LANGUAGE="node" +readonly BUILD_SCRIPT_DIR="$(dirname "$0")" + +# Used locally. +readonly TEST_DRIVER_INSTALL_SCRIPT_URL="https://raw.githubusercontent.com/${TEST_DRIVER_REPO_OWNER:-grpc}/psm-interop/${TEST_DRIVER_BRANCH:-main}/.kokoro/psm_interop_kokoro_lib.sh" + +psm::lang::source_install_lib() { + echo "Sourcing test driver install script from: ${TEST_DRIVER_INSTALL_SCRIPT_URL}" + local install_lib + # Download to a tmp file. + install_lib="$(mktemp -d)/psm_interop_kokoro_lib.sh" + curl -s --retry-connrefused --retry 5 -o "${install_lib}" "${TEST_DRIVER_INSTALL_SCRIPT_URL}" + # Checksum. + if command -v sha256sum &> /dev/null; then + echo "Install script checksum:" + sha256sum "${install_lib}" + fi + source "${install_lib}" +} + +psm::lang::source_install_lib +source "${BUILD_SCRIPT_DIR}/psm-interop-build-${GRPC_LANGUAGE}.sh" +psm::run "${PSM_TEST_SUITE}" diff --git a/packages/grpc-js-xds/scripts/xds_k8s_lb.sh b/packages/grpc-js-xds/scripts/xds_k8s_lb.sh deleted file mode 100755 index 2c5684ea3..000000000 --- a/packages/grpc-js-xds/scripts/xds_k8s_lb.sh +++ /dev/null @@ -1,176 +0,0 @@ -#!/usr/bin/env bash -# Copyright 2022 gRPC authors. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -set -eo pipefail - -# Constants -readonly GITHUB_REPOSITORY_NAME="grpc-node" -readonly TEST_DRIVER_INSTALL_SCRIPT_URL="https://raw.githubusercontent.com/${TEST_DRIVER_REPO_OWNER:-grpc}/psm-interop/${TEST_DRIVER_BRANCH:-main}/.kokoro/psm_interop_kokoro_lib.sh" -## xDS test client Docker images -readonly DOCKER_REGISTRY="us-docker.pkg.dev" -readonly SERVER_IMAGE_NAME="us-docker.pkg.dev/grpc-testing/psm-interop/java-server:canonical" -readonly CLIENT_IMAGE_NAME="us-docker.pkg.dev/grpc-testing/psm-interop/node-client" -readonly FORCE_IMAGE_BUILD="${FORCE_IMAGE_BUILD:-0}" -readonly BUILD_APP_PATH="packages/grpc-js-xds/interop/Dockerfile" -readonly LANGUAGE_NAME="Node" - -####################################### -# Builds test app Docker images and pushes them to GCR -# Globals: -# BUILD_APP_PATH -# CLIENT_IMAGE_NAME: Test client Docker image name -# GIT_COMMIT: SHA-1 of git commit being built -# TESTING_VERSION: version branch under test, f.e. v1.42.x, master -# Arguments: -# None -# Outputs: -# Writes the output of `gcloud builds submit` to stdout, stderr -####################################### -build_test_app_docker_images() { - echo "Building ${LANGUAGE_NAME} xDS interop test app Docker images" - - pushd "${SRC_DIR}" - docker build \ - -f "${BUILD_APP_PATH}" \ - -t "${CLIENT_IMAGE_NAME}:${GIT_COMMIT}" \ - . - - gcloud -q auth configure-docker "${DOCKER_REGISTRY}" - docker push "${CLIENT_IMAGE_NAME}:${GIT_COMMIT}" - if is_version_branch "${TESTING_VERSION}"; then - tag_and_push_docker_image "${CLIENT_IMAGE_NAME}" "${GIT_COMMIT}" "${TESTING_VERSION}" - fi - popd -} - -####################################### -# Builds test app and its docker images unless they already exist -# Globals: -# CLIENT_IMAGE_NAME: Test client Docker image name -# GIT_COMMIT: SHA-1 of git commit being built -# FORCE_IMAGE_BUILD -# Arguments: -# None -# Outputs: -# Writes the output to stdout, stderr -####################################### -build_docker_images_if_needed() { - # Check if images already exist - client_tags="$(gcloud_gcr_list_image_tags "${CLIENT_IMAGE_NAME}" "${GIT_COMMIT}")" - printf "Client image: %s:%s\n" "${CLIENT_IMAGE_NAME}" "${GIT_COMMIT}" - echo "${client_tags:-Client image not found}" - - # Build if any of the images are missing, or FORCE_IMAGE_BUILD=1 - if [[ "${FORCE_IMAGE_BUILD}" == "1" || -z "${client_tags}" ]]; then - build_test_app_docker_images - else - echo "Skipping ${LANGUAGE_NAME} test app build" - fi -} - -####################################### -# Executes the test case -# Globals: -# TEST_DRIVER_FLAGFILE: Relative path to test driver flagfile -# KUBE_CONTEXT: The name of kubectl context with GKE cluster access -# SECONDARY_KUBE_CONTEXT: The name of kubectl context with secondary GKE cluster access, if any -# TEST_XML_OUTPUT_DIR: Output directory for the test xUnit XML report -# CLIENT_IMAGE_NAME: Test client Docker image name -# GIT_COMMIT: SHA-1 of git commit being built -# Arguments: -# Test case name -# Outputs: -# Writes the output of test execution to stdout, stderr -# Test xUnit report to ${TEST_XML_OUTPUT_DIR}/${test_name}/sponge_log.xml -####################################### -run_test() { - # Test driver usage: - # https://github.com/grpc/grpc/tree/master/tools/run_tests/xds_k8s_test_driver#basic-usage - local test_name="${1:?Usage: run_test test_name}" - local out_dir="${TEST_XML_OUTPUT_DIR}/${test_name}" - mkdir -pv "${out_dir}" - # testing_version is used by the framework to determine the supported PSM - # features. It's captured from Kokoro job name of the Node repo, which takes - # the form: - # grpc/node// - python3 -m "tests.${test_name}" \ - --flagfile="${TEST_DRIVER_FLAGFILE}" \ - --kube_context="${KUBE_CONTEXT}" \ - --secondary_kube_context="${SECONDARY_KUBE_CONTEXT}" \ - --client_image="${CLIENT_IMAGE_NAME}:${GIT_COMMIT}" \ - --server_image="${SERVER_IMAGE_NAME}" \ - --testing_version="${TESTING_VERSION}" \ - --force_cleanup \ - --collect_app_logs \ - --log_dir="${out_dir}" \ - --xml_output_file="${out_dir}/sponge_log.xml" \ - |& tee "${out_dir}/sponge_log.log" -} - -####################################### -# Main function: provision software necessary to execute tests, and run them -# Globals: -# KOKORO_ARTIFACTS_DIR -# GITHUB_REPOSITORY_NAME -# SRC_DIR: Populated with absolute path to the source repo -# TEST_DRIVER_REPO_DIR: Populated with the path to the repo containing -# the test driver -# TEST_DRIVER_FULL_DIR: Populated with the path to the test driver source code -# TEST_DRIVER_FLAGFILE: Populated with relative path to test driver flagfile -# TEST_XML_OUTPUT_DIR: Populated with the path to test xUnit XML report -# GIT_ORIGIN_URL: Populated with the origin URL of git repo used for the build -# GIT_COMMIT: Populated with the SHA-1 of git commit being built -# GIT_COMMIT_SHORT: Populated with the short SHA-1 of git commit being built -# KUBE_CONTEXT: Populated with name of kubectl context with GKE cluster access -# SECONDARY_KUBE_CONTEXT: Populated with name of kubectl context with secondary GKE cluster access, if any -# Arguments: -# None -# Outputs: -# Writes the output of test execution to stdout, stderr -####################################### -main() { - local script_dir - script_dir="$(dirname "$0")" - - cd "${script_dir}" - - git submodule update --init --recursive - - # Source the test driver from the master branch. - echo "Sourcing test driver install script from: ${TEST_DRIVER_INSTALL_SCRIPT_URL}" - source /dev/stdin <<< "$(curl -s "${TEST_DRIVER_INSTALL_SCRIPT_URL}")" - - activate_gke_cluster GKE_CLUSTER_PSM_LB - activate_secondary_gke_cluster GKE_CLUSTER_PSM_LB - - set -x - if [[ -n "${KOKORO_ARTIFACTS_DIR}" ]]; then - kokoro_setup_test_driver "${GITHUB_REPOSITORY_NAME}" - else - local_setup_test_driver "${script_dir}" - fi - build_docker_images_if_needed - - # Run tests - cd "${TEST_DRIVER_FULL_DIR}" - local failed_tests=0 - test_suites=("baseline_test" "api_listener_test" "change_backend_service_test" "failover_test" "remove_neg_test" "round_robin_test" "outlier_detection_test") - for test in "${test_suites[@]}"; do - run_test $test || (( ++failed_tests )) - done - echo "Failed test suites: ${failed_tests}" -} - -main "$@" diff --git a/packages/grpc-js-xds/scripts/xds_k8s_url_map.sh b/packages/grpc-js-xds/scripts/xds_k8s_url_map.sh deleted file mode 100644 index d6e2c7ed4..000000000 --- a/packages/grpc-js-xds/scripts/xds_k8s_url_map.sh +++ /dev/null @@ -1,163 +0,0 @@ -#!/usr/bin/env bash -# Copyright 2022 gRPC authors. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -set -eo pipefail - -# Constants -readonly GITHUB_REPOSITORY_NAME="grpc-node" -readonly TEST_DRIVER_INSTALL_SCRIPT_URL="https://raw.githubusercontent.com/${TEST_DRIVER_REPO_OWNER:-grpc}/psm-interop/${TEST_DRIVER_BRANCH:-main}/.kokoro/psm_interop_kokoro_lib.sh" -## xDS test client Docker images -readonly DOCKER_REGISTRY="us-docker.pkg.dev" -readonly CLIENT_IMAGE_NAME="us-docker.pkg.dev/grpc-testing/psm-interop/node-client" -readonly FORCE_IMAGE_BUILD="${FORCE_IMAGE_BUILD:-0}" -readonly BUILD_APP_PATH="packages/grpc-js-xds/interop/Dockerfile" -readonly LANGUAGE_NAME="Node" - -####################################### -# Builds test app Docker images and pushes them to GCR -# Globals: -# BUILD_APP_PATH -# CLIENT_IMAGE_NAME: Test client Docker image name -# GIT_COMMIT: SHA-1 of git commit being built -# TESTING_VERSION: version branch under test, f.e. v1.42.x, master -# Arguments: -# None -# Outputs: -# Writes the output of `gcloud builds submit` to stdout, stderr -####################################### -build_test_app_docker_images() { - echo "Building ${LANGUAGE_NAME} xDS interop test app Docker images" - - pushd "${SRC_DIR}" - docker build \ - -f "${BUILD_APP_PATH}" \ - -t "${CLIENT_IMAGE_NAME}:${GIT_COMMIT}" \ - . - - gcloud -q auth configure-docker "${DOCKER_REGISTRY}" - docker push "${CLIENT_IMAGE_NAME}:${GIT_COMMIT}" - if is_version_branch "${TESTING_VERSION}"; then - tag_and_push_docker_image "${CLIENT_IMAGE_NAME}" "${GIT_COMMIT}" "${TESTING_VERSION}" - fi - popd -} - -####################################### -# Builds test app and its docker images unless they already exist -# Globals: -# CLIENT_IMAGE_NAME: Test client Docker image name -# GIT_COMMIT: SHA-1 of git commit being built -# FORCE_IMAGE_BUILD -# Arguments: -# None -# Outputs: -# Writes the output to stdout, stderr -####################################### -build_docker_images_if_needed() { - # Check if images already exist - client_tags="$(gcloud_gcr_list_image_tags "${CLIENT_IMAGE_NAME}" "${GIT_COMMIT}")" - printf "Client image: %s:%s\n" "${CLIENT_IMAGE_NAME}" "${GIT_COMMIT}" - echo "${client_tags:-Client image not found}" - - # Build if any of the images are missing, or FORCE_IMAGE_BUILD=1 - if [[ "${FORCE_IMAGE_BUILD}" == "1" || -z "${client_tags}" ]]; then - build_test_app_docker_images - else - echo "Skipping ${LANGUAGE_NAME} test app build" - fi -} - -####################################### -# Executes the test case -# Globals: -# TEST_DRIVER_FLAGFILE: Relative path to test driver flagfile -# KUBE_CONTEXT: The name of kubectl context with GKE cluster access -# TEST_XML_OUTPUT_DIR: Output directory for the test xUnit XML report -# CLIENT_IMAGE_NAME: Test client Docker image name -# GIT_COMMIT: SHA-1 of git commit being built -# TESTING_VERSION: version branch under test: used by the framework to determine the supported PSM -# features. -# Arguments: -# Test case name -# Outputs: -# Writes the output of test execution to stdout, stderr -# Test xUnit report to ${TEST_XML_OUTPUT_DIR}/${test_name}/sponge_log.xml -####################################### -run_test() { - # Test driver usage: - # https://github.com/grpc/grpc/tree/master/tools/run_tests/xds_k8s_test_driver#basic-usage - local test_name="${1:?Usage: run_test test_name}" - local out_dir="${TEST_XML_OUTPUT_DIR}/${test_name}" - mkdir -pv "${out_dir}" - set -x - python3 -m "tests.${test_name}" \ - --flagfile="${TEST_DRIVER_FLAGFILE}" \ - --flagfile="config/url-map.cfg" \ - --kube_context="${KUBE_CONTEXT}" \ - --client_image="${CLIENT_IMAGE_NAME}:${GIT_COMMIT}" \ - --testing_version="${TESTING_VERSION}" \ - --collect_app_logs \ - --log_dir="${out_dir}" \ - --xml_output_file="${out_dir}/sponge_log.xml" \ - |& tee "${out_dir}/sponge_log.log" -} - -####################################### -# Main function: provision software necessary to execute tests, and run them -# Globals: -# KOKORO_ARTIFACTS_DIR -# GITHUB_REPOSITORY_NAME -# SRC_DIR: Populated with absolute path to the source repo -# TEST_DRIVER_REPO_DIR: Populated with the path to the repo containing -# the test driver -# TEST_DRIVER_FULL_DIR: Populated with the path to the test driver source code -# TEST_DRIVER_FLAGFILE: Populated with relative path to test driver flagfile -# TEST_XML_OUTPUT_DIR: Populated with the path to test xUnit XML report -# GIT_ORIGIN_URL: Populated with the origin URL of git repo used for the build -# GIT_COMMIT: Populated with the SHA-1 of git commit being built -# GIT_COMMIT_SHORT: Populated with the short SHA-1 of git commit being built -# KUBE_CONTEXT: Populated with name of kubectl context with GKE cluster access -# Arguments: -# None -# Outputs: -# Writes the output of test execution to stdout, stderr -####################################### -main() { - local script_dir - script_dir="$(dirname "$0")" - - cd "${script_dir}" - - git submodule update --init --recursive - - # Source the test driver from the master branch. - echo "Sourcing test driver install script from: ${TEST_DRIVER_INSTALL_SCRIPT_URL}" - source /dev/stdin <<< "$(curl -s "${TEST_DRIVER_INSTALL_SCRIPT_URL}")" - - activate_gke_cluster GKE_CLUSTER_PSM_BASIC - - set -x - if [[ -n "${KOKORO_ARTIFACTS_DIR}" ]]; then - kokoro_setup_test_driver "${GITHUB_REPOSITORY_NAME}" - else - local_setup_test_driver "${script_dir}" - fi - build_docker_images_if_needed - # Run tests - cd "${TEST_DRIVER_FULL_DIR}" - run_test url_map || echo "Failed url_map test" -} - -main "$@" diff --git a/test/kokoro/xds-interop.cfg b/test/kokoro/xds-interop.cfg deleted file mode 100644 index 866cb4b58..000000000 --- a/test/kokoro/xds-interop.cfg +++ /dev/null @@ -1,24 +0,0 @@ -# Copyright 2017 gRPC authors. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -# Config file for Kokoro (in protobuf text format) - -# Location of the continuous shell script in repository. -build_file: "grpc-node/packages/grpc-js-xds/scripts/xds.sh" -timeout_mins: 360 -action { - define_artifacts { - regex: "github/grpc/reports/**" - } -} diff --git a/test/kokoro/xds_k8s_lb.cfg b/test/kokoro/xds_k8s_lb.cfg index 09aa3d17d..3efb62f29 100644 --- a/test/kokoro/xds_k8s_lb.cfg +++ b/test/kokoro/xds_k8s_lb.cfg @@ -15,7 +15,7 @@ # Config file for Kokoro (in protobuf text format) # Location of the continuous shell script in repository. -build_file: "grpc-node/packages/grpc-js-xds/scripts/xds_k8s_lb.sh" +build_file: "grpc-node/packages/grpc-js-xds/scripts/psm-interop-test-node.sh" timeout_mins: 180 action { define_artifacts { @@ -24,3 +24,7 @@ action { strip_prefix: "artifacts" } } +env_vars { + key: "PSM_TEST_SUITE" + value: "lb" +} diff --git a/test/kokoro/xds_k8s_url_map.cfg b/test/kokoro/xds_k8s_url_map.cfg index 50d523b66..bb6e6baf1 100644 --- a/test/kokoro/xds_k8s_url_map.cfg +++ b/test/kokoro/xds_k8s_url_map.cfg @@ -15,7 +15,7 @@ # Config file for Kokoro (in protobuf text format) # Location of the continuous shell script in repository. -build_file: "grpc-node/packages/grpc-js-xds/scripts/xds_k8s_url_map.sh" +build_file: "grpc-node/packages/grpc-js-xds/scripts/psm-interop-test-node.sh" timeout_mins: 180 action { define_artifacts { @@ -24,3 +24,7 @@ action { strip_prefix: "artifacts" } } +env_vars { + key: "PSM_TEST_SUITE" + value: "url_map" +} From 8e622220c8af437687927d25aeb3f340447aed43 Mon Sep 17 00:00:00 2001 From: Michael Lumish Date: Fri, 7 Jun 2024 10:44:44 -0700 Subject: [PATCH 29/30] grpc-js: Avoid buffering significantly more than max_receive_message_size per received message (1.8.x) --- packages/grpc-js/src/compression-filter.ts | 72 +++++++++++---- packages/grpc-js/src/internal-channel.ts | 2 - .../grpc-js/src/max-message-size-filter.ts | 89 ------------------- packages/grpc-js/src/server-call.ts | 87 +++++++++++------- packages/grpc-js/src/stream-decoder.ts | 5 ++ packages/grpc-js/src/subchannel-call.ts | 14 ++- packages/grpc-js/src/transport.ts | 15 +++- .../grpc-js/test/fixtures/test_service.proto | 1 + packages/grpc-js/test/test-server-errors.ts | 49 +++++++++- 9 files changed, 186 insertions(+), 148 deletions(-) delete mode 100644 packages/grpc-js/src/max-message-size-filter.ts diff --git a/packages/grpc-js/src/compression-filter.ts b/packages/grpc-js/src/compression-filter.ts index b8c2e6d1e..86af3a5f6 100644 --- a/packages/grpc-js/src/compression-filter.ts +++ b/packages/grpc-js/src/compression-filter.ts @@ -21,7 +21,7 @@ import { WriteObject, WriteFlags } from './call-interface'; import { Channel } from './channel'; import { ChannelOptions } from './channel-options'; import { CompressionAlgorithms } from './compression-algorithms'; -import { LogVerbosity } from './constants'; +import { DEFAULT_MAX_RECEIVE_MESSAGE_LENGTH, LogVerbosity, Status } from './constants'; import { BaseFilter, Filter, FilterFactory } from './filter'; import * as logging from './logging'; import { Metadata, MetadataValue } from './metadata'; @@ -94,6 +94,10 @@ class IdentityHandler extends CompressionHandler { } class DeflateHandler extends CompressionHandler { + constructor(private maxRecvMessageLength: number) { + super(); + } + compressMessage(message: Buffer) { return new Promise((resolve, reject) => { zlib.deflate(message, (err, output) => { @@ -108,18 +112,34 @@ class DeflateHandler extends CompressionHandler { decompressMessage(message: Buffer) { return new Promise((resolve, reject) => { - zlib.inflate(message, (err, output) => { - if (err) { - reject(err); - } else { - resolve(output); + let totalLength = 0; + const messageParts: Buffer[] = []; + const decompresser = zlib.createInflate(); + decompresser.on('data', (chunk: Buffer) => { + messageParts.push(chunk); + totalLength += chunk.byteLength; + if (this.maxRecvMessageLength !== -1 && totalLength > this.maxRecvMessageLength) { + decompresser.destroy(); + reject({ + code: Status.RESOURCE_EXHAUSTED, + details: `Received message that decompresses to a size larger than ${this.maxRecvMessageLength}` + }); } }); + decompresser.on('end', () => { + resolve(Buffer.concat(messageParts)); + }); + decompresser.write(message); + decompresser.end(); }); } } class GzipHandler extends CompressionHandler { + constructor(private maxRecvMessageLength: number) { + super(); + } + compressMessage(message: Buffer) { return new Promise((resolve, reject) => { zlib.gzip(message, (err, output) => { @@ -134,13 +154,25 @@ class GzipHandler extends CompressionHandler { decompressMessage(message: Buffer) { return new Promise((resolve, reject) => { - zlib.unzip(message, (err, output) => { - if (err) { - reject(err); - } else { - resolve(output); + let totalLength = 0; + const messageParts: Buffer[] = []; + const decompresser = zlib.createGunzip(); + decompresser.on('data', (chunk: Buffer) => { + messageParts.push(chunk); + totalLength += chunk.byteLength; + if (this.maxRecvMessageLength !== -1 && totalLength > this.maxRecvMessageLength) { + decompresser.destroy(); + reject({ + code: Status.RESOURCE_EXHAUSTED, + details: `Received message that decompresses to a size larger than ${this.maxRecvMessageLength}` + }); } }); + decompresser.on('end', () => { + resolve(Buffer.concat(messageParts)); + }); + decompresser.write(message); + decompresser.end(); }); } } @@ -165,14 +197,14 @@ class UnknownHandler extends CompressionHandler { } } -function getCompressionHandler(compressionName: string): CompressionHandler { +function getCompressionHandler(compressionName: string, maxReceiveMessageSize: number): CompressionHandler { switch (compressionName) { case 'identity': return new IdentityHandler(); case 'deflate': - return new DeflateHandler(); + return new DeflateHandler(maxReceiveMessageSize); case 'gzip': - return new GzipHandler(); + return new GzipHandler(maxReceiveMessageSize); default: return new UnknownHandler(compressionName); } @@ -182,11 +214,14 @@ export class CompressionFilter extends BaseFilter implements Filter { private sendCompression: CompressionHandler = new IdentityHandler(); private receiveCompression: CompressionHandler = new IdentityHandler(); private currentCompressionAlgorithm: CompressionAlgorithm = 'identity'; + private maxReceiveMessageLength: number; constructor(channelOptions: ChannelOptions, private sharedFilterConfig: SharedCompressionFilterConfig) { super(); - const compressionAlgorithmKey = channelOptions['grpc.default_compression_algorithm']; + const compressionAlgorithmKey = + channelOptions['grpc.default_compression_algorithm']; + this.maxReceiveMessageLength = channelOptions['grpc.max_receive_message_length'] ?? DEFAULT_MAX_RECEIVE_MESSAGE_LENGTH if (compressionAlgorithmKey !== undefined) { if (isCompressionAlgorithmKey(compressionAlgorithmKey)) { const clientSelectedEncoding = CompressionAlgorithms[compressionAlgorithmKey] as CompressionAlgorithm; @@ -200,7 +235,10 @@ export class CompressionFilter extends BaseFilter implements Filter { */ if (!serverSupportedEncodings || serverSupportedEncodings.includes(clientSelectedEncoding)) { this.currentCompressionAlgorithm = clientSelectedEncoding; - this.sendCompression = getCompressionHandler(this.currentCompressionAlgorithm); + this.sendCompression = getCompressionHandler( + this.currentCompressionAlgorithm, + -1 + ); } } else { logging.log(LogVerbosity.ERROR, `Invalid value provided for grpc.default_compression_algorithm option: ${compressionAlgorithmKey}`); @@ -228,7 +266,7 @@ export class CompressionFilter extends BaseFilter implements Filter { if (receiveEncoding.length > 0) { const encoding: MetadataValue = receiveEncoding[0]; if (typeof encoding === 'string') { - this.receiveCompression = getCompressionHandler(encoding); + this.receiveCompression = getCompressionHandler(encoding, this.maxReceiveMessageLength); } } metadata.remove('grpc-encoding'); diff --git a/packages/grpc-js/src/internal-channel.ts b/packages/grpc-js/src/internal-channel.ts index 38646a0cd..242a1cbd8 100644 --- a/packages/grpc-js/src/internal-channel.ts +++ b/packages/grpc-js/src/internal-channel.ts @@ -33,7 +33,6 @@ import { } from './resolver'; import { trace } from './logging'; import { SubchannelAddress } from './subchannel-address'; -import { MaxMessageSizeFilterFactory } from './max-message-size-filter'; import { mapProxyName } from './http_proxy'; import { GrpcUri, parseUri, uriToString } from './uri-parser'; import { ServerSurfaceCall } from './server-call'; @@ -310,7 +309,6 @@ export class InternalChannel { } ); this.filterStackFactory = new FilterStackFactory([ - new MaxMessageSizeFilterFactory(this.options), new CompressionFilterFactory(this, this.options), ]); this.trace('Channel constructed with options ' + JSON.stringify(options, undefined, 2)); diff --git a/packages/grpc-js/src/max-message-size-filter.ts b/packages/grpc-js/src/max-message-size-filter.ts deleted file mode 100644 index 25e4fdc03..000000000 --- a/packages/grpc-js/src/max-message-size-filter.ts +++ /dev/null @@ -1,89 +0,0 @@ -/* - * Copyright 2020 gRPC authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -import { BaseFilter, Filter, FilterFactory } from './filter'; -import { WriteObject } from './call-interface'; -import { - Status, - DEFAULT_MAX_SEND_MESSAGE_LENGTH, - DEFAULT_MAX_RECEIVE_MESSAGE_LENGTH, -} from './constants'; -import { ChannelOptions } from './channel-options'; -import { Metadata } from './metadata'; - -export class MaxMessageSizeFilter extends BaseFilter implements Filter { - private maxSendMessageSize: number = DEFAULT_MAX_SEND_MESSAGE_LENGTH; - private maxReceiveMessageSize: number = DEFAULT_MAX_RECEIVE_MESSAGE_LENGTH; - constructor( - options: ChannelOptions - ) { - super(); - if ('grpc.max_send_message_length' in options) { - this.maxSendMessageSize = options['grpc.max_send_message_length']!; - } - if ('grpc.max_receive_message_length' in options) { - this.maxReceiveMessageSize = options['grpc.max_receive_message_length']!; - } - } - - async sendMessage(message: Promise): Promise { - /* A configured size of -1 means that there is no limit, so skip the check - * entirely */ - if (this.maxSendMessageSize === -1) { - return message; - } else { - const concreteMessage = await message; - if (concreteMessage.message.length > this.maxSendMessageSize) { - throw { - code: Status.RESOURCE_EXHAUSTED, - details: `Sent message larger than max (${concreteMessage.message.length} vs. ${this.maxSendMessageSize})`, - metadata: new Metadata() - }; - } else { - return concreteMessage; - } - } - } - - async receiveMessage(message: Promise): Promise { - /* A configured size of -1 means that there is no limit, so skip the check - * entirely */ - if (this.maxReceiveMessageSize === -1) { - return message; - } else { - const concreteMessage = await message; - if (concreteMessage.length > this.maxReceiveMessageSize) { - throw { - code: Status.RESOURCE_EXHAUSTED, - details: `Received message larger than max (${concreteMessage.length} vs. ${this.maxReceiveMessageSize})`, - metadata: new Metadata() - }; - } else { - return concreteMessage; - } - } - } -} - -export class MaxMessageSizeFilterFactory - implements FilterFactory { - constructor(private readonly options: ChannelOptions) {} - - createFilter(): MaxMessageSizeFilter { - return new MaxMessageSizeFilter(this.options); - } -} diff --git a/packages/grpc-js/src/server-call.ts b/packages/grpc-js/src/server-call.ts index be807b959..b06feda14 100644 --- a/packages/grpc-js/src/server-call.ts +++ b/packages/grpc-js/src/server-call.ts @@ -19,7 +19,6 @@ import { EventEmitter } from 'events'; import * as http2 from 'http2'; import { Duplex, Readable, Writable } from 'stream'; import * as zlib from 'zlib'; -import { promisify } from 'util'; import { Status, @@ -38,8 +37,6 @@ import { Deadline } from './deadline'; import { getErrorCode, getErrorMessage } from './error'; const TRACER_NAME = 'server_call'; -const unzip = promisify(zlib.unzip); -const inflate = promisify(zlib.inflate); function trace(text: string): void { logging.trace(LogVerbosity.DEBUG, TRACER_NAME, text); @@ -478,19 +475,42 @@ export class Http2ServerCallStream< private getDecompressedMessage( message: Buffer, encoding: string - ): Buffer | Promise { - if (encoding === 'deflate') { - return inflate(message.subarray(5)); - } else if (encoding === 'gzip') { - return unzip(message.subarray(5)); - } else if (encoding === 'identity') { - return message.subarray(5); + ): Buffer | Promise { const messageContents = message.subarray(5); + if (encoding === 'identity') { + return messageContents; + } else if (encoding === 'deflate' || encoding === 'gzip') { + let decompresser: zlib.Gunzip | zlib.Deflate; + if (encoding === 'deflate') { + decompresser = zlib.createInflate(); + } else { + decompresser = zlib.createGunzip(); + } + return new Promise((resolve, reject) => { + let totalLength = 0 + const messageParts: Buffer[] = []; + decompresser.on('data', (chunk: Buffer) => { + messageParts.push(chunk); + totalLength += chunk.byteLength; + if (this.maxReceiveMessageSize !== -1 && totalLength > this.maxReceiveMessageSize) { + decompresser.destroy(); + reject({ + code: Status.RESOURCE_EXHAUSTED, + details: `Received message that decompresses to a size larger than ${this.maxReceiveMessageSize}` + }); + } + }); + decompresser.on('end', () => { + resolve(Buffer.concat(messageParts)); + }); + decompresser.write(messageContents); + decompresser.end(); + }); + } else { + return Promise.reject({ + code: Status.UNIMPLEMENTED, + details: `Received message compressed with unsupported encoding "${encoding}"`, + }); } - - return Promise.reject({ - code: Status.UNIMPLEMENTED, - details: `Received message compressed with unsupported encoding "${encoding}"`, - }); } sendMetadata(customMetadata?: Metadata) { @@ -807,7 +827,7 @@ export class Http2ServerCallStream< | ServerDuplexStream, encoding: string ) { - const decoder = new StreamDecoder(); + const decoder = new StreamDecoder(this.maxReceiveMessageSize); let readsDone = false; @@ -823,29 +843,34 @@ export class Http2ServerCallStream< }; this.stream.on('data', async (data: Buffer) => { - const messages = decoder.write(data); + let messages: Buffer[]; + try { + messages = decoder.write(data); + } catch (e) { + this.sendError({ + code: Status.RESOURCE_EXHAUSTED, + details: (e as Error).message + }); + return; + } pendingMessageProcessing = true; this.stream.pause(); for (const message of messages) { - if ( - this.maxReceiveMessageSize !== -1 && - message.length > this.maxReceiveMessageSize - ) { - this.sendError({ - code: Status.RESOURCE_EXHAUSTED, - details: `Received message larger than max (${message.length} vs. ${this.maxReceiveMessageSize})`, - }); - return; - } this.emit('receiveMessage'); const compressed = message.readUInt8(0) === 1; const compressedMessageEncoding = compressed ? encoding : 'identity'; - const decompressedMessage = await this.getDecompressedMessage( - message, - compressedMessageEncoding - ); + let decompressedMessage: Buffer; + try { + decompressedMessage = await this.getDecompressedMessage( + message, + compressedMessageEncoding + ); + } catch (e) { + this.sendError(e as Partial); + return; + } // Encountered an error with decompression; it'll already have been propogated back // Just return early diff --git a/packages/grpc-js/src/stream-decoder.ts b/packages/grpc-js/src/stream-decoder.ts index 671ad41ae..ea669d14c 100644 --- a/packages/grpc-js/src/stream-decoder.ts +++ b/packages/grpc-js/src/stream-decoder.ts @@ -30,6 +30,8 @@ export class StreamDecoder { private readPartialMessage: Buffer[] = []; private readMessageRemaining = 0; + constructor(private maxReadMessageLength: number) {} + write(data: Buffer): Buffer[] { let readHead = 0; let toRead: number; @@ -60,6 +62,9 @@ export class StreamDecoder { // readSizeRemaining >=0 here if (this.readSizeRemaining === 0) { this.readMessageSize = this.readPartialSize.readUInt32BE(0); + if (this.maxReadMessageLength !== -1 && this.readMessageSize > this.maxReadMessageLength) { + throw new Error(`Received message larger than max (${this.readMessageSize} vs ${this.maxReadMessageLength})`); + } this.readMessageRemaining = this.readMessageSize; if (this.readMessageRemaining > 0) { this.readState = ReadState.READING_MESSAGE; diff --git a/packages/grpc-js/src/subchannel-call.ts b/packages/grpc-js/src/subchannel-call.ts index f9c24f6bd..a210c9ce7 100644 --- a/packages/grpc-js/src/subchannel-call.ts +++ b/packages/grpc-js/src/subchannel-call.ts @@ -18,7 +18,7 @@ import * as http2 from 'http2'; import * as os from 'os'; -import { Status } from './constants'; +import { DEFAULT_MAX_RECEIVE_MESSAGE_LENGTH, Status } from './constants'; import { Metadata } from './metadata'; import { StreamDecoder } from './stream-decoder'; import * as logging from './logging'; @@ -76,7 +76,7 @@ export interface SubchannelCallInterceptingListener extends InterceptingListener } export class Http2SubchannelCall implements SubchannelCall { - private decoder = new StreamDecoder(); + private decoder: StreamDecoder; private isReadFilterPending = false; private isPushPending = false; @@ -106,6 +106,8 @@ export class Http2SubchannelCall implements SubchannelCall { private readonly transport: Transport, private readonly callId: number ) { + const maxReceiveMessageLength = transport.getOptions()['grpc.max_receive_message_length'] ?? DEFAULT_MAX_RECEIVE_MESSAGE_LENGTH; + this.decoder = new StreamDecoder(maxReceiveMessageLength); http2Stream.on('response', (headers, flags) => { let headersString = ''; for (const header of Object.keys(headers)) { @@ -163,7 +165,13 @@ export class Http2SubchannelCall implements SubchannelCall { return; } this.trace('receive HTTP/2 data frame of length ' + data.length); - const messages = this.decoder.write(data); + let messages: Buffer[]; + try { + messages = this.decoder.write(data); + } catch (e) { + this.cancelWithStatus(Status.RESOURCE_EXHAUSTED, (e as Error).message); + return; + } for (const message of messages) { this.trace('parsed message of length ' + message.length); diff --git a/packages/grpc-js/src/transport.ts b/packages/grpc-js/src/transport.ts index a9da5db4d..8c0164e9d 100644 --- a/packages/grpc-js/src/transport.ts +++ b/packages/grpc-js/src/transport.ts @@ -62,7 +62,14 @@ export interface TransportDisconnectListener { export interface Transport { getChannelzRef(): SocketRef; getPeerName(): string; - createCall(metadata: Metadata, host: string, method: string, listener: SubchannelCallInterceptingListener, subchannelCallStatsTracker: Partial): SubchannelCall; + getOptions(): ChannelOptions; + createCall( + metadata: Metadata, + host: string, + method: string, + listener: SubchannelCallInterceptingListener, + subchannelCallStatsTracker: Partial + ): SubchannelCall; addDisconnectListener(listener: TransportDisconnectListener): void; shutdown(): void; } @@ -119,7 +126,7 @@ class Http2Transport implements Transport { constructor( private session: http2.ClientHttp2Session, subchannelAddress: SubchannelAddress, - options: ChannelOptions, + private options: ChannelOptions, /** * Name of the remote server, if it is not the same as the subchannel * address, i.e. if connecting through an HTTP CONNECT proxy. @@ -495,6 +502,10 @@ class Http2Transport implements Transport { return this.subchannelAddressString; } + getOptions() { + return this.options; + } + shutdown() { this.session.close(); unregisterChannelzRef(this.channelzRef); diff --git a/packages/grpc-js/test/fixtures/test_service.proto b/packages/grpc-js/test/fixtures/test_service.proto index 64ce0d378..2a7a303f3 100644 --- a/packages/grpc-js/test/fixtures/test_service.proto +++ b/packages/grpc-js/test/fixtures/test_service.proto @@ -21,6 +21,7 @@ message Request { bool error = 1; string message = 2; int32 errorAfter = 3; + int32 responseLength = 4; } message Response { diff --git a/packages/grpc-js/test/test-server-errors.ts b/packages/grpc-js/test/test-server-errors.ts index 91b7c196c..2ad67c6cc 100644 --- a/packages/grpc-js/test/test-server-errors.ts +++ b/packages/grpc-js/test/test-server-errors.ts @@ -33,6 +33,7 @@ import { } from '../src/server-call'; import { loadProtoFile } from './common'; +import { CompressionAlgorithms } from '../src/compression-algorithms'; const protoFile = join(__dirname, 'fixtures', 'test_service.proto'); const testServiceDef = loadProtoFile(protoFile); @@ -309,7 +310,7 @@ describe('Other conditions', () => { trailerMetadata ); } else { - cb(null, { count: 1 }, trailerMetadata); + cb(null, { count: 1, message: 'a'.repeat(req.responseLength) }, trailerMetadata); } }, @@ -319,6 +320,7 @@ describe('Other conditions', () => { ) { let count = 0; let errored = false; + let responseLength = 0; stream.on('data', (data: any) => { if (data.error) { @@ -326,13 +328,14 @@ describe('Other conditions', () => { errored = true; cb(new Error(message) as ServiceError, null, trailerMetadata); } else { + responseLength += data.responseLength; count++; } }); stream.on('end', () => { if (!errored) { - cb(null, { count }, trailerMetadata); + cb(null, { count, message: 'a'.repeat(responseLength) }, trailerMetadata); } }); }, @@ -348,7 +351,7 @@ describe('Other conditions', () => { }); } else { for (let i = 1; i <= 5; i++) { - stream.write({ count: i }); + stream.write({ count: i, message: 'a'.repeat(req.responseLength) }); if (req.errorAfter && req.errorAfter === i) { stream.emit('error', { code: grpc.status.UNKNOWN, @@ -375,7 +378,7 @@ describe('Other conditions', () => { err.metadata.add('count', '' + count); stream.emit('error', err); } else { - stream.write({ count }); + stream.write({ count, message: 'a'.repeat(data.responseLength) }); count++; } }); @@ -739,6 +742,44 @@ describe('Other conditions', () => { }); }); }); + + describe('Max message size', () => { + const largeMessage = 'a'.repeat(10_000_000); + it('Should be enforced on the server', done => { + client.unary({ message: largeMessage }, (error?: ServiceError) => { + assert(error); + assert.strictEqual(error.code, grpc.status.RESOURCE_EXHAUSTED); + done(); + }); + }); + it('Should be enforced on the client', done => { + client.unary({ responseLength: 10_000_000 }, (error?: ServiceError) => { + assert(error); + assert.strictEqual(error.code, grpc.status.RESOURCE_EXHAUSTED); + done(); + }); + }); + describe('Compressed messages', () => { + it('Should be enforced with gzip', done => { + const compressingClient = new testServiceClient(`localhost:${port}`, clientInsecureCreds, {'grpc.default_compression_algorithm': CompressionAlgorithms.gzip}); + compressingClient.unary({ message: largeMessage }, (error?: ServiceError) => { + assert(error); + assert.strictEqual(error.code, grpc.status.RESOURCE_EXHAUSTED); + assert.match(error.details, /Received message that decompresses to a size larger/); + done(); + }); + }); + it('Should be enforced with deflate', done => { + const compressingClient = new testServiceClient(`localhost:${port}`, clientInsecureCreds, {'grpc.default_compression_algorithm': CompressionAlgorithms.deflate}); + compressingClient.unary({ message: largeMessage }, (error?: ServiceError) => { + assert(error); + assert.strictEqual(error.code, grpc.status.RESOURCE_EXHAUSTED); + assert.match(error.details, /Received message that decompresses to a size larger/); + done(); + }); + }); + }); + }); }); function identity(arg: any): any { From 3b110cddfe4c895d6f642092b99a6667cef5ae00 Mon Sep 17 00:00:00 2001 From: Michael Lumish Date: Fri, 7 Jun 2024 10:58:16 -0700 Subject: [PATCH 30/30] grpc-js: Bump to 1.8.22 --- packages/grpc-js/package.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/grpc-js/package.json b/packages/grpc-js/package.json index 29c2a00ee..19e37d19b 100644 --- a/packages/grpc-js/package.json +++ b/packages/grpc-js/package.json @@ -1,6 +1,6 @@ { "name": "@grpc/grpc-js", - "version": "1.8.21", + "version": "1.8.22", "description": "gRPC Library for Node - pure JS implementation", "homepage": "https://grpc.io/", "repository": "https://github.com/grpc/grpc-node/tree/master/packages/grpc-js",