Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 10 additions & 6 deletions core/src/main/java/io/grpc/internal/ServerCallImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -327,16 +327,20 @@ private void messagesAvailableInternal(final MessageProducer producer) {
return;
}

InputStream message;
try {
InputStream message;
while ((message = producer.next()) != null) {
try {
listener.onMessage(call.method.parseRequest(message));
} catch (Throwable t) {
ReqT parsedMessage;
try (InputStream ignored = message) {
parsedMessage = call.method.parseRequest(message);
} catch (StatusRuntimeException e) {
GrpcUtil.closeQuietly(message);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

additionally need GrpcUtil.closeQuietly(producer);

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Noted.

throw t;
GrpcUtil.closeQuietly(producer);
call.cancelled = true;
call.close(e.getStatus(), new Metadata());
return;
}
message.close();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

before listener.onMessage()

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

while ((message = producer.next()) != null) {
  ReqT parsedMessage;
  try (InputStream ignored = message) {
    parsedMessage = call.method.parseRequest(message);
  } catch (StatusRuntimeException e) {
    GrpcUtil.closeQuietly(producer);
    call.cancelled = true;
    call.close(e.getStatus(), new Metadata());
    return;
  }
  listener.onMessage(parsedMessage);
}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@panchenko ,Thank you for clarifying the suggestions. I apologize for the misunderstanding; I initially assumed I should remove listener.onMessage(parsedMessage); along with the try and adding the catch to previous try , which caused failures in existing test cases. I have now corrected this and addressed the comments."

listener.onMessage(parsedMessage);
}
} catch (Throwable t) {
GrpcUtil.closeQuietly(producer);
Expand Down
41 changes: 41 additions & 0 deletions core/src/test/java/io/grpc/internal/ServerCallImplTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,11 @@
import io.grpc.SecurityLevel;
import io.grpc.ServerCall;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.grpc.internal.ServerCallImpl.ServerStreamListenerImpl;
import io.perfmark.PerfMark;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import org.junit.Before;
Expand All @@ -69,6 +71,8 @@ public class ServerCallImplTest {

@Mock private ServerStream stream;
@Mock private ServerCall.Listener<Long> callListener;
@Mock private StreamListener.MessageProducer messageProducer;
@Mock private InputStream message;

private final CallTracer serverCallTracer = CallTracer.getDefaultFactory().create();
private ServerCallImpl<Long, Long> call;
Expand Down Expand Up @@ -493,6 +497,43 @@ public void streamListener_unexpectedRuntimeException() {
assertThat(e).hasMessageThat().isEqualTo("unexpected exception");
}

@Test
public void streamListener_statusRuntimeException() throws IOException {
MethodDescriptor<Long, Long> failingParseMethod = MethodDescriptor.<Long, Long>newBuilder()
.setType(MethodType.UNARY)
.setFullMethodName("service/method")
.setRequestMarshaller(new LongMarshaller() {
@Override
public Long parse(InputStream stream) {
throw new StatusRuntimeException(Status.RESOURCE_EXHAUSTED
.withDescription("Decompressed gRPC message exceeds maximum size"));
}
})
.setResponseMarshaller(new LongMarshaller())
.build();

call = new ServerCallImpl<>(stream, failingParseMethod, requestHeaders, context,
DecompressorRegistry.getDefaultInstance(), CompressorRegistry.getDefaultInstance(),
serverCallTracer, PerfMark.createTag());

ServerStreamListenerImpl<Long> streamListener =
new ServerCallImpl.ServerStreamListenerImpl<>(call, callListener, context);

when(messageProducer.next()).thenReturn(message, (InputStream) null);
streamListener.messagesAvailable(messageProducer);
ArgumentCaptor<Status> statusCaptor = ArgumentCaptor.forClass(Status.class);
ArgumentCaptor<Metadata> metadataCaptor = ArgumentCaptor.forClass(Metadata.class);

verify(stream).close(statusCaptor.capture(),metadataCaptor.capture());
Status status = statusCaptor.getValue();
assertEquals(Status.RESOURCE_EXHAUSTED.getCode(), status.getCode());
assertEquals("Decompressed gRPC message exceeds maximum size", status.getDescription());

streamListener.halfClosed();
verify(callListener, never()).onHalfClose();
verify(callListener,never()).onMessage(any());
}

private static class LongMarshaller implements Marshaller<Long> {
@Override
public InputStream stream(Long value) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2030,7 +2030,7 @@ private void assertPayload(Payload expected, Payload actual) {
}
}

private static void assertCodeEquals(Status.Code expected, Status actual) {
protected static void assertCodeEquals(Status.Code expected, Status actual) {
assertWithMessage("Unexpected status: %s", actual).that(actual.getCode()).isEqualTo(expected);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package io.grpc.testing.integration;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;

import com.google.protobuf.ByteString;
Expand All @@ -37,6 +38,8 @@
import io.grpc.ServerCall.Listener;
import io.grpc.ServerCallHandler;
import io.grpc.ServerInterceptor;
import io.grpc.Status.Code;
import io.grpc.StatusRuntimeException;
import io.grpc.internal.GrpcUtil;
import io.grpc.netty.InternalNettyChannelBuilder;
import io.grpc.netty.InternalNettyServerBuilder;
Expand All @@ -53,7 +56,9 @@
import java.io.OutputStream;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

Expand Down Expand Up @@ -84,10 +89,16 @@ public static void registerCompressors() {
compressors.register(Codec.Identity.NONE);
}

@Rule
public final TestName currentTest = new TestName();

@Override
protected ServerBuilder<?> getServerBuilder() {
NettyServerBuilder builder = NettyServerBuilder.forPort(0, InsecureServerCredentials.create())
.maxInboundMessageSize(AbstractInteropTest.MAX_MESSAGE_SIZE)
.maxInboundMessageSize(
DECOMPRESSED_MESSAGE_TOO_LONG_METHOD_NAME.equals(currentTest.getMethodName())
? 1000
: AbstractInteropTest.MAX_MESSAGE_SIZE)
.compressorRegistry(compressors)
.decompressorRegistry(decompressors)
.intercept(new ServerInterceptor() {
Expand Down Expand Up @@ -126,6 +137,22 @@ public void compresses() {
assertTrue(FZIPPER.anyWritten);
}

private static final String DECOMPRESSED_MESSAGE_TOO_LONG_METHOD_NAME =
"decompressedMessageTooLong";

@Test
public void decompressedMessageTooLong() {
Copy link
Contributor

@kannanjgithub kannanjgithub Oct 23, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This e2e test is great but the unit test you previously wrote in ServerCallImplTest also lets us test code paths in a more fine-grained way. In particular lets re-introduce that test and add the following after causing the parse exception to happen in message handling -

 streamListener.halfClosed();

 verify(callListener, never()).onHalfClose();

in order to test that call.close boolean has been set by the handling of the parse exception, that a future method calls from ServerImpl are simply returned without calling the corresponding method on the listener.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @kannanjgithub for the review and suggestions. I've addressed the comments , please review the changes when you have a moment.

assertEquals(DECOMPRESSED_MESSAGE_TOO_LONG_METHOD_NAME, currentTest.getMethodName());
final SimpleRequest bigRequest = SimpleRequest.newBuilder()
.setPayload(Payload.newBuilder().setBody(ByteString.copyFrom(new byte[10_000])))
.build();
StatusRuntimeException e = assertThrows(StatusRuntimeException.class,
() -> blockingStub.withCompression("gzip").unaryCall(bigRequest));
assertCodeEquals(Code.RESOURCE_EXHAUSTED, e.getStatus());
assertEquals("Decompressed gRPC message exceeds maximum size 1000",
e.getStatus().getDescription());
}

@Override
protected NettyChannelBuilder createChannelBuilder() {
NettyChannelBuilder builder = NettyChannelBuilder.forAddress(getListenAddress())
Expand Down