Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -262,11 +262,16 @@ private void removeGroup(AfeChannelGroup group) {
}

@GuardedBy("this")
private AfeChannelGroup rehomeChannel(ChannelWrapper channelWrapper, AfeId afeId) {
private void rehomeChannel(ChannelWrapper channelWrapper, AfeId afeId) {
// No need to rehome recycled channels.
if (channelWrapper.channel.isShutdown()) {
return;
}

AfeChannelGroup origGroup = channelWrapper.group;

if (Objects.equals(origGroup.afeId, afeId)) {
return origGroup;
return;
}

log(Level.FINE, "Rehoming channel from: %s to %s", origGroup.afeId, afeId);
Expand All @@ -291,7 +296,7 @@ private AfeChannelGroup rehomeChannel(ChannelWrapper channelWrapper, AfeId afeId
newGroup.channels.add(channelWrapper);
newGroup.numStreams += channelWrapper.numOutstanding;

return newGroup;
return;
}

// Update accounting when a stream is closed and releases its channel
Expand Down Expand Up @@ -322,6 +327,11 @@ private static boolean shouldRecycleChannel(Status status) {

@GuardedBy("this")
private void recycleChannel(ChannelWrapper channelWrapper) {
if (channelWrapper.channel.isShutdown()) {
// Channel is already recycled.
return;
}

channelWrapper.group.channels.remove(channelWrapper);
channelWrapper.channel.shutdown();
// Checking for starting group because we don't want to delete the stating group.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -375,4 +375,98 @@ void testRecycleChannelInGroupOnUnimplemented() {

pool.close();
}

@Test
void testDoubleRecycleCreatesExtraChannel() {
when(channelSupplier.get()).thenReturn(channel);
when(channel.newCall(any(), any())).thenReturn(clientCall);
doNothing().when(clientCall).start(listener.capture(), any());

ChannelPoolDpImpl pool =
new ChannelPoolDpImpl(channelSupplier, defaultConfig, debugTagTracer, bgExecutor);

// Create 2 streams on the same channel
pool.newStream(FakeSessionGrpc.getOpenSessionMethod(), CallOptions.DEFAULT)
.start(Mockito.mock(Listener.class), new Metadata());
pool.newStream(FakeSessionGrpc.getOpenSessionMethod(), CallOptions.DEFAULT)
.start(Mockito.mock(Listener.class), new Metadata());

// Initially 1 channel
verify(channelSupplier, times(1)).get();

// Trigger first recycle via first stream
ClientCall.Listener<Object> listener1 = listener.getAllValues().get(0);
listener1.onClose(Status.UNIMPLEMENTED, new Metadata());

// Channel should be recycled (shutdown + addChannel)
verify(channel, times(1)).shutdown();
// Now isShutdown returns true for the channel
when(channel.isShutdown()).thenReturn(true);
verify(channelSupplier, times(2)).get();

// Trigger second recycle via second stream on the SAME (already recycled) channel
ClientCall.Listener<Object> listener2 = listener.getAllValues().get(1);
listener2.onClose(Status.UNIMPLEMENTED, new Metadata());

// BUG: This should NOT cause another addChannel() or shutdown()
// If it fails, times(3) will be true.
verify(channelSupplier, times(2)).get();
verify(channel, times(1)).shutdown();

pool.close();
}

@Test
void testRecycledChannelDoesNotRejoinPool() throws InterruptedException {
when(channelSupplier.get()).thenReturn(channel);
when(channel.newCall(any(), any())).thenReturn(clientCall);
doNothing().when(clientCall).start(listener.capture(), any());
doReturn(Attributes.EMPTY).when(clientCall).getAttributes();

ChannelPoolDpImpl pool =
new ChannelPoolDpImpl(channelSupplier, defaultConfig, debugTagTracer, bgExecutor);

// 1. Create stream1 on channel1
pool.newStream(FakeSessionGrpc.getOpenSessionMethod(), CallOptions.DEFAULT)
.start(Mockito.mock(Listener.class), new Metadata());

// 2. Create stream2 on channel1 to trigger recycle
pool.newStream(FakeSessionGrpc.getOpenSessionMethod(), CallOptions.DEFAULT)
.start(Mockito.mock(Listener.class), new Metadata());

ClientCall.Listener<Object> listener1 = listener.getAllValues().get(0);
ClientCall.Listener<Object> listener2 = listener.getAllValues().get(1);

// Prepare channel2 that will be picked up by addChannel during recycling
ManagedChannel channel2 = Mockito.mock(ManagedChannel.class);
when(channelSupplier.get()).thenReturn(channel2);
when(channel2.newCall(any(), any())).thenReturn(clientCall);

// 3. Recycle channel1 via stream2
listener2.onClose(Status.UNIMPLEMENTED, new Metadata());
verify(channel, times(1)).shutdown();
// Now isShutdown for the channel1 returns true
when(channel.isShutdown()).thenReturn(true);

// 4. stream1 (on recycled channel1) receives headers with AFE ID
// This triggers rehomeChannel
PeerInfo peerInfo = PeerInfo.newBuilder().setApplicationFrontendId(555).build();
Metadata headers = new Metadata();
headers.put(
SessionStreamImpl.PEER_INFO_KEY,
Base64.getEncoder().encodeToString(peerInfo.toByteArray()));
listener1.onHeaders(headers);

// 5. Try to create a new stream.
// It should NOT pick channel1 because it's recycled/shutdown.
pool.newStream(FakeSessionGrpc.getOpenSessionMethod(), CallOptions.DEFAULT);

// BUG: If channel1 was re-added to a group, the picker might have picked it.
// channel.newCall was called 2 times (steps 1 and 2). It should NOT be called a 3rd time.
verify(channel, times(2)).newCall(any(), any());
// Instead, it should be called on channel2
verify(channel2, times(1)).newCall(any(), any());

pool.close();
}
}
Loading