Skip to content

Commit 7353ab4

Browse files
committed
Fix memory leak in WiretapConnector
Prior to this commit, we found in gh-35953 that using the `WebTestClient` the following way leaks data buffers: ``` var body = client.get().uri("download") .exchange() .expectStatus().isOk() .returnResult() .getResponseBodyContent(); ``` Here, the test performs expectations on the response status and headers, but not on the response body. The WiretapConnector already supports this case by subscribing to the Flux response body in those cases and accumulating the entire content as a single byte[]. Here, the `DataBuffer` instances are not decoded by any `Decoder` and are not released. This results in a memory leak. This commit ensures that the automatic subscription in `WiretapConnector` also releases the buffers automatically as the DSL does not allow at that point to go back to performing body expectations. Fixes gh-36050
1 parent f1db0ef commit 7353ab4

File tree

2 files changed

+76
-6
lines changed

2 files changed

+76
-6
lines changed

spring-test/src/main/java/org/springframework/test/web/reactive/server/WiretapConnector.java

Lines changed: 16 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import reactor.core.publisher.Sinks;
3232

3333
import org.springframework.core.io.buffer.DataBuffer;
34+
import org.springframework.core.io.buffer.DataBufferUtils;
3435
import org.springframework.core.io.buffer.DefaultDataBufferFactory;
3536
import org.springframework.http.HttpMethod;
3637
import org.springframework.http.client.reactive.ClientHttpConnector;
@@ -196,11 +197,21 @@ public Mono<byte[]> getContent() {
196197
// 1. Mock server never consumed request body (for example, error before read)
197198
// 2. FluxExchangeResult: getResponseBodyContent called before getResponseBody
198199
//noinspection ConstantConditions
199-
(this.publisher != null ? this.publisher : this.publisherNested)
200-
.onErrorMap(ex -> new IllegalStateException(
201-
"Content has not been consumed, and " +
202-
"an error was raised while attempting to produce it.", ex))
203-
.subscribe();
200+
if (this.publisher != null) {
201+
this.publisher.doOnNext(DataBufferUtils::release)
202+
.onErrorMap(ex -> new IllegalStateException(
203+
"Content has not been consumed, and " +
204+
"an error was raised while attempting to produce it.", ex))
205+
.subscribe();
206+
}
207+
else if (this.publisherNested != null) {
208+
this.publisherNested
209+
.map(pub -> Flux.from(pub).doOnNext(DataBufferUtils::release))
210+
.onErrorMap(ex -> new IllegalStateException(
211+
"Content has not been consumed, and " +
212+
"an error was raised while attempting to produce it.", ex))
213+
.subscribe();
214+
}
204215
}
205216
return this.content.asMono();
206217
});

spring-test/src/test/java/org/springframework/test/web/reactive/server/WiretapConnectorTests.java

Lines changed: 60 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,18 @@
1717
package org.springframework.test.web.reactive.server;
1818

1919
import java.net.URI;
20+
import java.nio.charset.StandardCharsets;
2021
import java.time.Duration;
2122

23+
import io.netty.buffer.PooledByteBufAllocator;
24+
import org.junit.jupiter.api.AfterEach;
2225
import org.junit.jupiter.api.Test;
26+
import reactor.core.publisher.Flux;
2327
import reactor.core.publisher.Mono;
2428

29+
import org.springframework.core.io.buffer.DataBuffer;
30+
import org.springframework.core.io.buffer.NettyDataBufferFactory;
31+
import org.springframework.core.testfixture.io.buffer.LeakAwareDataBufferFactory;
2532
import org.springframework.http.HttpMethod;
2633
import org.springframework.http.HttpStatus;
2734
import org.springframework.http.client.reactive.ClientHttpConnector;
@@ -30,6 +37,7 @@
3037
import org.springframework.mock.http.client.reactive.MockClientHttpRequest;
3138
import org.springframework.mock.http.client.reactive.MockClientHttpResponse;
3239
import org.springframework.web.reactive.function.client.ClientRequest;
40+
import org.springframework.web.reactive.function.client.ClientResponse;
3341
import org.springframework.web.reactive.function.client.ExchangeFunction;
3442
import org.springframework.web.reactive.function.client.ExchangeFunctions;
3543

