From 6feae14d51d6410858f4def1d55e71c141cc62f9 Mon Sep 17 00:00:00 2001
From: Yuri Golobokov
Date: Wed, 22 Apr 2026 04:44:26 +0000
Subject: [PATCH 1/2] fix: fix handling recycled channels

---
 .../internal/channels/ChannelPoolDpImpl.java  | 16 +++-
 .../channels/ChannelPoolDpImplTest.java       | 94 +++++++++++++++++++
 2 files changed, 107 insertions(+), 3 deletions(-)

diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/internal/channels/ChannelPoolDpImpl.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/internal/channels/ChannelPoolDpImpl.java
index 28a05523f2..9ef2a486f1 100644
--- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/internal/channels/ChannelPoolDpImpl.java
+++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/internal/channels/ChannelPoolDpImpl.java
@@ -262,11 +262,16 @@ private void removeGroup(AfeChannelGroup group) {
   }
 
   @GuardedBy("this")
-  private AfeChannelGroup rehomeChannel(ChannelWrapper channelWrapper, AfeId afeId) {
+  private void rehomeChannel(ChannelWrapper channelWrapper, AfeId afeId) {
+    // No need to rehome recycled channels.
+    if (channelWrapper.channel.isShutdown()) {
+      return;
+    }
+
     AfeChannelGroup origGroup = channelWrapper.group;
 
     if (Objects.equals(origGroup.afeId, afeId)) {
-      return origGroup;
+      return;
     }
 
     log(Level.FINE, "Rehoming channel from: %s to %s", origGroup.afeId, afeId);
@@ -291,7 +296,7 @@ private AfeChannelGroup rehomeChannel(ChannelWrapper channelWrapper, AfeId afeId
     newGroup.channels.add(channelWrapper);
     newGroup.numStreams += channelWrapper.numOutstanding;
 
-    return newGroup;
+    return;
   }
 
   // Update accounting when a stream is closed and releases its channel
@@ -322,6 +327,11 @@ private static boolean shouldRecycleChannel(Status status) {
 
   @GuardedBy("this")
   private void recycleChannel(ChannelWrapper channelWrapper) {
+    if (channelWrapper.channel.isShutdown()) {
+      // Channel is already recycled.
+      return;
+    }
+
     channelWrapper.group.channels.remove(channelWrapper);
     channelWrapper.channel.shutdown();
     // Checking for starting group because we don't want to delete the stating group.
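Taken together, the two guards above make rehoming and recycling idempotent for a channel that has already been shut down: a second failing stream on the same channel no longer shuts it down again or requests a replacement. The following stand-alone sketch illustrates that guard pattern only; the RecycleSketch and FakeChannel names and the replacementsCreated counter are illustrative stand-ins, not the actual ChannelPoolDpImpl API.

import java.util.ArrayList;
import java.util.List;

// Simplified sketch: a pool that must not recycle the same channel twice.
// Names are illustrative only; the real logic lives in ChannelPoolDpImpl.
final class RecycleSketch {
  static final class FakeChannel {
    private boolean shutdown;
    boolean isShutdown() { return shutdown; }
    void shutdown() { shutdown = true; }
  }

  private final List<FakeChannel> active = new ArrayList<>();
  private int replacementsCreated;

  void track(FakeChannel c) { active.add(c); }

  // Mirrors the idea behind the patched recycleChannel(): bail out if the
  // channel was already recycled, so a second failing stream is a no-op.
  void recycle(FakeChannel c) {
    if (c.isShutdown()) {
      return; // already recycled; do not shut down again or add a replacement
    }
    active.remove(c);
    c.shutdown();
    replacementsCreated++; // stand-in for addChannel() creating a fresh channel
  }

  public static void main(String[] args) {
    RecycleSketch pool = new RecycleSketch();
    FakeChannel c = new FakeChannel();
    pool.track(c);
    pool.recycle(c); // first failing close: channel replaced once
    pool.recycle(c); // second close on the same channel: no extra replacement
    System.out.println(pool.replacementsCreated); // prints 1
  }
}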
diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/internal/channels/ChannelPoolDpImplTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/internal/channels/ChannelPoolDpImplTest.java
index 978da523ed..b5e2de38fa 100644
--- a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/internal/channels/ChannelPoolDpImplTest.java
+++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/internal/channels/ChannelPoolDpImplTest.java
@@ -375,4 +375,98 @@ void testRecycleChannelInGroupOnUnimplemented() {
 
     pool.close();
   }
+
+  @Test
+  void testDoubleRecycleCreatesExtraChannel() {
+    when(channelSupplier.get()).thenReturn(channel);
+    when(channel.newCall(any(), any())).thenReturn(clientCall);
+    doNothing().when(clientCall).start(listener.capture(), any());
+
+    ChannelPoolDpImpl pool =
+        new ChannelPoolDpImpl(channelSupplier, defaultConfig, debugTagTracer, bgExecutor);
+
+    // Create 2 streams on the same channel
+    pool.newStream(FakeSessionGrpc.getOpenSessionMethod(), CallOptions.DEFAULT)
+        .start(Mockito.mock(Listener.class), new Metadata());
+    pool.newStream(FakeSessionGrpc.getOpenSessionMethod(), CallOptions.DEFAULT)
+        .start(Mockito.mock(Listener.class), new Metadata());
+
+    // Initially 1 channel
+    verify(channelSupplier, times(1)).get();
+
+    // Trigger first recycle via first stream
+    ClientCall.Listener listener1 = listener.getAllValues().get(0);
+    listener1.onClose(Status.UNIMPLEMENTED, new Metadata());
+
+    // Channel should be recycled (shutdown + addChannel)
+    verify(channel, times(1)).shutdown();
+    // Now isShutdown() returns true for the channel
+    when(channel.isShutdown()).thenReturn(true);
+    verify(channelSupplier, times(2)).get();
+
+    // Trigger second recycle via second stream on the SAME (already recycled) channel
+    ClientCall.Listener listener2 = listener.getAllValues().get(1);
+    listener2.onClose(Status.UNIMPLEMENTED, new Metadata());
+
+    // BUG: this should NOT cause another addChannel() or shutdown().
+    // Without the fix, get() would be invoked a third time and shutdown() a second time.
+    verify(channelSupplier, times(2)).get();
+    verify(channel, times(1)).shutdown();
+
+    pool.close();
+  }
+
+  @Test
+  void testRecycledChannelDoesNotRejoinPool() throws InterruptedException {
+    when(channelSupplier.get()).thenReturn(channel);
+    when(channel.newCall(any(), any())).thenReturn(clientCall);
+    doNothing().when(clientCall).start(listener.capture(), any());
+    doReturn(Attributes.EMPTY).when(clientCall).getAttributes();
+
+    ChannelPoolDpImpl pool =
+        new ChannelPoolDpImpl(channelSupplier, defaultConfig, debugTagTracer, bgExecutor);
+
+    // 1. Create stream1 on channel1
+    pool.newStream(FakeSessionGrpc.getOpenSessionMethod(), CallOptions.DEFAULT)
+        .start(Mockito.mock(Listener.class), new Metadata());
+
+    // 2. Create stream2 on channel1 to trigger recycle
+    pool.newStream(FakeSessionGrpc.getOpenSessionMethod(), CallOptions.DEFAULT)
+        .start(Mockito.mock(Listener.class), new Metadata());
+
+    ClientCall.Listener listener1 = listener.getAllValues().get(0);
+    ClientCall.Listener listener2 = listener.getAllValues().get(1);
+
+    // Prepare channel2 that will be picked up by addChannel during recycling
+    ManagedChannel channel2 = Mockito.mock(ManagedChannel.class);
+    when(channelSupplier.get()).thenReturn(channel2);
+    when(channel2.newCall(any(), any())).thenReturn(clientCall);
+
+    // 3. Recycle channel1 via stream2
+    listener2.onClose(Status.UNIMPLEMENTED, new Metadata());
+    verify(channel, times(1)).shutdown();
+    // Now isShutdown() for channel1 returns true
+    when(channel.isShutdown()).thenReturn(true);
+
+    // 4. stream1 (on recycled channel1) receives headers with an AFE ID.
+    // This triggers rehomeChannel.
+    PeerInfo peerInfo = PeerInfo.newBuilder().setApplicationFrontendId(555).build();
+    Metadata headers = new Metadata();
+    headers.put(
+        SessionStreamImpl.PEER_INFO_KEY,
+        Base64.getEncoder().encodeToString(peerInfo.toByteArray()));
+    listener1.onHeaders(headers);
+
+    // 5. Try to create a new stream.
+    // It should NOT pick channel1 because it's recycled/shutdown.
+    pool.newStream(FakeSessionGrpc.getOpenSessionMethod(), CallOptions.DEFAULT);
+
+    // BUG: if channel1 was re-added to a group, the picker might have picked it.
+    // channel.newCall was called twice (steps 1 and 2); it should NOT be called a third time.
+    verify(channel, times(2)).newCall(any(), any());
+    // Instead, it should be called on channel2
+    verify(channel2, times(1)).newCall(any(), any());
+
+    pool.close();
+  }
 }

From 8f96ba59cb5bd18b3005c4c4228137692cadbf43 Mon Sep 17 00:00:00 2001
From: cloud-java-bot
Date: Wed, 22 Apr 2026 04:49:44 +0000
Subject: [PATCH 2/2] chore: generate libraries at Wed Apr 22 04:47:14 UTC 2026

---
 .github/scripts/update_generation_config.sh  | 17 +++++++++++------
 .github/workflows/renovate_config_check.yaml |  7 +++----
 2 files changed, 14 insertions(+), 10 deletions(-)

diff --git a/.github/scripts/update_generation_config.sh b/.github/scripts/update_generation_config.sh
index 24d97abee3..f448fafd7d 100644
--- a/.github/scripts/update_generation_config.sh
+++ b/.github/scripts/update_generation_config.sh
@@ -48,13 +48,14 @@ function update_config() {
 }
 
 # Update an action to a new version in GitHub action.
+# The second argument must be the full git tag (including the "v" prefix).
 function update_action() {
   local key_word=$1
   local new_value=$2
   local file=$3
   echo "Update ${key_word} to ${new_value} in ${file}"
   # use a different delimiter because the key_word contains "/".
-  sed -i -e "s|${key_word}@v.*$|${key_word}@v${new_value}|" "${file}"
+  sed -i -e "s|${key_word}@[^ ]*$|${key_word}@${new_value}|" "${file}"
 }
 
 # The parameters of this script is:
@@ -143,12 +144,16 @@ rm -rf tmp-googleapis
 update_config "googleapis_commitish" "${latest_commit}" "${generation_config}"
 
 # Update gapic-generator-java version to the latest
-latest_version=$(get_latest_released_version "com.google.api" "gapic-generator-java")
-update_config "gapic_generator_version" "${latest_version}" "${generation_config}"
-
-# Update composite action version to latest gapic-generator-java version
+latest_gapic_generator_version=$(get_latest_released_version "com.google.api" "gapic-generator-java")
+update_config "gapic_generator_version" "${latest_gapic_generator_version}" "${generation_config}"
+
+# Update the GitHub Actions reference to the latest.
+# After the google-cloud-java monorepo migration of sdk-platform-java,
+# we cannot rely on the gapic-generator-java version tag. Let's use
+# the gapic-libraries-bom version.
+latest_gapic_libraries_bom_version=$(get_latest_released_version "com.google.cloud" "gapic-libraries-bom")
 update_action "googleapis/google-cloud-java/sdk-platform-java/.github/scripts" \
-  "${latest_version}" \
+  "v${latest_gapic_libraries_bom_version}" \
   "${workflow}"
 
 # Update libraries-bom version to the latest
diff --git a/.github/workflows/renovate_config_check.yaml b/.github/workflows/renovate_config_check.yaml
index 47b9e87c98..8c922936b9 100644
--- a/.github/workflows/renovate_config_check.yaml
+++ b/.github/workflows/renovate_config_check.yaml
@@ -4,6 +4,7 @@ on:
   pull_request:
     paths:
       - 'renovate.json'
+      - '.github/workflows/renovate_config_check.yaml'
 
 jobs:
   renovate_bot_config_validation:
@@ -18,8 +19,6 @@ jobs:
         with:
           node-version: '22'
 
-      - name: Install Renovate and Config Validator
+      - name: Run Renovate Config Validator
         run: |
-          npm install -g npm@latest
-          npm install --global renovate
-          renovate-config-validator
+          npx --package renovate@43.136.0 renovate-config-validator
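For reference, the tightened update_action() substitution in update_generation_config.sh above now replaces whatever follows "@" and expects the caller to pass the complete tag. The stand-alone sketch below shows the effect on a single action reference; the sample line and both version tags are invented for illustration, and only the sed expression mirrors the patched script.

#!/usr/bin/env bash
# Hypothetical before/after for the revised substitution in update_action().
# The action line and version tags below are made up for this example.
key_word="googleapis/google-cloud-java/sdk-platform-java/.github/scripts"
new_value="v3.50.0"   # callers now pass the full tag, "v" prefix included
line="      uses: ${key_word}@v2.62.0"
# "|" is used as the sed delimiter because key_word contains "/".
echo "${line}" | sed -e "s|${key_word}@[^ ]*$|${key_word}@${new_value}|"
# prints the same line with the reference rewritten to ...scripts@v3.50.0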