diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/internal/channels/ChannelPoolDpImpl.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/internal/channels/ChannelPoolDpImpl.java index 28a05523f2..9ef2a486f1 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/internal/channels/ChannelPoolDpImpl.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/internal/channels/ChannelPoolDpImpl.java @@ -262,11 +262,16 @@ private void removeGroup(AfeChannelGroup group) { } @GuardedBy("this") - private AfeChannelGroup rehomeChannel(ChannelWrapper channelWrapper, AfeId afeId) { + private void rehomeChannel(ChannelWrapper channelWrapper, AfeId afeId) { + // No need to rehome recycled channels. + if (channelWrapper.channel.isShutdown()) { + return; + } + AfeChannelGroup origGroup = channelWrapper.group; if (Objects.equals(origGroup.afeId, afeId)) { - return origGroup; + return; } log(Level.FINE, "Rehoming channel from: %s to %s", origGroup.afeId, afeId); @@ -291,7 +296,7 @@ private AfeChannelGroup rehomeChannel(ChannelWrapper channelWrapper, AfeId afeId newGroup.channels.add(channelWrapper); newGroup.numStreams += channelWrapper.numOutstanding; - return newGroup; + return; } // Update accounting when a stream is closed and releases its channel @@ -322,6 +327,11 @@ private static boolean shouldRecycleChannel(Status status) { @GuardedBy("this") private void recycleChannel(ChannelWrapper channelWrapper) { + if (channelWrapper.channel.isShutdown()) { + // Channel is already recycled. + return; + } + channelWrapper.group.channels.remove(channelWrapper); channelWrapper.channel.shutdown(); // Checking for starting group because we don't want to delete the stating group. diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/internal/channels/ChannelPoolDpImplTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/internal/channels/ChannelPoolDpImplTest.java index 978da523ed..b5e2de38fa 100644 --- a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/internal/channels/ChannelPoolDpImplTest.java +++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/internal/channels/ChannelPoolDpImplTest.java @@ -375,4 +375,98 @@ void testRecycleChannelInGroupOnUnimplemented() { pool.close(); } + + @Test + void testDoubleRecycleCreatesExtraChannel() { + when(channelSupplier.get()).thenReturn(channel); + when(channel.newCall(any(), any())).thenReturn(clientCall); + doNothing().when(clientCall).start(listener.capture(), any()); + + ChannelPoolDpImpl pool = + new ChannelPoolDpImpl(channelSupplier, defaultConfig, debugTagTracer, bgExecutor); + + // Create 2 streams on the same channel + pool.newStream(FakeSessionGrpc.getOpenSessionMethod(), CallOptions.DEFAULT) + .start(Mockito.mock(Listener.class), new Metadata()); + pool.newStream(FakeSessionGrpc.getOpenSessionMethod(), CallOptions.DEFAULT) + .start(Mockito.mock(Listener.class), new Metadata()); + + // Initially 1 channel + verify(channelSupplier, times(1)).get(); + + // Trigger first recycle via first stream + ClientCall.Listener listener1 = listener.getAllValues().get(0); + listener1.onClose(Status.UNIMPLEMENTED, new Metadata()); + + // Channel should be recycled (shutdown + addChannel) + verify(channel, times(1)).shutdown(); + // Now isShutdown returns true for the channel + when(channel.isShutdown()).thenReturn(true); + verify(channelSupplier, times(2)).get(); + + // Trigger second recycle via second stream on the SAME (already recycled) channel + ClientCall.Listener listener2 = listener.getAllValues().get(1); + listener2.onClose(Status.UNIMPLEMENTED, new Metadata()); + + // BUG: This should NOT cause another addChannel() or shutdown() + // If it fails, times(3) will be true. + verify(channelSupplier, times(2)).get(); + verify(channel, times(1)).shutdown(); + + pool.close(); + } + + @Test + void testRecycledChannelDoesNotRejoinPool() throws InterruptedException { + when(channelSupplier.get()).thenReturn(channel); + when(channel.newCall(any(), any())).thenReturn(clientCall); + doNothing().when(clientCall).start(listener.capture(), any()); + doReturn(Attributes.EMPTY).when(clientCall).getAttributes(); + + ChannelPoolDpImpl pool = + new ChannelPoolDpImpl(channelSupplier, defaultConfig, debugTagTracer, bgExecutor); + + // 1. Create stream1 on channel1 + pool.newStream(FakeSessionGrpc.getOpenSessionMethod(), CallOptions.DEFAULT) + .start(Mockito.mock(Listener.class), new Metadata()); + + // 2. Create stream2 on channel1 to trigger recycle + pool.newStream(FakeSessionGrpc.getOpenSessionMethod(), CallOptions.DEFAULT) + .start(Mockito.mock(Listener.class), new Metadata()); + + ClientCall.Listener listener1 = listener.getAllValues().get(0); + ClientCall.Listener listener2 = listener.getAllValues().get(1); + + // Prepare channel2 that will be picked up by addChannel during recycling + ManagedChannel channel2 = Mockito.mock(ManagedChannel.class); + when(channelSupplier.get()).thenReturn(channel2); + when(channel2.newCall(any(), any())).thenReturn(clientCall); + + // 3. Recycle channel1 via stream2 + listener2.onClose(Status.UNIMPLEMENTED, new Metadata()); + verify(channel, times(1)).shutdown(); + // Now isShutdown for the channel1 returns true + when(channel.isShutdown()).thenReturn(true); + + // 4. stream1 (on recycled channel1) receives headers with AFE ID + // This triggers rehomeChannel + PeerInfo peerInfo = PeerInfo.newBuilder().setApplicationFrontendId(555).build(); + Metadata headers = new Metadata(); + headers.put( + SessionStreamImpl.PEER_INFO_KEY, + Base64.getEncoder().encodeToString(peerInfo.toByteArray())); + listener1.onHeaders(headers); + + // 5. Try to create a new stream. + // It should NOT pick channel1 because it's recycled/shutdown. + pool.newStream(FakeSessionGrpc.getOpenSessionMethod(), CallOptions.DEFAULT); + + // BUG: If channel1 was re-added to a group, the picker might have picked it. + // channel.newCall was called 2 times (steps 1 and 2). It should NOT be called a 3rd time. + verify(channel, times(2)).newCall(any(), any()); + // Instead, it should be called on channel2 + verify(channel2, times(1)).newCall(any(), any()); + + pool.close(); + } }