From fe36e6defde9cf6a73eea02a1f9f2ba3a47d974e Mon Sep 17 00:00:00 2001 From: vimanikag Date: Mon, 15 Sep 2025 06:05:38 +0000 Subject: [PATCH 01/14] 11246 :: Unexpected error when server expands a compressed message to learn it is too large --- .../java/io/grpc/internal/ServerImpl.java | 13 ++- .../java/io/grpc/internal/ServerImplTest.java | 94 +++++++++++++++++++ 2 files changed, 105 insertions(+), 2 deletions(-) diff --git a/core/src/main/java/io/grpc/internal/ServerImpl.java b/core/src/main/java/io/grpc/internal/ServerImpl.java index dc0709e1fb8..75e410845ab 100644 --- a/core/src/main/java/io/grpc/internal/ServerImpl.java +++ b/core/src/main/java/io/grpc/internal/ServerImpl.java @@ -56,6 +56,7 @@ import io.grpc.ServerServiceDefinition; import io.grpc.ServerTransportFilter; import io.grpc.Status; +import io.grpc.StatusRuntimeException; import io.perfmark.Link; import io.perfmark.PerfMark; import io.perfmark.Tag; @@ -808,10 +809,18 @@ void setListener(ServerStreamListener listener) { /** * Like {@link ServerCall#close(Status, Metadata)}, but thread-safe for internal use. */ - private void internalClose(Throwable t) { + private void internalClose(Throwable throwable) { // TODO(ejona86): this is not thread-safe :) String description = "Application error processing RPC"; - stream.close(Status.UNKNOWN.withDescription(description).withCause(t), new Metadata()); + Status statusToPropagate = Status.UNKNOWN.withDescription(description).withCause(throwable); + if (throwable instanceof StatusRuntimeException) { + StatusRuntimeException statusRuntimeException = (StatusRuntimeException) throwable; + Status.Code code = statusRuntimeException.getStatus().getCode(); + if (code == Status.Code.RESOURCE_EXHAUSTED) { + statusToPropagate = statusRuntimeException.getStatus().withCause(throwable); + } + } + stream.close(statusToPropagate, new Metadata()); } @Override diff --git a/core/src/test/java/io/grpc/internal/ServerImplTest.java b/core/src/test/java/io/grpc/internal/ServerImplTest.java index 0f18efe078c..1027fe53681 100644 --- a/core/src/test/java/io/grpc/internal/ServerImplTest.java +++ b/core/src/test/java/io/grpc/internal/ServerImplTest.java @@ -77,6 +77,7 @@ import io.grpc.ServiceDescriptor; import io.grpc.Status; import io.grpc.Status.Code; +import io.grpc.StatusRuntimeException; import io.grpc.StringMarshaller; import io.grpc.internal.ServerImpl.JumpToApplicationThreadServerStreamListener; import io.grpc.internal.ServerImplBuilder.ClientTransportServersBuilder; @@ -1542,6 +1543,99 @@ public void channelz_transport_membershp() throws Exception { assertTrue(after.end); } + @Test + public void testInternalClose_nonProtocolStatusRuntimeExceptionBecomesUnknown() { + JumpToApplicationThreadServerStreamListener listener + = new JumpToApplicationThreadServerStreamListener( + executor.getScheduledExecutorService(), + executor.getScheduledExecutorService(), + stream, + Context.ROOT.withCancellation(), + PerfMark.createTag()); + ServerStreamListener mockListener = mock(ServerStreamListener.class); + listener.setListener(mockListener); + + StatusRuntimeException statusRuntimeException + = new StatusRuntimeException(Status.PERMISSION_DENIED.withDescription("denied")); + doThrow(statusRuntimeException).when(mockListener).onReady(); + listener.onReady(); + try { + executor.runDueTasks(); + fail("Expected exception"); + } catch (RuntimeException t) { + assertSame(statusRuntimeException, t); + ensureServerStateNotLeaked(); + } + verify(stream).close(statusCaptor.capture(), metadataCaptor.capture()); + Status status = statusCaptor.getValue(); + assertEquals(Code.UNKNOWN, status.getCode()); + assertEquals("Application error processing RPC", status.getDescription()); + assertEquals(statusRuntimeException, status.getCause()); + assertTrue(metadataCaptor.getValue().keys().isEmpty()); + } + + @Test + public void testInternalClose_otherExceptionBecomesUnknown() { + JumpToApplicationThreadServerStreamListener listener + = new JumpToApplicationThreadServerStreamListener( + executor.getScheduledExecutorService(), + executor.getScheduledExecutorService(), + stream, + Context.ROOT.withCancellation(), + PerfMark.createTag()); + ServerStreamListener mockListener = mock(ServerStreamListener.class); + listener.setListener(mockListener); + + RuntimeException expectedT = new RuntimeException(); + doThrow(expectedT).when(mockListener) + .messagesAvailable(any(StreamListener.MessageProducer.class)); + listener.messagesAvailable(mock(StreamListener.MessageProducer.class)); + try { + executor.runDueTasks(); + fail("Expected exception"); + } catch (RuntimeException t) { + assertSame(expectedT, t); + ensureServerStateNotLeaked(); + } + verify(stream).close(statusCaptor.capture(), metadataCaptor.capture()); + Status status = statusCaptor.getValue(); + assertEquals(Code.UNKNOWN, status.getCode()); + assertEquals("Application error processing RPC", status.getDescription()); + assertEquals(expectedT, status.getCause()); + assertTrue(metadataCaptor.getValue().keys().isEmpty()); + } + + @Test + public void testInternalClose_propagatesResourceExhausted() { + JumpToApplicationThreadServerStreamListener listener + = new JumpToApplicationThreadServerStreamListener( + executor.getScheduledExecutorService(), + executor.getScheduledExecutorService(), + stream, + Context.ROOT.withCancellation(), + PerfMark.createTag()); + ServerStreamListener mockListener = mock(ServerStreamListener.class); + listener.setListener(mockListener); + + StatusRuntimeException statusRuntimeException + = new StatusRuntimeException(Status.RESOURCE_EXHAUSTED.withDescription("exhausted")); + doThrow(statusRuntimeException).when(mockListener) + .messagesAvailable(any(StreamListener.MessageProducer.class)); + listener.messagesAvailable(mock(StreamListener.MessageProducer.class)); + try { + executor.runDueTasks(); + fail("Expected exception"); + } catch (RuntimeException t) { + assertSame(statusRuntimeException, t); + } + verify(stream).close(statusCaptor.capture(), metadataCaptor.capture()); + Status status = statusCaptor.getValue(); + assertEquals(Status.Code.RESOURCE_EXHAUSTED, status.getCode()); + assertEquals("exhausted", status.getDescription()); + assertEquals(statusRuntimeException, status.getCause()); + assertTrue(metadataCaptor.getValue().keys().isEmpty()); + } + private void createAndStartServer() throws IOException { createServer(); server.start(); From 61fbe27382416b77524824eeca9e5126f26a29a1 Mon Sep 17 00:00:00 2001 From: vimanikag Date: Mon, 15 Sep 2025 06:18:02 +0000 Subject: [PATCH 02/14] 11246 :: Unexpected error when server expands a compressed message to learn it is too large --- core/src/main/java/io/grpc/internal/ServerImpl.java | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/core/src/main/java/io/grpc/internal/ServerImpl.java b/core/src/main/java/io/grpc/internal/ServerImpl.java index 75e410845ab..d482ba908d7 100644 --- a/core/src/main/java/io/grpc/internal/ServerImpl.java +++ b/core/src/main/java/io/grpc/internal/ServerImpl.java @@ -809,15 +809,15 @@ void setListener(ServerStreamListener listener) { /** * Like {@link ServerCall#close(Status, Metadata)}, but thread-safe for internal use. */ - private void internalClose(Throwable throwable) { + private void internalClose(Throwable t) { // TODO(ejona86): this is not thread-safe :) String description = "Application error processing RPC"; - Status statusToPropagate = Status.UNKNOWN.withDescription(description).withCause(throwable); - if (throwable instanceof StatusRuntimeException) { - StatusRuntimeException statusRuntimeException = (StatusRuntimeException) throwable; + Status statusToPropagate = Status.UNKNOWN.withDescription(description).withCause(t); + if (t instanceof StatusRuntimeException) { + StatusRuntimeException statusRuntimeException = (StatusRuntimeException) t; Status.Code code = statusRuntimeException.getStatus().getCode(); if (code == Status.Code.RESOURCE_EXHAUSTED) { - statusToPropagate = statusRuntimeException.getStatus().withCause(throwable); + statusToPropagate = statusRuntimeException.getStatus().withCause(t); } } stream.close(statusToPropagate, new Metadata()); From bb929aa37bdd66db0f319603f6bf5b076257c994 Mon Sep 17 00:00:00 2001 From: vimanikag Date: Mon, 15 Sep 2025 10:03:10 +0000 Subject: [PATCH 03/14] 11246 :: Unexpected error when server expands a compressed message to learn it is too large --- core/src/main/java/io/grpc/internal/ServerImpl.java | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/core/src/main/java/io/grpc/internal/ServerImpl.java b/core/src/main/java/io/grpc/internal/ServerImpl.java index d482ba908d7..9af2f8a2cf9 100644 --- a/core/src/main/java/io/grpc/internal/ServerImpl.java +++ b/core/src/main/java/io/grpc/internal/ServerImpl.java @@ -814,10 +814,8 @@ private void internalClose(Throwable t) { String description = "Application error processing RPC"; Status statusToPropagate = Status.UNKNOWN.withDescription(description).withCause(t); if (t instanceof StatusRuntimeException) { - StatusRuntimeException statusRuntimeException = (StatusRuntimeException) t; - Status.Code code = statusRuntimeException.getStatus().getCode(); - if (code == Status.Code.RESOURCE_EXHAUSTED) { - statusToPropagate = statusRuntimeException.getStatus().withCause(t); + if (((StatusRuntimeException) t).getStatus().getCode() == Status.Code.RESOURCE_EXHAUSTED) { + statusToPropagate = ((StatusRuntimeException) t).getStatus().withCause(t); } } stream.close(statusToPropagate, new Metadata()); From 1024a7f3340ccaa615aa23150e7fcdef39b2ccbb Mon Sep 17 00:00:00 2001 From: vimanikag Date: Tue, 7 Oct 2025 09:39:34 +0000 Subject: [PATCH 04/14] 11246:: addressing the review comments. --- core/src/main/java/io/grpc/internal/ServerImpl.java | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/core/src/main/java/io/grpc/internal/ServerImpl.java b/core/src/main/java/io/grpc/internal/ServerImpl.java index 9af2f8a2cf9..be956e72e61 100644 --- a/core/src/main/java/io/grpc/internal/ServerImpl.java +++ b/core/src/main/java/io/grpc/internal/ServerImpl.java @@ -56,6 +56,7 @@ import io.grpc.ServerServiceDefinition; import io.grpc.ServerTransportFilter; import io.grpc.Status; +import io.grpc.StatusException; import io.grpc.StatusRuntimeException; import io.perfmark.Link; import io.perfmark.PerfMark; @@ -813,10 +814,14 @@ private void internalClose(Throwable t) { // TODO(ejona86): this is not thread-safe :) String description = "Application error processing RPC"; Status statusToPropagate = Status.UNKNOWN.withDescription(description).withCause(t); + Status extractedStatus = null; if (t instanceof StatusRuntimeException) { - if (((StatusRuntimeException) t).getStatus().getCode() == Status.Code.RESOURCE_EXHAUSTED) { - statusToPropagate = ((StatusRuntimeException) t).getStatus().withCause(t); - } + extractedStatus = ((StatusRuntimeException) t).getStatus(); + } else if (t instanceof StatusException) { + extractedStatus = ((StatusException) t).getStatus(); + } + if (extractedStatus != null && extractedStatus.getCode() == Status.Code.RESOURCE_EXHAUSTED) { + statusToPropagate = extractedStatus.withCause(t); } stream.close(statusToPropagate, new Metadata()); } From 5410196c0f2cc2192e479e531e1bbdeb96747f0e Mon Sep 17 00:00:00 2001 From: vimanikag Date: Wed, 8 Oct 2025 09:45:13 +0000 Subject: [PATCH 05/14] 11246:: added the junit's for checked exception (StatusException) --- .../java/io/grpc/internal/ServerImplTest.java | 31 ++++++++++++++++++- 1 file changed, 30 insertions(+), 1 deletion(-) diff --git a/core/src/test/java/io/grpc/internal/ServerImplTest.java b/core/src/test/java/io/grpc/internal/ServerImplTest.java index 1027fe53681..44b9e6230bb 100644 --- a/core/src/test/java/io/grpc/internal/ServerImplTest.java +++ b/core/src/test/java/io/grpc/internal/ServerImplTest.java @@ -77,6 +77,7 @@ import io.grpc.ServiceDescriptor; import io.grpc.Status; import io.grpc.Status.Code; +import io.grpc.StatusException; import io.grpc.StatusRuntimeException; import io.grpc.StringMarshaller; import io.grpc.internal.ServerImpl.JumpToApplicationThreadServerStreamListener; @@ -88,6 +89,7 @@ import java.io.ByteArrayInputStream; import java.io.IOException; import java.io.InputStream; +import java.lang.reflect.InvocationTargetException; import java.net.InetSocketAddress; import java.net.SocketAddress; import java.util.Arrays; @@ -1606,7 +1608,7 @@ public void testInternalClose_otherExceptionBecomesUnknown() { } @Test - public void testInternalClose_propagatesResourceExhausted() { + public void testInternalClose_propagatesStatusRuntimeException() { JumpToApplicationThreadServerStreamListener listener = new JumpToApplicationThreadServerStreamListener( executor.getScheduledExecutorService(), @@ -1636,6 +1638,33 @@ public void testInternalClose_propagatesResourceExhausted() { assertTrue(metadataCaptor.getValue().keys().isEmpty()); } + @Test + public void testInternalClose_propagatesStatusException() + throws NoSuchMethodException, InvocationTargetException, IllegalAccessException { + JumpToApplicationThreadServerStreamListener listener + = new JumpToApplicationThreadServerStreamListener( + executor.getScheduledExecutorService(), + executor.getScheduledExecutorService(), + stream, + Context.ROOT.withCancellation(), + PerfMark.createTag()); + + StatusException statusException + = new StatusException(Status.RESOURCE_EXHAUSTED.withDescription("exhausted")); + java.lang.reflect.Method internalClose = + JumpToApplicationThreadServerStreamListener.class.getDeclaredMethod( + "internalClose", Throwable.class); + internalClose.setAccessible(true); + + internalClose.invoke(listener, statusException); + verify(stream).close(statusCaptor.capture(), metadataCaptor.capture()); + Status status = statusCaptor.getValue(); + assertEquals(Status.Code.RESOURCE_EXHAUSTED, status.getCode()); + assertEquals("exhausted", status.getDescription()); + assertEquals(statusException, status.getCause()); + assertTrue(metadataCaptor.getValue().keys().isEmpty()); + } + private void createAndStartServer() throws IOException { createServer(); server.start(); From 14adfdb466e7a07d2512766e4f38c44e7904491c Mon Sep 17 00:00:00 2001 From: vimanikag Date: Wed, 15 Oct 2025 11:37:11 +0000 Subject: [PATCH 06/14] 11246:: addressing the review comments --- core/src/main/java/io/grpc/internal/ServerImpl.java | 5 ++++- .../src/test/java/io/grpc/internal/ServerImplTest.java | 10 ++++++---- 2 files changed, 10 insertions(+), 5 deletions(-) diff --git a/core/src/main/java/io/grpc/internal/ServerImpl.java b/core/src/main/java/io/grpc/internal/ServerImpl.java index be956e72e61..217ac5081b7 100644 --- a/core/src/main/java/io/grpc/internal/ServerImpl.java +++ b/core/src/main/java/io/grpc/internal/ServerImpl.java @@ -820,7 +820,10 @@ private void internalClose(Throwable t) { } else if (t instanceof StatusException) { extractedStatus = ((StatusException) t).getStatus(); } - if (extractedStatus != null && extractedStatus.getCode() == Status.Code.RESOURCE_EXHAUSTED) { + String message = t.getMessage(); + if (extractedStatus != null && extractedStatus.getCode() == Status.Code.RESOURCE_EXHAUSTED + && message != null + && message.contains("Decompressed gRPC message exceeds maximum size")) { statusToPropagate = extractedStatus.withCause(t); } stream.close(statusToPropagate, new Metadata()); diff --git a/core/src/test/java/io/grpc/internal/ServerImplTest.java b/core/src/test/java/io/grpc/internal/ServerImplTest.java index 44b9e6230bb..3eee31da2be 100644 --- a/core/src/test/java/io/grpc/internal/ServerImplTest.java +++ b/core/src/test/java/io/grpc/internal/ServerImplTest.java @@ -1620,7 +1620,8 @@ public void testInternalClose_propagatesStatusRuntimeException() { listener.setListener(mockListener); StatusRuntimeException statusRuntimeException - = new StatusRuntimeException(Status.RESOURCE_EXHAUSTED.withDescription("exhausted")); + = new StatusRuntimeException(Status.RESOURCE_EXHAUSTED + .withDescription("Decompressed gRPC message exceeds maximum size")); doThrow(statusRuntimeException).when(mockListener) .messagesAvailable(any(StreamListener.MessageProducer.class)); listener.messagesAvailable(mock(StreamListener.MessageProducer.class)); @@ -1633,7 +1634,7 @@ public void testInternalClose_propagatesStatusRuntimeException() { verify(stream).close(statusCaptor.capture(), metadataCaptor.capture()); Status status = statusCaptor.getValue(); assertEquals(Status.Code.RESOURCE_EXHAUSTED, status.getCode()); - assertEquals("exhausted", status.getDescription()); + assertEquals("Decompressed gRPC message exceeds maximum size", status.getDescription()); assertEquals(statusRuntimeException, status.getCause()); assertTrue(metadataCaptor.getValue().keys().isEmpty()); } @@ -1650,7 +1651,8 @@ public void testInternalClose_propagatesStatusException() PerfMark.createTag()); StatusException statusException - = new StatusException(Status.RESOURCE_EXHAUSTED.withDescription("exhausted")); + = new StatusException(Status.RESOURCE_EXHAUSTED + .withDescription("Decompressed gRPC message exceeds maximum size")); java.lang.reflect.Method internalClose = JumpToApplicationThreadServerStreamListener.class.getDeclaredMethod( "internalClose", Throwable.class); @@ -1660,7 +1662,7 @@ public void testInternalClose_propagatesStatusException() verify(stream).close(statusCaptor.capture(), metadataCaptor.capture()); Status status = statusCaptor.getValue(); assertEquals(Status.Code.RESOURCE_EXHAUSTED, status.getCode()); - assertEquals("exhausted", status.getDescription()); + assertEquals("Decompressed gRPC message exceeds maximum size", status.getDescription()); assertEquals(statusException, status.getCause()); assertTrue(metadataCaptor.getValue().keys().isEmpty()); } From 631edd3692d1d4cebd228ef1a36099277def9045 Mon Sep 17 00:00:00 2001 From: vimanikag Date: Fri, 17 Oct 2025 08:22:38 +0000 Subject: [PATCH 07/14] 11246:: addressing the review comments --- .../io/grpc/internal/MessageDeframer.java | 13 +- .../java/io/grpc/internal/ServerCallImpl.java | 10 ++ .../java/io/grpc/internal/ServerImpl.java | 17 +-- .../io/grpc/internal/MessageDeframerTest.java | 17 +++ .../java/io/grpc/internal/ServerImplTest.java | 125 ------------------ 5 files changed, 38 insertions(+), 144 deletions(-) diff --git a/core/src/main/java/io/grpc/internal/MessageDeframer.java b/core/src/main/java/io/grpc/internal/MessageDeframer.java index 13a01efec0a..66bc7c80ba0 100644 --- a/core/src/main/java/io/grpc/internal/MessageDeframer.java +++ b/core/src/main/java/io/grpc/internal/MessageDeframer.java @@ -24,6 +24,7 @@ import io.grpc.Codec; import io.grpc.Decompressor; import io.grpc.Status; +import io.grpc.StatusRuntimeException; import java.io.Closeable; import java.io.FilterInputStream; import java.io.IOException; @@ -519,9 +520,9 @@ private void reportCount() { private void verifySize() { if (count > maxMessageSize) { - throw Status.RESOURCE_EXHAUSTED - .withDescription("Decompressed gRPC message exceeds maximum size " + maxMessageSize) - .asRuntimeException(); + throw new TooLongDecompressedMessageException( + Status.RESOURCE_EXHAUSTED + .withDescription("Decompressed gRPC message exceeds maximum size " + maxMessageSize)); } } } @@ -541,4 +542,10 @@ public InputStream next() { return messageToReturn; } } + + static class TooLongDecompressedMessageException extends StatusRuntimeException { + public TooLongDecompressedMessageException(Status status) { + super(status); + } + } } diff --git a/core/src/main/java/io/grpc/internal/ServerCallImpl.java b/core/src/main/java/io/grpc/internal/ServerCallImpl.java index e224384ce8f..ef9fa7e29c5 100644 --- a/core/src/main/java/io/grpc/internal/ServerCallImpl.java +++ b/core/src/main/java/io/grpc/internal/ServerCallImpl.java @@ -24,6 +24,7 @@ import static io.grpc.internal.GrpcUtil.CONTENT_LENGTH_KEY; import static io.grpc.internal.GrpcUtil.MESSAGE_ACCEPT_ENCODING_KEY; import static io.grpc.internal.GrpcUtil.MESSAGE_ENCODING_KEY; +import static io.grpc.internal.MessageDeframer.TooLongDecompressedMessageException; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Throwables; @@ -73,6 +74,7 @@ final class ServerCallImpl extends ServerCall { private boolean closeCalled; private Compressor compressor; private boolean messageSent; + private Status exceptionStatus = null; ServerCallImpl(ServerStream stream, MethodDescriptor method, Metadata inboundHeaders, Context.CancellableContext context, @@ -270,6 +272,11 @@ public SecurityLevel getSecurityLevel() { * on. */ private void handleInternalError(Throwable internalError) { + if (exceptionStatus != null) { + stream.close(exceptionStatus, new Metadata()); + exceptionStatus = null; + return; + } log.log(Level.WARNING, "Cancelling the stream because of internal error", internalError); Status status = (internalError instanceof StatusRuntimeException) ? ((StatusRuntimeException) internalError).getStatus() @@ -338,6 +345,9 @@ private void messagesAvailableInternal(final MessageProducer producer) { } message.close(); } + } catch (TooLongDecompressedMessageException e) { + this.call.exceptionStatus = e.getStatus(); + this.call.handleInternalError(e); } catch (Throwable t) { GrpcUtil.closeQuietly(producer); Throwables.throwIfUnchecked(t); diff --git a/core/src/main/java/io/grpc/internal/ServerImpl.java b/core/src/main/java/io/grpc/internal/ServerImpl.java index 217ac5081b7..dc0709e1fb8 100644 --- a/core/src/main/java/io/grpc/internal/ServerImpl.java +++ b/core/src/main/java/io/grpc/internal/ServerImpl.java @@ -56,8 +56,6 @@ import io.grpc.ServerServiceDefinition; import io.grpc.ServerTransportFilter; import io.grpc.Status; -import io.grpc.StatusException; -import io.grpc.StatusRuntimeException; import io.perfmark.Link; import io.perfmark.PerfMark; import io.perfmark.Tag; @@ -813,20 +811,7 @@ void setListener(ServerStreamListener listener) { private void internalClose(Throwable t) { // TODO(ejona86): this is not thread-safe :) String description = "Application error processing RPC"; - Status statusToPropagate = Status.UNKNOWN.withDescription(description).withCause(t); - Status extractedStatus = null; - if (t instanceof StatusRuntimeException) { - extractedStatus = ((StatusRuntimeException) t).getStatus(); - } else if (t instanceof StatusException) { - extractedStatus = ((StatusException) t).getStatus(); - } - String message = t.getMessage(); - if (extractedStatus != null && extractedStatus.getCode() == Status.Code.RESOURCE_EXHAUSTED - && message != null - && message.contains("Decompressed gRPC message exceeds maximum size")) { - statusToPropagate = extractedStatus.withCause(t); - } - stream.close(statusToPropagate, new Metadata()); + stream.close(Status.UNKNOWN.withDescription(description).withCause(t), new Metadata()); } @Override diff --git a/core/src/test/java/io/grpc/internal/MessageDeframerTest.java b/core/src/test/java/io/grpc/internal/MessageDeframerTest.java index 54758bc096f..64cd83706e7 100644 --- a/core/src/test/java/io/grpc/internal/MessageDeframerTest.java +++ b/core/src/test/java/io/grpc/internal/MessageDeframerTest.java @@ -18,6 +18,7 @@ import static com.google.common.truth.Truth.assertThat; import static io.grpc.internal.GrpcUtil.DEFAULT_MAX_MESSAGE_SIZE; +import static io.grpc.internal.MessageDeframer.TooLongDecompressedMessageException; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertThrows; @@ -491,6 +492,22 @@ public void sizeEnforcingInputStream_markReset() throws IOException { stream.close(); checkSizeEnforcingInputStreamStats(tracer, 3); } + + @Test + public void testThrows_TooLongDecompressedMessageException() throws IOException { + ByteArrayInputStream in = new ByteArrayInputStream("foo".getBytes(StandardCharsets.UTF_8)); + SizeEnforcingInputStream stream = + new MessageDeframer.SizeEnforcingInputStream(in, 2, statsTraceCtx); + + try { + StatusRuntimeException e = + assertThrows(TooLongDecompressedMessageException.class, () -> stream.skip(4)); + assertThat(e).hasMessageThat() + .isEqualTo("RESOURCE_EXHAUSTED: Decompressed gRPC message exceeds maximum size 2"); + } finally { + stream.close(); + } + } } /** diff --git a/core/src/test/java/io/grpc/internal/ServerImplTest.java b/core/src/test/java/io/grpc/internal/ServerImplTest.java index 3eee31da2be..0f18efe078c 100644 --- a/core/src/test/java/io/grpc/internal/ServerImplTest.java +++ b/core/src/test/java/io/grpc/internal/ServerImplTest.java @@ -77,8 +77,6 @@ import io.grpc.ServiceDescriptor; import io.grpc.Status; import io.grpc.Status.Code; -import io.grpc.StatusException; -import io.grpc.StatusRuntimeException; import io.grpc.StringMarshaller; import io.grpc.internal.ServerImpl.JumpToApplicationThreadServerStreamListener; import io.grpc.internal.ServerImplBuilder.ClientTransportServersBuilder; @@ -89,7 +87,6 @@ import java.io.ByteArrayInputStream; import java.io.IOException; import java.io.InputStream; -import java.lang.reflect.InvocationTargetException; import java.net.InetSocketAddress; import java.net.SocketAddress; import java.util.Arrays; @@ -1545,128 +1542,6 @@ public void channelz_transport_membershp() throws Exception { assertTrue(after.end); } - @Test - public void testInternalClose_nonProtocolStatusRuntimeExceptionBecomesUnknown() { - JumpToApplicationThreadServerStreamListener listener - = new JumpToApplicationThreadServerStreamListener( - executor.getScheduledExecutorService(), - executor.getScheduledExecutorService(), - stream, - Context.ROOT.withCancellation(), - PerfMark.createTag()); - ServerStreamListener mockListener = mock(ServerStreamListener.class); - listener.setListener(mockListener); - - StatusRuntimeException statusRuntimeException - = new StatusRuntimeException(Status.PERMISSION_DENIED.withDescription("denied")); - doThrow(statusRuntimeException).when(mockListener).onReady(); - listener.onReady(); - try { - executor.runDueTasks(); - fail("Expected exception"); - } catch (RuntimeException t) { - assertSame(statusRuntimeException, t); - ensureServerStateNotLeaked(); - } - verify(stream).close(statusCaptor.capture(), metadataCaptor.capture()); - Status status = statusCaptor.getValue(); - assertEquals(Code.UNKNOWN, status.getCode()); - assertEquals("Application error processing RPC", status.getDescription()); - assertEquals(statusRuntimeException, status.getCause()); - assertTrue(metadataCaptor.getValue().keys().isEmpty()); - } - - @Test - public void testInternalClose_otherExceptionBecomesUnknown() { - JumpToApplicationThreadServerStreamListener listener - = new JumpToApplicationThreadServerStreamListener( - executor.getScheduledExecutorService(), - executor.getScheduledExecutorService(), - stream, - Context.ROOT.withCancellation(), - PerfMark.createTag()); - ServerStreamListener mockListener = mock(ServerStreamListener.class); - listener.setListener(mockListener); - - RuntimeException expectedT = new RuntimeException(); - doThrow(expectedT).when(mockListener) - .messagesAvailable(any(StreamListener.MessageProducer.class)); - listener.messagesAvailable(mock(StreamListener.MessageProducer.class)); - try { - executor.runDueTasks(); - fail("Expected exception"); - } catch (RuntimeException t) { - assertSame(expectedT, t); - ensureServerStateNotLeaked(); - } - verify(stream).close(statusCaptor.capture(), metadataCaptor.capture()); - Status status = statusCaptor.getValue(); - assertEquals(Code.UNKNOWN, status.getCode()); - assertEquals("Application error processing RPC", status.getDescription()); - assertEquals(expectedT, status.getCause()); - assertTrue(metadataCaptor.getValue().keys().isEmpty()); - } - - @Test - public void testInternalClose_propagatesStatusRuntimeException() { - JumpToApplicationThreadServerStreamListener listener - = new JumpToApplicationThreadServerStreamListener( - executor.getScheduledExecutorService(), - executor.getScheduledExecutorService(), - stream, - Context.ROOT.withCancellation(), - PerfMark.createTag()); - ServerStreamListener mockListener = mock(ServerStreamListener.class); - listener.setListener(mockListener); - - StatusRuntimeException statusRuntimeException - = new StatusRuntimeException(Status.RESOURCE_EXHAUSTED - .withDescription("Decompressed gRPC message exceeds maximum size")); - doThrow(statusRuntimeException).when(mockListener) - .messagesAvailable(any(StreamListener.MessageProducer.class)); - listener.messagesAvailable(mock(StreamListener.MessageProducer.class)); - try { - executor.runDueTasks(); - fail("Expected exception"); - } catch (RuntimeException t) { - assertSame(statusRuntimeException, t); - } - verify(stream).close(statusCaptor.capture(), metadataCaptor.capture()); - Status status = statusCaptor.getValue(); - assertEquals(Status.Code.RESOURCE_EXHAUSTED, status.getCode()); - assertEquals("Decompressed gRPC message exceeds maximum size", status.getDescription()); - assertEquals(statusRuntimeException, status.getCause()); - assertTrue(metadataCaptor.getValue().keys().isEmpty()); - } - - @Test - public void testInternalClose_propagatesStatusException() - throws NoSuchMethodException, InvocationTargetException, IllegalAccessException { - JumpToApplicationThreadServerStreamListener listener - = new JumpToApplicationThreadServerStreamListener( - executor.getScheduledExecutorService(), - executor.getScheduledExecutorService(), - stream, - Context.ROOT.withCancellation(), - PerfMark.createTag()); - - StatusException statusException - = new StatusException(Status.RESOURCE_EXHAUSTED - .withDescription("Decompressed gRPC message exceeds maximum size")); - java.lang.reflect.Method internalClose = - JumpToApplicationThreadServerStreamListener.class.getDeclaredMethod( - "internalClose", Throwable.class); - internalClose.setAccessible(true); - - internalClose.invoke(listener, statusException); - verify(stream).close(statusCaptor.capture(), metadataCaptor.capture()); - Status status = statusCaptor.getValue(); - assertEquals(Status.Code.RESOURCE_EXHAUSTED, status.getCode()); - assertEquals("Decompressed gRPC message exceeds maximum size", status.getDescription()); - assertEquals(statusException, status.getCause()); - assertTrue(metadataCaptor.getValue().keys().isEmpty()); - } - private void createAndStartServer() throws IOException { createServer(); server.start(); From 92b75a853d78b5811ebda8fbb44729a9e7355a3a Mon Sep 17 00:00:00 2001 From: vimanikag Date: Fri, 17 Oct 2025 09:01:55 +0000 Subject: [PATCH 08/14] 11246:: added the serialVersionUID or new static class to fix the PR check failures --- core/src/main/java/io/grpc/internal/MessageDeframer.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/core/src/main/java/io/grpc/internal/MessageDeframer.java b/core/src/main/java/io/grpc/internal/MessageDeframer.java index 66bc7c80ba0..75272469489 100644 --- a/core/src/main/java/io/grpc/internal/MessageDeframer.java +++ b/core/src/main/java/io/grpc/internal/MessageDeframer.java @@ -544,6 +544,8 @@ public InputStream next() { } static class TooLongDecompressedMessageException extends StatusRuntimeException { + private static final long serialVersionUID = 1L; + public TooLongDecompressedMessageException(Status status) { super(status); } From 894dc66981f0fc510bb3e835f3ddbc108fb12826 Mon Sep 17 00:00:00 2001 From: vimanikag Date: Mon, 20 Oct 2025 16:41:37 +0000 Subject: [PATCH 09/14] 11246:: addressed the review comments excluding junit's. --- .../io/grpc/internal/MessageDeframer.java | 15 +++---------- .../java/io/grpc/internal/ServerCallImpl.java | 21 +++++++++---------- .../io/grpc/internal/MessageDeframerTest.java | 17 --------------- 3 files changed, 13 insertions(+), 40 deletions(-) diff --git a/core/src/main/java/io/grpc/internal/MessageDeframer.java b/core/src/main/java/io/grpc/internal/MessageDeframer.java index 75272469489..13a01efec0a 100644 --- a/core/src/main/java/io/grpc/internal/MessageDeframer.java +++ b/core/src/main/java/io/grpc/internal/MessageDeframer.java @@ -24,7 +24,6 @@ import io.grpc.Codec; import io.grpc.Decompressor; import io.grpc.Status; -import io.grpc.StatusRuntimeException; import java.io.Closeable; import java.io.FilterInputStream; import java.io.IOException; @@ -520,9 +519,9 @@ private void reportCount() { private void verifySize() { if (count > maxMessageSize) { - throw new TooLongDecompressedMessageException( - Status.RESOURCE_EXHAUSTED - .withDescription("Decompressed gRPC message exceeds maximum size " + maxMessageSize)); + throw Status.RESOURCE_EXHAUSTED + .withDescription("Decompressed gRPC message exceeds maximum size " + maxMessageSize) + .asRuntimeException(); } } } @@ -542,12 +541,4 @@ public InputStream next() { return messageToReturn; } } - - static class TooLongDecompressedMessageException extends StatusRuntimeException { - private static final long serialVersionUID = 1L; - - public TooLongDecompressedMessageException(Status status) { - super(status); - } - } } diff --git a/core/src/main/java/io/grpc/internal/ServerCallImpl.java b/core/src/main/java/io/grpc/internal/ServerCallImpl.java index ef9fa7e29c5..df2ba243088 100644 --- a/core/src/main/java/io/grpc/internal/ServerCallImpl.java +++ b/core/src/main/java/io/grpc/internal/ServerCallImpl.java @@ -24,7 +24,6 @@ import static io.grpc.internal.GrpcUtil.CONTENT_LENGTH_KEY; import static io.grpc.internal.GrpcUtil.MESSAGE_ACCEPT_ENCODING_KEY; import static io.grpc.internal.GrpcUtil.MESSAGE_ENCODING_KEY; -import static io.grpc.internal.MessageDeframer.TooLongDecompressedMessageException; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Throwables; @@ -74,7 +73,6 @@ final class ServerCallImpl extends ServerCall { private boolean closeCalled; private Compressor compressor; private boolean messageSent; - private Status exceptionStatus = null; ServerCallImpl(ServerStream stream, MethodDescriptor method, Metadata inboundHeaders, Context.CancellableContext context, @@ -272,11 +270,6 @@ public SecurityLevel getSecurityLevel() { * on. */ private void handleInternalError(Throwable internalError) { - if (exceptionStatus != null) { - stream.close(exceptionStatus, new Metadata()); - exceptionStatus = null; - return; - } log.log(Level.WARNING, "Cancelling the stream because of internal error", internalError); Status status = (internalError instanceof StatusRuntimeException) ? ((StatusRuntimeException) internalError).getStatus() @@ -337,17 +330,23 @@ private void messagesAvailableInternal(final MessageProducer producer) { InputStream message; try { while ((message = producer.next()) != null) { + ReqT parsedMessage; + try { + parsedMessage = call.method.parseRequest(message); + } catch (StatusRuntimeException e) { + GrpcUtil.closeQuietly(message); + call.cancelled = true; + call.close(e.getStatus(), new Metadata()); + return; + } try { - listener.onMessage(call.method.parseRequest(message)); + listener.onMessage(parsedMessage); } catch (Throwable t) { GrpcUtil.closeQuietly(message); throw t; } message.close(); } - } catch (TooLongDecompressedMessageException e) { - this.call.exceptionStatus = e.getStatus(); - this.call.handleInternalError(e); } catch (Throwable t) { GrpcUtil.closeQuietly(producer); Throwables.throwIfUnchecked(t); diff --git a/core/src/test/java/io/grpc/internal/MessageDeframerTest.java b/core/src/test/java/io/grpc/internal/MessageDeframerTest.java index 64cd83706e7..54758bc096f 100644 --- a/core/src/test/java/io/grpc/internal/MessageDeframerTest.java +++ b/core/src/test/java/io/grpc/internal/MessageDeframerTest.java @@ -18,7 +18,6 @@ import static com.google.common.truth.Truth.assertThat; import static io.grpc.internal.GrpcUtil.DEFAULT_MAX_MESSAGE_SIZE; -import static io.grpc.internal.MessageDeframer.TooLongDecompressedMessageException; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertThrows; @@ -492,22 +491,6 @@ public void sizeEnforcingInputStream_markReset() throws IOException { stream.close(); checkSizeEnforcingInputStreamStats(tracer, 3); } - - @Test - public void testThrows_TooLongDecompressedMessageException() throws IOException { - ByteArrayInputStream in = new ByteArrayInputStream("foo".getBytes(StandardCharsets.UTF_8)); - SizeEnforcingInputStream stream = - new MessageDeframer.SizeEnforcingInputStream(in, 2, statsTraceCtx); - - try { - StatusRuntimeException e = - assertThrows(TooLongDecompressedMessageException.class, () -> stream.skip(4)); - assertThat(e).hasMessageThat() - .isEqualTo("RESOURCE_EXHAUSTED: Decompressed gRPC message exceeds maximum size 2"); - } finally { - stream.close(); - } - } } /** From 65fbebb3686d4d63bcf19702ac1efc8987f3156c Mon Sep 17 00:00:00 2001 From: vimanikag Date: Tue, 21 Oct 2025 14:07:29 +0000 Subject: [PATCH 10/14] 11246:: added the junit's for the latest changes. --- .../io/grpc/internal/ServerCallImplTest.java | 41 +++++++++++++++++++ 1 file changed, 41 insertions(+) diff --git a/core/src/test/java/io/grpc/internal/ServerCallImplTest.java b/core/src/test/java/io/grpc/internal/ServerCallImplTest.java index 7394c83eab2..0234337f231 100644 --- a/core/src/test/java/io/grpc/internal/ServerCallImplTest.java +++ b/core/src/test/java/io/grpc/internal/ServerCallImplTest.java @@ -48,9 +48,11 @@ import io.grpc.SecurityLevel; import io.grpc.ServerCall; import io.grpc.Status; +import io.grpc.StatusRuntimeException; import io.grpc.internal.ServerCallImpl.ServerStreamListenerImpl; import io.perfmark.PerfMark; import java.io.ByteArrayInputStream; +import java.io.IOException; import java.io.InputStream; import java.io.InputStreamReader; import org.junit.Before; @@ -69,6 +71,8 @@ public class ServerCallImplTest { @Mock private ServerStream stream; @Mock private ServerCall.Listener callListener; + @Mock private StreamListener.MessageProducer messageProducer; + @Mock private InputStream message; private final CallTracer serverCallTracer = CallTracer.getDefaultFactory().create(); private ServerCallImpl call; @@ -493,6 +497,43 @@ public void streamListener_unexpectedRuntimeException() { assertThat(e).hasMessageThat().isEqualTo("unexpected exception"); } + @Test + public void streamListener_statusRuntimeException() throws IOException { + MethodDescriptor badMethod = MethodDescriptor.newBuilder() + .setType(MethodType.UNARY) + .setFullMethodName("service/method") + .setRequestMarshaller(new LongMarshaller() { + @Override + public Long parse(InputStream stream) { + throw new StatusRuntimeException(Status.RESOURCE_EXHAUSTED + .withDescription("Decompressed gRPC message exceeds maximum size")); + } + }) + .setResponseMarshaller(new LongMarshaller()) + .build(); + + call = new ServerCallImpl<>(stream, badMethod, requestHeaders, context, + DecompressorRegistry.getDefaultInstance(), CompressorRegistry.getDefaultInstance(), + serverCallTracer, PerfMark.createTag()); + + ServerStreamListenerImpl streamListener = + new ServerCallImpl.ServerStreamListenerImpl<>(call, callListener, context); + + when(messageProducer.next()).thenReturn(message, (InputStream) null); + streamListener.messagesAvailable(messageProducer); + ArgumentCaptor statusCaptor = ArgumentCaptor.forClass(Status.class); + ArgumentCaptor metadataCaptor = ArgumentCaptor.forClass(Metadata.class); + + verify(stream).close(statusCaptor.capture(),metadataCaptor.capture()); + Status status = statusCaptor.getValue(); + assertEquals(Status.RESOURCE_EXHAUSTED.getCode(), status.getCode()); + assertEquals("Decompressed gRPC message exceeds maximum size", status.getDescription()); + + verify(messageProducer).next(); + verify(message).close(); + verify(callListener,never()).onMessage(any()); + } + private static class LongMarshaller implements Marshaller { @Override public InputStream stream(Long value) { From 3b9ebbe505117ce44eee395bf3183db2324a366a Mon Sep 17 00:00:00 2001 From: vimanikag Date: Thu, 23 Oct 2025 10:05:49 +0000 Subject: [PATCH 11/14] 11246:: Updated the JUnit tests based on the recent review comments. --- .../io/grpc/internal/ServerCallImplTest.java | 41 ------------------- .../integration/AbstractInteropTest.java | 2 +- .../integration/TransportCompressionTest.java | 29 ++++++++++++- 3 files changed, 29 insertions(+), 43 deletions(-) diff --git a/core/src/test/java/io/grpc/internal/ServerCallImplTest.java b/core/src/test/java/io/grpc/internal/ServerCallImplTest.java index 0234337f231..7394c83eab2 100644 --- a/core/src/test/java/io/grpc/internal/ServerCallImplTest.java +++ b/core/src/test/java/io/grpc/internal/ServerCallImplTest.java @@ -48,11 +48,9 @@ import io.grpc.SecurityLevel; import io.grpc.ServerCall; import io.grpc.Status; -import io.grpc.StatusRuntimeException; import io.grpc.internal.ServerCallImpl.ServerStreamListenerImpl; import io.perfmark.PerfMark; import java.io.ByteArrayInputStream; -import java.io.IOException; import java.io.InputStream; import java.io.InputStreamReader; import org.junit.Before; @@ -71,8 +69,6 @@ public class ServerCallImplTest { @Mock private ServerStream stream; @Mock private ServerCall.Listener callListener; - @Mock private StreamListener.MessageProducer messageProducer; - @Mock private InputStream message; private final CallTracer serverCallTracer = CallTracer.getDefaultFactory().create(); private ServerCallImpl call; @@ -497,43 +493,6 @@ public void streamListener_unexpectedRuntimeException() { assertThat(e).hasMessageThat().isEqualTo("unexpected exception"); } - @Test - public void streamListener_statusRuntimeException() throws IOException { - MethodDescriptor badMethod = MethodDescriptor.newBuilder() - .setType(MethodType.UNARY) - .setFullMethodName("service/method") - .setRequestMarshaller(new LongMarshaller() { - @Override - public Long parse(InputStream stream) { - throw new StatusRuntimeException(Status.RESOURCE_EXHAUSTED - .withDescription("Decompressed gRPC message exceeds maximum size")); - } - }) - .setResponseMarshaller(new LongMarshaller()) - .build(); - - call = new ServerCallImpl<>(stream, badMethod, requestHeaders, context, - DecompressorRegistry.getDefaultInstance(), CompressorRegistry.getDefaultInstance(), - serverCallTracer, PerfMark.createTag()); - - ServerStreamListenerImpl streamListener = - new ServerCallImpl.ServerStreamListenerImpl<>(call, callListener, context); - - when(messageProducer.next()).thenReturn(message, (InputStream) null); - streamListener.messagesAvailable(messageProducer); - ArgumentCaptor statusCaptor = ArgumentCaptor.forClass(Status.class); - ArgumentCaptor metadataCaptor = ArgumentCaptor.forClass(Metadata.class); - - verify(stream).close(statusCaptor.capture(),metadataCaptor.capture()); - Status status = statusCaptor.getValue(); - assertEquals(Status.RESOURCE_EXHAUSTED.getCode(), status.getCode()); - assertEquals("Decompressed gRPC message exceeds maximum size", status.getDescription()); - - verify(messageProducer).next(); - verify(message).close(); - verify(callListener,never()).onMessage(any()); - } - private static class LongMarshaller implements Marshaller { @Override public InputStream stream(Long value) { diff --git a/interop-testing/src/main/java/io/grpc/testing/integration/AbstractInteropTest.java b/interop-testing/src/main/java/io/grpc/testing/integration/AbstractInteropTest.java index 11455790497..843019433aa 100644 --- a/interop-testing/src/main/java/io/grpc/testing/integration/AbstractInteropTest.java +++ b/interop-testing/src/main/java/io/grpc/testing/integration/AbstractInteropTest.java @@ -2030,7 +2030,7 @@ private void assertPayload(Payload expected, Payload actual) { } } - private static void assertCodeEquals(Status.Code expected, Status actual) { + protected static void assertCodeEquals(Status.Code expected, Status actual) { assertWithMessage("Unexpected status: %s", actual).that(actual.getCode()).isEqualTo(expected); } diff --git a/interop-testing/src/test/java/io/grpc/testing/integration/TransportCompressionTest.java b/interop-testing/src/test/java/io/grpc/testing/integration/TransportCompressionTest.java index b9692383254..33cd624aebb 100644 --- a/interop-testing/src/test/java/io/grpc/testing/integration/TransportCompressionTest.java +++ b/interop-testing/src/test/java/io/grpc/testing/integration/TransportCompressionTest.java @@ -17,6 +17,7 @@ package io.grpc.testing.integration; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThrows; import static org.junit.Assert.assertTrue; import com.google.protobuf.ByteString; @@ -37,6 +38,8 @@ import io.grpc.ServerCall.Listener; import io.grpc.ServerCallHandler; import io.grpc.ServerInterceptor; +import io.grpc.Status.Code; +import io.grpc.StatusRuntimeException; import io.grpc.internal.GrpcUtil; import io.grpc.netty.InternalNettyChannelBuilder; import io.grpc.netty.InternalNettyServerBuilder; @@ -53,7 +56,9 @@ import java.io.OutputStream; import org.junit.Before; import org.junit.BeforeClass; +import org.junit.Rule; import org.junit.Test; +import org.junit.rules.TestName; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; @@ -84,10 +89,16 @@ public static void registerCompressors() { compressors.register(Codec.Identity.NONE); } + @Rule + public final TestName currentTest = new TestName(); + @Override protected ServerBuilder getServerBuilder() { NettyServerBuilder builder = NettyServerBuilder.forPort(0, InsecureServerCredentials.create()) - .maxInboundMessageSize(AbstractInteropTest.MAX_MESSAGE_SIZE) + .maxInboundMessageSize( + DECOMPRESSED_MESSAGE_TOO_LONG_METHOD_NAME.equals(currentTest.getMethodName()) + ? 1000 + : AbstractInteropTest.MAX_MESSAGE_SIZE) .compressorRegistry(compressors) .decompressorRegistry(decompressors) .intercept(new ServerInterceptor() { @@ -126,6 +137,22 @@ public void compresses() { assertTrue(FZIPPER.anyWritten); } + private static final String DECOMPRESSED_MESSAGE_TOO_LONG_METHOD_NAME = + "decompressedMessageTooLong"; + + @Test + public void decompressedMessageTooLong() { + assertEquals(DECOMPRESSED_MESSAGE_TOO_LONG_METHOD_NAME, currentTest.getMethodName()); + final SimpleRequest bigRequest = SimpleRequest.newBuilder() + .setPayload(Payload.newBuilder().setBody(ByteString.copyFrom(new byte[10_000]))) + .build(); + StatusRuntimeException e = assertThrows(StatusRuntimeException.class, + () -> blockingStub.withCompression("gzip").unaryCall(bigRequest)); + assertCodeEquals(Code.RESOURCE_EXHAUSTED, e.getStatus()); + assertEquals("Decompressed gRPC message exceeds maximum size 1000", + e.getStatus().getDescription()); + } + @Override protected NettyChannelBuilder createChannelBuilder() { NettyChannelBuilder builder = NettyChannelBuilder.forAddress(getListenAddress()) From 55741e4c2257455d34af8f915f0cadd8e445ca1c Mon Sep 17 00:00:00 2001 From: vimanikag Date: Fri, 24 Oct 2025 08:29:03 +0000 Subject: [PATCH 12/14] 11246:: added the JUnit tests based on the recent review comments. --- .../io/grpc/internal/ServerCallImplTest.java | 43 +++++++++++++++++++ 1 file changed, 43 insertions(+) diff --git a/core/src/test/java/io/grpc/internal/ServerCallImplTest.java b/core/src/test/java/io/grpc/internal/ServerCallImplTest.java index 7394c83eab2..87f7aa84bd3 100644 --- a/core/src/test/java/io/grpc/internal/ServerCallImplTest.java +++ b/core/src/test/java/io/grpc/internal/ServerCallImplTest.java @@ -48,9 +48,11 @@ import io.grpc.SecurityLevel; import io.grpc.ServerCall; import io.grpc.Status; +import io.grpc.StatusRuntimeException; import io.grpc.internal.ServerCallImpl.ServerStreamListenerImpl; import io.perfmark.PerfMark; import java.io.ByteArrayInputStream; +import java.io.IOException; import java.io.InputStream; import java.io.InputStreamReader; import org.junit.Before; @@ -69,6 +71,8 @@ public class ServerCallImplTest { @Mock private ServerStream stream; @Mock private ServerCall.Listener callListener; + @Mock private StreamListener.MessageProducer messageProducer; + @Mock private InputStream message; private final CallTracer serverCallTracer = CallTracer.getDefaultFactory().create(); private ServerCallImpl call; @@ -493,6 +497,45 @@ public void streamListener_unexpectedRuntimeException() { assertThat(e).hasMessageThat().isEqualTo("unexpected exception"); } + @Test + public void streamListener_statusRuntimeException() throws IOException { + MethodDescriptor failingParseMethod = MethodDescriptor.newBuilder() + .setType(MethodType.UNARY) + .setFullMethodName("service/method") + .setRequestMarshaller(new LongMarshaller() { + @Override + public Long parse(InputStream stream) { + throw new StatusRuntimeException(Status.RESOURCE_EXHAUSTED + .withDescription("Decompressed gRPC message exceeds maximum size")); + } + }) + .setResponseMarshaller(new LongMarshaller()) + .build(); + + call = new ServerCallImpl<>(stream, failingParseMethod, requestHeaders, context, + DecompressorRegistry.getDefaultInstance(), CompressorRegistry.getDefaultInstance(), + serverCallTracer, PerfMark.createTag()); + + ServerStreamListenerImpl streamListener = + new ServerCallImpl.ServerStreamListenerImpl<>(call, callListener, context); + + when(messageProducer.next()).thenReturn(message, (InputStream) null); + streamListener.messagesAvailable(messageProducer); + ArgumentCaptor statusCaptor = ArgumentCaptor.forClass(Status.class); + ArgumentCaptor metadataCaptor = ArgumentCaptor.forClass(Metadata.class); + + verify(stream).close(statusCaptor.capture(),metadataCaptor.capture()); + Status status = statusCaptor.getValue(); + assertEquals(Status.RESOURCE_EXHAUSTED.getCode(), status.getCode()); + assertEquals("Decompressed gRPC message exceeds maximum size", status.getDescription()); + + streamListener.halfClosed(); + verify(callListener, never()).onHalfClose(); + verify(messageProducer).next(); + verify(message).close(); + verify(callListener,never()).onMessage(any()); + } + private static class LongMarshaller implements Marshaller { @Override public InputStream stream(Long value) { From 26095ef6b7305ad8b77aa3a506015e2fb7ce23e8 Mon Sep 17 00:00:00 2001 From: vimanikag Date: Mon, 27 Oct 2025 08:37:12 +0000 Subject: [PATCH 13/14] 11246:: Addressing the latest review comments --- core/src/main/java/io/grpc/internal/ServerCallImpl.java | 5 +++-- core/src/test/java/io/grpc/internal/ServerCallImplTest.java | 1 - 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/core/src/main/java/io/grpc/internal/ServerCallImpl.java b/core/src/main/java/io/grpc/internal/ServerCallImpl.java index df2ba243088..124322af28f 100644 --- a/core/src/main/java/io/grpc/internal/ServerCallImpl.java +++ b/core/src/main/java/io/grpc/internal/ServerCallImpl.java @@ -327,14 +327,15 @@ private void messagesAvailableInternal(final MessageProducer producer) { return; } - InputStream message; try { + InputStream message; while ((message = producer.next()) != null) { ReqT parsedMessage; try { parsedMessage = call.method.parseRequest(message); } catch (StatusRuntimeException e) { GrpcUtil.closeQuietly(message); + GrpcUtil.closeQuietly(producer); call.cancelled = true; call.close(e.getStatus(), new Metadata()); return; @@ -343,9 +344,9 @@ private void messagesAvailableInternal(final MessageProducer producer) { listener.onMessage(parsedMessage); } catch (Throwable t) { GrpcUtil.closeQuietly(message); + message.close(); throw t; } - message.close(); } } catch (Throwable t) { GrpcUtil.closeQuietly(producer); diff --git a/core/src/test/java/io/grpc/internal/ServerCallImplTest.java b/core/src/test/java/io/grpc/internal/ServerCallImplTest.java index 87f7aa84bd3..ca2d3317946 100644 --- a/core/src/test/java/io/grpc/internal/ServerCallImplTest.java +++ b/core/src/test/java/io/grpc/internal/ServerCallImplTest.java @@ -531,7 +531,6 @@ public Long parse(InputStream stream) { streamListener.halfClosed(); verify(callListener, never()).onHalfClose(); - verify(messageProducer).next(); verify(message).close(); verify(callListener,never()).onMessage(any()); } From f16cd6b317b503246d57ed53bdc4fac9ef7f36fc Mon Sep 17 00:00:00 2001 From: vimanikag Date: Tue, 28 Oct 2025 08:08:52 +0000 Subject: [PATCH 14/14] 11246:: Addressing the latest review comments --- .../src/main/java/io/grpc/internal/ServerCallImpl.java | 10 ++-------- .../test/java/io/grpc/internal/ServerCallImplTest.java | 1 - 2 files changed, 2 insertions(+), 9 deletions(-) diff --git a/core/src/main/java/io/grpc/internal/ServerCallImpl.java b/core/src/main/java/io/grpc/internal/ServerCallImpl.java index 124322af28f..22d5912050a 100644 --- a/core/src/main/java/io/grpc/internal/ServerCallImpl.java +++ b/core/src/main/java/io/grpc/internal/ServerCallImpl.java @@ -331,7 +331,7 @@ private void messagesAvailableInternal(final MessageProducer producer) { InputStream message; while ((message = producer.next()) != null) { ReqT parsedMessage; - try { + try (InputStream ignored = message) { parsedMessage = call.method.parseRequest(message); } catch (StatusRuntimeException e) { GrpcUtil.closeQuietly(message); @@ -340,13 +340,7 @@ private void messagesAvailableInternal(final MessageProducer producer) { call.close(e.getStatus(), new Metadata()); return; } - try { - listener.onMessage(parsedMessage); - } catch (Throwable t) { - GrpcUtil.closeQuietly(message); - message.close(); - throw t; - } + listener.onMessage(parsedMessage); } } catch (Throwable t) { GrpcUtil.closeQuietly(producer); diff --git a/core/src/test/java/io/grpc/internal/ServerCallImplTest.java b/core/src/test/java/io/grpc/internal/ServerCallImplTest.java index ca2d3317946..ef5ff7d48ea 100644 --- a/core/src/test/java/io/grpc/internal/ServerCallImplTest.java +++ b/core/src/test/java/io/grpc/internal/ServerCallImplTest.java @@ -531,7 +531,6 @@ public Long parse(InputStream stream) { streamListener.halfClosed(); verify(callListener, never()).onHalfClose(); - verify(message).close(); verify(callListener,never()).onMessage(any()); }