diff --git a/L34-grpc/build.gradle.kts b/L34-grpc/build.gradle.kts new file mode 100644 index 0000000..1ac581e --- /dev/null +++ b/L34-grpc/build.gradle.kts @@ -0,0 +1,87 @@ +import com.google.protobuf.gradle.generateProtoTasks +import com.google.protobuf.gradle.ofSourceSet +import com.google.protobuf.gradle.id +import com.google.protobuf.gradle.plugins +import com.google.protobuf.gradle.protobuf +import com.google.protobuf.gradle.protoc + +plugins { + id("idea") + id("com.google.protobuf") +} + +val errorProneAnnotations: String by project +val tomcatAnnotationsApi: String by project + +dependencies { + implementation("io.grpc:grpc-netty") + implementation("io.grpc:grpc-protobuf") + implementation("io.grpc:grpc-stub") + implementation("com.google.protobuf:protobuf-java") + implementation("com.google.errorprone:error_prone_annotations:$errorProneAnnotations") + + implementation("org.apache.tomcat:annotations-api:$tomcatAnnotationsApi") + + implementation ("ch.qos.logback:logback-classic") +} + +val protoSrcDir = "$projectDir/build/generated" + +idea { + module { + sourceDirs = sourceDirs.plus(file(protoSrcDir)) + } +} + +sourceSets { + main { + proto { + srcDir(protoSrcDir) + } + } +} + +protobuf { + generatedFilesBaseDir = protoSrcDir + + protoc { + artifact = "com.google.protobuf:protoc:3.19.4" + } + + generateProtoTasks { + ofSourceSet("main") + } +} + +afterEvaluate { + tasks { + getByName("generateProto").dependsOn(processResources) + } +} + +protobuf { + generatedFilesBaseDir = protoSrcDir + + protoc { + artifact = "com.google.protobuf:protoc:3.19.4" + } + plugins { + id("grpc") { + artifact = "io.grpc:protoc-gen-grpc-java:1.56.1" + } + } + + generateProtoTasks { + ofSourceSet("main").forEach { + it.plugins { + id("grpc") + } + } + } +} + +afterEvaluate { + tasks { + getByName("generateProto").dependsOn(processResources) + } +} \ No newline at end of file diff --git a/L34-grpc/src/main/java/ru/otus/protobuf/NumServiceImpl.java b/L34-grpc/src/main/java/ru/otus/protobuf/NumServiceImpl.java new file mode 100644 index 0000000..1fcd922 --- /dev/null +++ b/L34-grpc/src/main/java/ru/otus/protobuf/NumServiceImpl.java @@ -0,0 +1,36 @@ +package ru.otus.protobuf; + +import io.grpc.stub.StreamObserver; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@SuppressWarnings({"squid:S2142"}) +public class NumServiceImpl extends NumServiceGrpc.NumServiceImplBase { + + private static final Logger logger = LoggerFactory.getLogger(NumServiceImpl.class); + + private static final long NEXT_NUM_DELAY_MILLIS = 2_000; + + @Override + public void getNumbers(final GetNumsRequest request, final StreamObserver responseObserver) { + + logger.info("Received request: firstValue={}, lastValue={}", request.getFirstNum(), request.getLastNum()); + + try { + for (long i = request.getFirstNum() + 1; i <= request.getLastNum(); i++) { + GetNumResponse response = GetNumResponse.newBuilder().setNum(i).build(); + responseObserver.onNext(response); + + logger.info("Sent value: {}", i); + + Thread.sleep(NEXT_NUM_DELAY_MILLIS); + } + } catch (InterruptedException e) { + logger.error("Interrupted while streaming", e); + Thread.currentThread().interrupt(); + } + + responseObserver.onCompleted(); + logger.info("Stream completed"); + } +} diff --git a/L34-grpc/src/main/java/ru/otus/protobuf/NumsClient.java b/L34-grpc/src/main/java/ru/otus/protobuf/NumsClient.java new file mode 100644 index 0000000..112ce8d --- /dev/null +++ b/L34-grpc/src/main/java/ru/otus/protobuf/NumsClient.java @@ -0,0 +1,62 @@ +package ru.otus.protobuf; + +import io.grpc.ManagedChannelBuilder; +import java.util.concurrent.atomic.AtomicLong; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class NumsClient { + private static final Logger logger = LoggerFactory.getLogger(NumsClient.class); + private static final String SERVER_HOST = "localhost"; + private static final int SERVER_PORT = 8190; + + private static final NumsClientStreamObserver streamObserver = new NumsClientStreamObserver(); + private static AtomicLong lastServerValue = new AtomicLong(); + + private static final long NEXT_NUM_DELAY_MILLIS = 1_000; + + public static void main(String[] args) { + logger.info("Starting client"); + + startObservingServerValues(); + + long currentValue = 0; + for (int i = 0; i <= 50; i++) { + try { + currentValue = calculateNextValue(currentValue); + + logger.info("currentValue: {}", currentValue); + Thread.sleep(NEXT_NUM_DELAY_MILLIS); + + } catch (InterruptedException e) { + logger.error("Interrupted while running", e); + Thread.currentThread().interrupt(); + break; + } + } + } + + private static long calculateNextValue(final long currentValue) { + long serverValue = streamObserver.getLastValue(); + logger.info("serverValue: {}", currentValue); + + long previousServerValue = lastServerValue.getAndSet(serverValue); + + if (serverValue != previousServerValue) { + return currentValue + serverValue + 1; + } else { + return currentValue + 1; + } + } + + private static void startObservingServerValues() { + GetNumsRequest request = + GetNumsRequest.newBuilder().setFirstNum(0).setLastNum(30).build(); + + var channel = ManagedChannelBuilder.forAddress(SERVER_HOST, SERVER_PORT) + .usePlaintext() + .build(); + + NumServiceGrpc.newStub(channel).getNumbers(request, streamObserver); + } +} diff --git a/L34-grpc/src/main/java/ru/otus/protobuf/NumsClientStreamObserver.java b/L34-grpc/src/main/java/ru/otus/protobuf/NumsClientStreamObserver.java new file mode 100644 index 0000000..16636a4 --- /dev/null +++ b/L34-grpc/src/main/java/ru/otus/protobuf/NumsClientStreamObserver.java @@ -0,0 +1,30 @@ +package ru.otus.protobuf; + +import io.grpc.stub.StreamObserver; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class NumsClientStreamObserver implements StreamObserver { + private static final Logger logger = LoggerFactory.getLogger(NumsClientStreamObserver.class); + private long lastValue; + + @Override + public void onNext(GetNumResponse numberResponse) { + lastValue = numberResponse.getNum(); + logger.info("new value: {}", lastValue); + } + + @Override + public void onError(Throwable throwable) { + logger.error("Error occurred: ", throwable); + } + + @Override + public void onCompleted() { + logger.info("request completed"); + } + + public long getLastValue() { + return lastValue; + } +} diff --git a/L34-grpc/src/main/java/ru/otus/protobuf/NumsServer.java b/L34-grpc/src/main/java/ru/otus/protobuf/NumsServer.java new file mode 100644 index 0000000..be41d3b --- /dev/null +++ b/L34-grpc/src/main/java/ru/otus/protobuf/NumsServer.java @@ -0,0 +1,22 @@ +package ru.otus.protobuf; + +import io.grpc.ServerBuilder; +import java.io.IOException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class NumsServer { + private static final Logger logger = LoggerFactory.getLogger(NumsServer.class); + public static final int SERVER_PORT = 8190; + + public static void main(String[] args) throws IOException, InterruptedException { + + var server = ServerBuilder.forPort(SERVER_PORT) + .addService(new NumServiceImpl()) + .build() + .start(); + + logger.info("server waiting for client connections..."); + server.awaitTermination(); + } +} diff --git a/L34-grpc/src/main/proto/NumService.proto b/L34-grpc/src/main/proto/NumService.proto new file mode 100644 index 0000000..de4c2c6 --- /dev/null +++ b/L34-grpc/src/main/proto/NumService.proto @@ -0,0 +1,18 @@ +syntax = "proto3"; + +package ru.otus.protobuf; + +option java_multiple_files = true; + +message GetNumsRequest { + int64 firstNum = 1; + int64 lastNum = 2; +} + +message GetNumResponse { + int64 num = 1; +} + +service NumService { + rpc getNumbers(GetNumsRequest) returns (stream GetNumResponse); +} diff --git a/settings.gradle.kts b/settings.gradle.kts index f144627..0f30f9b 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -18,6 +18,7 @@ include( "L31-executors", "L32-concurrentCollections:ConcurrentCollections", "L32-concurrentCollections:QueueDemo", + "L34-grpc", )