Skip to content

Commit 7fbeca4

Browse files
committed
update to the latest RSocket-Java
1 parent 5948ac6 commit 7fbeca4

File tree

5 files changed

+55
-33
lines changed

5 files changed

+55
-33
lines changed

rsocket-rpc-core/src/main/java/io/rsocket/rpc/AbstractRSocketService.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import io.rsocket.AbstractRSocket;
44
import io.rsocket.Payload;
5+
import org.reactivestreams.Publisher;
56
import reactor.core.publisher.Flux;
67

78
public abstract class AbstractRSocketService extends AbstractRSocket implements RSocketRpcService {
@@ -11,7 +12,7 @@ public String getService() {
1112
}
1213

1314
@Override
14-
public Flux<Payload> requestChannel(Payload payload, Flux<Payload> publisher) {
15+
public Flux<Payload> requestChannel(Payload payload, Publisher<Payload> publisher) {
1516
return Flux.error(new UnsupportedOperationException("Request-Channel not implemented."));
1617
}
1718

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,7 @@
11
package io.rsocket.rpc;
22

3-
import io.rsocket.Payload;
4-
import io.rsocket.RSocket;
5-
import reactor.core.publisher.Flux;
3+
import io.rsocket.ResponderRSocket;
64

7-
public interface RSocketRpcService extends RSocket {
5+
public interface RSocketRpcService extends ResponderRSocket {
86
String getService();
9-
10-
Flux<Payload> requestChannel(Payload payload, Flux<Payload> publisher);
117
}

rsocket-rpc-core/src/main/java/io/rsocket/rpc/rsocket/RequestHandlingRSocket.java

Lines changed: 47 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
import io.netty.util.ReferenceCountUtil;
55
import io.rsocket.AbstractRSocket;
66
import io.rsocket.Payload;
7-
import io.rsocket.internal.SwitchTransformFlux;
7+
import io.rsocket.ResponderRSocket;
88
import io.rsocket.rpc.RSocketRpcService;
99
import io.rsocket.rpc.exception.ServiceNotFound;
1010
import io.rsocket.rpc.frames.Metadata;
@@ -14,7 +14,7 @@
1414
import reactor.core.publisher.Flux;
1515
import reactor.core.publisher.Mono;
1616

17-
public class RequestHandlingRSocket extends AbstractRSocket {
17+
public class RequestHandlingRSocket extends AbstractRSocket implements ResponderRSocket {
1818
private final ConcurrentMap<String, RSocketRpcService> registeredServices =
1919
new ConcurrentHashMap<>();
2020

@@ -102,25 +102,50 @@ public Flux<Payload> requestStream(Payload payload) {
102102

103103
@Override
104104
public Flux<Payload> requestChannel(Publisher<Payload> payloads) {
105-
return new SwitchTransformFlux<>(
106-
payloads,
107-
(payload, flux) -> {
108-
try {
109-
ByteBuf metadata = payload.sliceMetadata();
110-
String service = Metadata.getService(metadata);
111-
112-
RSocketRpcService rsocketService = registeredServices.get(service);
113-
114-
if (rsocketService == null) {
115-
ReferenceCountUtil.safeRelease(payload);
116-
return Flux.error(new ServiceNotFound(service));
117-
}
118-
119-
return rsocketService.requestChannel(payload, flux);
120-
} catch (Throwable t) {
121-
ReferenceCountUtil.safeRelease(payload);
122-
return Flux.error(t);
123-
}
124-
});
105+
return Flux.from(payloads)
106+
.switchOnFirst(
107+
(firstSignal, flux) -> {
108+
if (firstSignal.hasValue()) {
109+
Payload payload = firstSignal.get();
110+
try {
111+
ByteBuf metadata = payload.sliceMetadata();
112+
String service = Metadata.getService(metadata);
113+
114+
RSocketRpcService rsocketService = registeredServices.get(service);
115+
116+
if (rsocketService == null) {
117+
ReferenceCountUtil.safeRelease(payload);
118+
return Flux.error(new ServiceNotFound(service));
119+
}
120+
121+
return rsocketService.requestChannel(payload, flux);
122+
} catch (Throwable t) {
123+
ReferenceCountUtil.safeRelease(payload);
124+
return Flux.error(t);
125+
}
126+
}
127+
128+
return flux;
129+
});
130+
}
131+
132+
@Override
133+
public Flux<Payload> requestChannel(Payload payload, Publisher<Payload> payloads) {
134+
try {
135+
ByteBuf metadata = payload.sliceMetadata();
136+
String service = Metadata.getService(metadata);
137+
138+
RSocketRpcService rsocketService = registeredServices.get(service);
139+
140+
if (rsocketService == null) {
141+
ReferenceCountUtil.safeRelease(payload);
142+
return Flux.error(new ServiceNotFound(service));
143+
}
144+
145+
return rsocketService.requestChannel(payload, payloads);
146+
} catch (Throwable t) {
147+
ReferenceCountUtil.safeRelease(payload);
148+
return Flux.error(t);
149+
}
125150
}
126151
}

rsocket-rpc-protobuf/src/java_plugin/cpp/blocking_java_generator.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -924,7 +924,7 @@ static void PrintServer(const ServiceDescriptor* service,
924924
p->Print(
925925
*vars,
926926
"@$Override$\n"
927-
"public $Flux$<$Payload$> requestChannel($Payload$ payload, $Flux$<$Payload$> publisher) {\n");
927+
"public $Flux$<$Payload$> requestChannel($Payload$ payload, $Publisher$<$Payload$> publisher) {\n");
928928
p->Indent();
929929
if (request_channel.empty()) {
930930
p->Print(
@@ -956,7 +956,7 @@ static void PrintServer(const ServiceDescriptor* service,
956956
p->Indent();
957957
p->Print(
958958
*vars,
959-
"publisher.map(deserializer($input_type$.parser()));\n");
959+
"$Flux$.from(publisher).map(deserializer($input_type$.parser()));\n");
960960
p->Outdent();
961961
if (method->server_streaming()) {
962962
p->Print(

rsocket-rpc-protobuf/src/java_plugin/cpp/java_generator.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1311,7 +1311,7 @@ static void PrintServer(const ServiceDescriptor* service,
13111311
p->Print(
13121312
*vars,
13131313
"@$Override$\n"
1314-
"public $Flux$<$Payload$> requestChannel($Payload$ payload, $Flux$<$Payload$> publisher) {\n");
1314+
"public $Flux$<$Payload$> requestChannel($Payload$ payload, $Publisher$<$Payload$> publisher) {\n");
13151315
p->Indent();
13161316
if (request_channel.empty()) {
13171317
p->Print(
@@ -1344,7 +1344,7 @@ static void PrintServer(const ServiceDescriptor* service,
13441344
p->Indent();
13451345
p->Print(
13461346
*vars,
1347-
"publisher.map(deserializer($input_type$.parser()));\n");
1347+
"$Flux$.from(publisher).map(deserializer($input_type$.parser()));\n");
13481348
p->Outdent();
13491349
if (method->server_streaming()) {
13501350
p->Print(

0 commit comments

Comments
 (0)