@@ -44,11 +52,19 @@
4452
*/
4553
public class WiretapConnectorTests {
4654

55+
private final LeakAwareDataBufferFactory bufferFactory =
56+
new LeakAwareDataBufferFactory(new NettyDataBufferFactory(PooledByteBufAllocator.DEFAULT));
57+
58+
@AfterEach
59+
void tearDown() {
60+
this.bufferFactory.checkForLeaks();
61+
}
62+
4763
@Test
4864
public void captureAndClaim() {
4965
ClientHttpRequest request = new MockClientHttpRequest(HttpMethod.GET, "/test");
5066
ClientHttpResponse response = new MockClientHttpResponse(HttpStatus.OK);
51-
ClientHttpConnector connector = (method, uri, fn) -> fn.apply(request).then(Mono.just(response));
67+
ClientHttpConnector connector = createConnector(request, response);
5268

5369
ClientRequest clientRequest = ClientRequest.create(HttpMethod.GET, URI.create("/test"))
5470
.header(WebTestClient.WEBTESTCLIENT_REQUEST_ID, "1").build();
@@ -62,4 +78,47 @@ public void captureAndClaim() {
6278
assertThat(result.getUrl().toString()).isEqualTo("/test");
6379
}
6480

81+
@Test
82+
void shouldReleaseBuffers() {
83+
MockClientHttpRequest request = new MockClientHttpRequest(HttpMethod.GET, "/test");
84+
MockClientHttpResponse response = new MockClientHttpResponse(HttpStatus.OK);
85+
response.setBody(Flux.just(toDataBuffer("Hello Spring")));
86+
ClientHttpConnector connector = createConnector(request, response);
87+
88+
ClientRequest clientRequest = ClientRequest.create(HttpMethod.GET, URI.create("/test"))
89+
.header(WebTestClient.WEBTESTCLIENT_REQUEST_ID, "1").build();
90+
91+
WiretapConnector wiretapConnector = new WiretapConnector(connector, null);
92+
ExchangeFunction function = ExchangeFunctions.create(wiretapConnector);
93+
function.exchange(clientRequest).block(ofMillis(0));
94+
ExchangeResult result = wiretapConnector.getExchangeResult("1", null, Duration.ZERO);
95+
result.getResponseBodyContent();
96+
}
97+
98+
@Test
99+
void shouldReleaseBuffersOnlyOnce() {
100+
MockClientHttpRequest request = new MockClientHttpRequest(HttpMethod.GET, "/test");
101+
MockClientHttpResponse response = new MockClientHttpResponse(HttpStatus.OK);
102+
response.setBody(Flux.just(toDataBuffer("Hello Spring"), toDataBuffer("Hello Spring"), toDataBuffer("Hello Spring"), toDataBuffer("Hello Spring")));
103+
ClientHttpConnector connector = createConnector(request, response);
104+
105+
ClientRequest clientRequest = ClientRequest.create(HttpMethod.GET, URI.create("/test"))
106+
.header(WebTestClient.WEBTESTCLIENT_REQUEST_ID, "1").build();
107+
108+
WiretapConnector wiretapConnector = new WiretapConnector(connector, null);
109+
ExchangeFunction function = ExchangeFunctions.create(wiretapConnector);
110+
function.exchange(clientRequest).flatMap(ClientResponse::releaseBody).block(ofMillis(0));
111+
ExchangeResult result = wiretapConnector.getExchangeResult("1", null, Duration.ZERO);
112+
result.getResponseBodyContent();
113+
}
114+
115+
private ClientHttpConnector createConnector(ClientHttpRequest request, ClientHttpResponse response) {
116+
return (method, uri, fn) -> fn.apply(request).then(Mono.just(response));
117+
}
118+
119+
private DataBuffer toDataBuffer(String s) {
120+
DataBuffer buffer = this.bufferFactory.allocateBuffer(256);
121+
buffer.write(s.getBytes(StandardCharsets.UTF_8));
122+
return buffer;
123+
}
65124
}

0 commit comments

Comments
 (0)