From 2d9b712221e70dbade97d719eaa8554b275198ff Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=92=D0=B0=D1=81=D0=B8=D0=BB=D1=8C=D0=B5=D0=B2=20=D0=90?= =?UTF-8?q?=D0=BD=D0=B4=D1=80=D0=B5=D0=B9=20=D0=93=D0=B5=D0=BE=D1=80=D0=B3?= =?UTF-8?q?=D0=B8=D0=B5=D0=B2=D0=B8=D1=87?= Date: Tue, 3 Jun 2025 19:44:52 +0500 Subject: [PATCH 1/4] =?UTF-8?q?=D0=94=D0=97-34:=20grpc=20-=20done?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- L34-grpc/build.gradle.kts | 87 +++++++++++++++++++ .../java/ru/otus/protobuf/NumServiceImpl.java | 36 ++++++++ .../java/ru/otus/protobuf/NumsClient.java | 64 ++++++++++++++ .../protobuf/NumsClientStreamObserver.java | 32 +++++++ .../java/ru/otus/protobuf/NumsServer.java | 22 +++++ L34-grpc/src/main/proto/NumService.proto | 18 ++++ settings.gradle.kts | 1 + 7 files changed, 260 insertions(+) create mode 100644 L34-grpc/build.gradle.kts create mode 100644 L34-grpc/src/main/java/ru/otus/protobuf/NumServiceImpl.java create mode 100644 L34-grpc/src/main/java/ru/otus/protobuf/NumsClient.java create mode 100644 L34-grpc/src/main/java/ru/otus/protobuf/NumsClientStreamObserver.java create mode 100644 L34-grpc/src/main/java/ru/otus/protobuf/NumsServer.java create mode 100644 L34-grpc/src/main/proto/NumService.proto diff --git a/L34-grpc/build.gradle.kts b/L34-grpc/build.gradle.kts new file mode 100644 index 0000000..1ac581e --- /dev/null +++ b/L34-grpc/build.gradle.kts @@ -0,0 +1,87 @@ +import com.google.protobuf.gradle.generateProtoTasks +import com.google.protobuf.gradle.ofSourceSet +import com.google.protobuf.gradle.id +import com.google.protobuf.gradle.plugins +import com.google.protobuf.gradle.protobuf +import com.google.protobuf.gradle.protoc + +plugins { + id("idea") + id("com.google.protobuf") +} + +val errorProneAnnotations: String by project +val tomcatAnnotationsApi: String by project + +dependencies { + implementation("io.grpc:grpc-netty") + implementation("io.grpc:grpc-protobuf") + implementation("io.grpc:grpc-stub") + implementation("com.google.protobuf:protobuf-java") + implementation("com.google.errorprone:error_prone_annotations:$errorProneAnnotations") + + implementation("org.apache.tomcat:annotations-api:$tomcatAnnotationsApi") + + implementation ("ch.qos.logback:logback-classic") +} + +val protoSrcDir = "$projectDir/build/generated" + +idea { + module { + sourceDirs = sourceDirs.plus(file(protoSrcDir)) + } +} + +sourceSets { + main { + proto { + srcDir(protoSrcDir) + } + } +} + +protobuf { + generatedFilesBaseDir = protoSrcDir + + protoc { + artifact = "com.google.protobuf:protoc:3.19.4" + } + + generateProtoTasks { + ofSourceSet("main") + } +} + +afterEvaluate { + tasks { + getByName("generateProto").dependsOn(processResources) + } +} + +protobuf { + generatedFilesBaseDir = protoSrcDir + + protoc { + artifact = "com.google.protobuf:protoc:3.19.4" + } + plugins { + id("grpc") { + artifact = "io.grpc:protoc-gen-grpc-java:1.56.1" + } + } + + generateProtoTasks { + ofSourceSet("main").forEach { + it.plugins { + id("grpc") + } + } + } +} + +afterEvaluate { + tasks { + getByName("generateProto").dependsOn(processResources) + } +} \ No newline at end of file diff --git a/L34-grpc/src/main/java/ru/otus/protobuf/NumServiceImpl.java b/L34-grpc/src/main/java/ru/otus/protobuf/NumServiceImpl.java new file mode 100644 index 0000000..1fcd922 --- /dev/null +++ b/L34-grpc/src/main/java/ru/otus/protobuf/NumServiceImpl.java @@ -0,0 +1,36 @@ +package ru.otus.protobuf; + +import io.grpc.stub.StreamObserver; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@SuppressWarnings({"squid:S2142"}) +public class NumServiceImpl extends NumServiceGrpc.NumServiceImplBase { + + private static final Logger logger = LoggerFactory.getLogger(NumServiceImpl.class); + + private static final long NEXT_NUM_DELAY_MILLIS = 2_000; + + @Override + public void getNumbers(final GetNumsRequest request, final StreamObserver responseObserver) { + + logger.info("Received request: firstValue={}, lastValue={}", request.getFirstNum(), request.getLastNum()); + + try { + for (long i = request.getFirstNum() + 1; i <= request.getLastNum(); i++) { + GetNumResponse response = GetNumResponse.newBuilder().setNum(i).build(); + responseObserver.onNext(response); + + logger.info("Sent value: {}", i); + + Thread.sleep(NEXT_NUM_DELAY_MILLIS); + } + } catch (InterruptedException e) { + logger.error("Interrupted while streaming", e); + Thread.currentThread().interrupt(); + } + + responseObserver.onCompleted(); + logger.info("Stream completed"); + } +} diff --git a/L34-grpc/src/main/java/ru/otus/protobuf/NumsClient.java b/L34-grpc/src/main/java/ru/otus/protobuf/NumsClient.java new file mode 100644 index 0000000..58c9d2a --- /dev/null +++ b/L34-grpc/src/main/java/ru/otus/protobuf/NumsClient.java @@ -0,0 +1,64 @@ +package ru.otus.protobuf; + +import io.grpc.ManagedChannelBuilder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class NumsClient { + private static final Logger logger = LoggerFactory.getLogger(NumsClient.class); + private static final String SERVER_HOST = "localhost"; + private static final int SERVER_PORT = 8190; + + private static final NumsClientStreamObserver streamObserver = new NumsClientStreamObserver(); + private static volatile long lastServerValue = 0L; + + private static final long NEXT_NUM_DELAY_MILLIS = 1_000; + + public static void main(String[] args) { + logger.info("Starting client"); + + startObservingServerValues(); + + long currentValue = 0; + for (int i = 0; i <= 50; i++) { + try { + currentValue = calculateCurrentValue(currentValue); + + logger.info("currentValue: {}", currentValue); + Thread.sleep(NEXT_NUM_DELAY_MILLIS); + + } catch (InterruptedException e) { + logger.error("Interrupted while running", e); + Thread.currentThread().interrupt(); + break; + } + } + } + + private static long calculateCurrentValue(final long currentValue) { + long serverValue = streamObserver.getLastValue(); + logger.info("serverValue: {}", currentValue); + + long nextVal = currentValue; + + if (serverValue != lastServerValue && serverValue > 0) { + lastServerValue = serverValue; + nextVal += serverValue + 1; + } else { + nextVal += 1; + } + + return nextVal; + } + + private static void startObservingServerValues() { + GetNumsRequest request = + GetNumsRequest.newBuilder().setFirstNum(0).setLastNum(30).build(); + + var channel = ManagedChannelBuilder.forAddress(SERVER_HOST, SERVER_PORT) + .usePlaintext() + .build(); + + NumServiceGrpc.newStub(channel).getNumbers(request, streamObserver); + } +} diff --git a/L34-grpc/src/main/java/ru/otus/protobuf/NumsClientStreamObserver.java b/L34-grpc/src/main/java/ru/otus/protobuf/NumsClientStreamObserver.java new file mode 100644 index 0000000..9d384a8 --- /dev/null +++ b/L34-grpc/src/main/java/ru/otus/protobuf/NumsClientStreamObserver.java @@ -0,0 +1,32 @@ +package ru.otus.protobuf; + +import io.grpc.stub.StreamObserver; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class NumsClientStreamObserver implements StreamObserver { + private static final Logger logger = LoggerFactory.getLogger(NumsClientStreamObserver.class); + private volatile long lastValue; + + @Override + public void onNext(GetNumResponse numberResponse) { + synchronized (this) { + lastValue = numberResponse.getNum(); + } + logger.info("new value: {}", lastValue); + } + + @Override + public void onError(Throwable throwable) { + logger.error("Error occurred: ", throwable); + } + + @Override + public void onCompleted() { + logger.info("request completed"); + } + + public long getLastValue() { + return lastValue; + } +} diff --git a/L34-grpc/src/main/java/ru/otus/protobuf/NumsServer.java b/L34-grpc/src/main/java/ru/otus/protobuf/NumsServer.java new file mode 100644 index 0000000..be41d3b --- /dev/null +++ b/L34-grpc/src/main/java/ru/otus/protobuf/NumsServer.java @@ -0,0 +1,22 @@ +package ru.otus.protobuf; + +import io.grpc.ServerBuilder; +import java.io.IOException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class NumsServer { + private static final Logger logger = LoggerFactory.getLogger(NumsServer.class); + public static final int SERVER_PORT = 8190; + + public static void main(String[] args) throws IOException, InterruptedException { + + var server = ServerBuilder.forPort(SERVER_PORT) + .addService(new NumServiceImpl()) + .build() + .start(); + + logger.info("server waiting for client connections..."); + server.awaitTermination(); + } +} diff --git a/L34-grpc/src/main/proto/NumService.proto b/L34-grpc/src/main/proto/NumService.proto new file mode 100644 index 0000000..de4c2c6 --- /dev/null +++ b/L34-grpc/src/main/proto/NumService.proto @@ -0,0 +1,18 @@ +syntax = "proto3"; + +package ru.otus.protobuf; + +option java_multiple_files = true; + +message GetNumsRequest { + int64 firstNum = 1; + int64 lastNum = 2; +} + +message GetNumResponse { + int64 num = 1; +} + +service NumService { + rpc getNumbers(GetNumsRequest) returns (stream GetNumResponse); +} diff --git a/settings.gradle.kts b/settings.gradle.kts index f144627..0f30f9b 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -18,6 +18,7 @@ include( "L31-executors", "L32-concurrentCollections:ConcurrentCollections", "L32-concurrentCollections:QueueDemo", + "L34-grpc", ) From dcaf68f105072638117d823b88e8239843a24730 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=92=D0=B0=D1=81=D0=B8=D0=BB=D1=8C=D0=B5=D0=B2=20=D0=90?= =?UTF-8?q?=D0=BD=D0=B4=D1=80=D0=B5=D0=B9=20=D0=93=D0=B5=D0=BE=D1=80=D0=B3?= =?UTF-8?q?=D0=B8=D0=B5=D0=B2=D0=B8=D1=87?= Date: Tue, 3 Jun 2025 19:45:18 +0500 Subject: [PATCH 2/4] =?UTF-8?q?=D0=94=D0=97-34:=20grpc=20-=20done?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- L34-grpc/src/main/java/ru/otus/protobuf/NumsClient.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/L34-grpc/src/main/java/ru/otus/protobuf/NumsClient.java b/L34-grpc/src/main/java/ru/otus/protobuf/NumsClient.java index 58c9d2a..27300e9 100644 --- a/L34-grpc/src/main/java/ru/otus/protobuf/NumsClient.java +++ b/L34-grpc/src/main/java/ru/otus/protobuf/NumsClient.java @@ -22,7 +22,7 @@ public static void main(String[] args) { long currentValue = 0; for (int i = 0; i <= 50; i++) { try { - currentValue = calculateCurrentValue(currentValue); + currentValue = calculateNextValue(currentValue); logger.info("currentValue: {}", currentValue); Thread.sleep(NEXT_NUM_DELAY_MILLIS); @@ -35,7 +35,7 @@ public static void main(String[] args) { } } - private static long calculateCurrentValue(final long currentValue) { + private static long calculateNextValue(final long currentValue) { long serverValue = streamObserver.getLastValue(); logger.info("serverValue: {}", currentValue); From 4b3c0616c20486e065b354a25993cc24f1065e1d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=92=D0=B0=D1=81=D0=B8=D0=BB=D1=8C=D0=B5=D0=B2=20=D0=90?= =?UTF-8?q?=D0=BD=D0=B4=D1=80=D0=B5=D0=B9=20=D0=93=D0=B5=D0=BE=D1=80=D0=B3?= =?UTF-8?q?=D0=B8=D0=B5=D0=B2=D0=B8=D1=87?= Date: Tue, 3 Jun 2025 19:49:02 +0500 Subject: [PATCH 3/4] =?UTF-8?q?=D0=94=D0=97-34:=20grpc=20-=20done?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- L34-grpc/src/main/java/ru/otus/protobuf/NumsClient.java | 2 +- .../main/java/ru/otus/protobuf/NumsClientStreamObserver.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/L34-grpc/src/main/java/ru/otus/protobuf/NumsClient.java b/L34-grpc/src/main/java/ru/otus/protobuf/NumsClient.java index 27300e9..e69a22c 100644 --- a/L34-grpc/src/main/java/ru/otus/protobuf/NumsClient.java +++ b/L34-grpc/src/main/java/ru/otus/protobuf/NumsClient.java @@ -10,7 +10,7 @@ public class NumsClient { private static final int SERVER_PORT = 8190; private static final NumsClientStreamObserver streamObserver = new NumsClientStreamObserver(); - private static volatile long lastServerValue = 0L; + private static long lastServerValue = 0L; private static final long NEXT_NUM_DELAY_MILLIS = 1_000; diff --git a/L34-grpc/src/main/java/ru/otus/protobuf/NumsClientStreamObserver.java b/L34-grpc/src/main/java/ru/otus/protobuf/NumsClientStreamObserver.java index 9d384a8..b04202a 100644 --- a/L34-grpc/src/main/java/ru/otus/protobuf/NumsClientStreamObserver.java +++ b/L34-grpc/src/main/java/ru/otus/protobuf/NumsClientStreamObserver.java @@ -6,7 +6,7 @@ public class NumsClientStreamObserver implements StreamObserver { private static final Logger logger = LoggerFactory.getLogger(NumsClientStreamObserver.class); - private volatile long lastValue; + private long lastValue; @Override public void onNext(GetNumResponse numberResponse) { From 50e56f1e2267ffaba0c1f27bace55930d4d72ccf Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=92=D0=B0=D1=81=D0=B8=D0=BB=D1=8C=D0=B5=D0=B2=20=D0=90?= =?UTF-8?q?=D0=BD=D0=B4=D1=80=D0=B5=D0=B9=20=D0=93=D0=B5=D0=BE=D1=80=D0=B3?= =?UTF-8?q?=D0=B8=D0=B5=D0=B2=D0=B8=D1=87?= Date: Thu, 5 Jun 2025 23:03:50 +0500 Subject: [PATCH 4/4] =?UTF-8?q?=D0=94=D0=97-34:=20grpc=20-=20AtomicLong=20?= =?UTF-8?q?=D0=B8=20=D0=BD=D0=B5=D0=B1=D0=BE=D0=BB=D1=8C=D1=88=D0=BE=D0=B9?= =?UTF-8?q?=20=D1=80=D0=B5=D1=84=D0=B0=D0=BA=D1=82=D0=BE=D1=80?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../src/main/java/ru/otus/protobuf/NumsClient.java | 14 ++++++-------- .../ru/otus/protobuf/NumsClientStreamObserver.java | 4 +--- 2 files changed, 7 insertions(+), 11 deletions(-) diff --git a/L34-grpc/src/main/java/ru/otus/protobuf/NumsClient.java b/L34-grpc/src/main/java/ru/otus/protobuf/NumsClient.java index e69a22c..112ce8d 100644 --- a/L34-grpc/src/main/java/ru/otus/protobuf/NumsClient.java +++ b/L34-grpc/src/main/java/ru/otus/protobuf/NumsClient.java @@ -1,6 +1,7 @@ package ru.otus.protobuf; import io.grpc.ManagedChannelBuilder; +import java.util.concurrent.atomic.AtomicLong; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -10,7 +11,7 @@ public class NumsClient { private static final int SERVER_PORT = 8190; private static final NumsClientStreamObserver streamObserver = new NumsClientStreamObserver(); - private static long lastServerValue = 0L; + private static AtomicLong lastServerValue = new AtomicLong(); private static final long NEXT_NUM_DELAY_MILLIS = 1_000; @@ -39,16 +40,13 @@ private static long calculateNextValue(final long currentValue) { long serverValue = streamObserver.getLastValue(); logger.info("serverValue: {}", currentValue); - long nextVal = currentValue; + long previousServerValue = lastServerValue.getAndSet(serverValue); - if (serverValue != lastServerValue && serverValue > 0) { - lastServerValue = serverValue; - nextVal += serverValue + 1; + if (serverValue != previousServerValue) { + return currentValue + serverValue + 1; } else { - nextVal += 1; + return currentValue + 1; } - - return nextVal; } private static void startObservingServerValues() { diff --git a/L34-grpc/src/main/java/ru/otus/protobuf/NumsClientStreamObserver.java b/L34-grpc/src/main/java/ru/otus/protobuf/NumsClientStreamObserver.java index b04202a..16636a4 100644 --- a/L34-grpc/src/main/java/ru/otus/protobuf/NumsClientStreamObserver.java +++ b/L34-grpc/src/main/java/ru/otus/protobuf/NumsClientStreamObserver.java @@ -10,9 +10,7 @@ public class NumsClientStreamObserver implements StreamObserver @Override public void onNext(GetNumResponse numberResponse) { - synchronized (this) { - lastValue = numberResponse.getNum(); - } + lastValue = numberResponse.getNum(); logger.info("new value: {}", lastValue); }