Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ repositories {
}

ext {
rlibVersion = "10.0.alpha"
rlibVersion = "10.0.alpha3"
}

dependencies {
Expand Down
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
rootProject.version = "10.0.alpha2"
rootProject.version = "10.0.alpha3"
group = 'javasabr.rlib'

allprojects {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import javasabr.rlib.network.client.ClientNetwork;
import javasabr.rlib.network.impl.DefaultBufferAllocator;
import javasabr.rlib.network.impl.StringDataConnection;
import javasabr.rlib.network.packet.impl.StringReadableNetworkPacket;
import javasabr.rlib.network.packet.impl.StringWritableNetworkPacket;
import javasabr.rlib.network.server.ServerNetwork;
import lombok.CustomLog;
Expand Down Expand Up @@ -77,7 +78,7 @@ void connectAndSendMessages(
int delay = random.nextInt(MAX_SEND_DELAY);
ScheduledFuture<?> schedule = executor.schedule(
() -> {
StringWritableNetworkPacket message = newMessage(10, 10240);
var message = newMessage(10, 10240);
connection.send(message);
}, delay, TimeUnit.MILLISECONDS);
tasks.add(schedule);
Expand Down Expand Up @@ -120,7 +121,7 @@ void testServerWithMultiplyClients() {

var serverConfig = SimpleServerNetworkConfig
.builder()
.threadGroupSize(10)
.threadGroupMaxSize(10)
.writeBufferSize(1024)
.readBufferSize(1024)
.pendingBufferSize(2048)
Expand All @@ -141,10 +142,11 @@ void testServerWithMultiplyClients() {

serverNetwork.onAccept(accepted -> accepted
.onReceive((connection, packet) -> {
StringReadableNetworkPacket<StringDataConnection> receivedPacket = (StringReadableNetworkPacket<StringDataConnection>) packet;
statistics
.receivedClientPackersPerSecond()
.accumulate(1);
connection.send(new StringWritableNetworkPacket("Echo: " + packet.data()));
connection.send(new StringWritableNetworkPacket<>("Echo: " + receivedPacket.data()));
statistics
.sentEchoPackersPerSecond()
.accumulate(1);
Expand Down Expand Up @@ -200,7 +202,9 @@ private static void initReceivedMessagesTracker(
}, 1, 1, TimeUnit.SECONDS);
}

private static StringWritableNetworkPacket newMessage(int minMessageLength, int maxMessageLength) {
return new StringWritableNetworkPacket(StringUtils.generate(minMessageLength, maxMessageLength));
private static StringWritableNetworkPacket<StringDataConnection> newMessage(
int minMessageLength,
int maxMessageLength) {
return new StringWritableNetworkPacket<>(StringUtils.generate(minMessageLength, maxMessageLength));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,8 @@
import javasabr.rlib.network.ServerNetworkConfig.SimpleServerNetworkConfig;
import javasabr.rlib.network.client.ClientNetwork;
import javasabr.rlib.network.impl.DefaultBufferAllocator;
import javasabr.rlib.network.impl.StringDataConnection;
import javasabr.rlib.network.impl.StringDataSslConnection;
import javasabr.rlib.network.packet.impl.AbstractSslNetworkPacketReader;
import javasabr.rlib.network.packet.impl.AbstractSslNetworkPacketWriter;
import javasabr.rlib.network.packet.impl.StringReadableNetworkPacket;
import javasabr.rlib.network.packet.impl.StringWritableNetworkPacket;
import javasabr.rlib.network.server.ServerNetwork;
import javasabr.rlib.network.util.NetworkUtils;
Expand Down Expand Up @@ -83,7 +81,7 @@ void connectAndSendMessages(
int delay = random.nextInt(MAX_SEND_DELAY);
ScheduledFuture<?> schedule = executor.schedule(
() -> {
StringWritableNetworkPacket message = newMessage(10, 10240); // 10240
var message = newMessage(10, 10240); // 10240
connection.send(message);
}, delay, TimeUnit.MILLISECONDS);
tasks.add(schedule);
Expand Down Expand Up @@ -128,7 +126,7 @@ void testServerWithMultiplyClients() {

var serverConfig = SimpleServerNetworkConfig
.builder()
.threadGroupSize(10)
.threadGroupMaxSize(10)
.writeBufferSize(1024)
.readBufferSize(1024)
.pendingBufferSize(2048)
Expand All @@ -153,10 +151,11 @@ void testServerWithMultiplyClients() {

serverNetwork.onAccept(accepted -> accepted
.onReceive((connection, packet) -> {
StringReadableNetworkPacket<StringDataSslConnection> receivedPacket = (StringReadableNetworkPacket<StringDataSslConnection>) packet;
statistics
.receivedClientPackersPerSecond()
.accumulate(1);
connection.send(new StringWritableNetworkPacket("Echo: " + packet.data()));
connection.send(new StringWritableNetworkPacket<>("Echo: " + receivedPacket.data()));
statistics
.sentEchoPackersPerSecond()
.accumulate(1);
Expand Down Expand Up @@ -215,7 +214,9 @@ private static void initReceivedMessagesTracker(
}, 1, 1, TimeUnit.SECONDS);
}

private static StringWritableNetworkPacket newMessage(int minMessageLength, int maxMessageLength) {
return new StringWritableNetworkPacket(StringUtils.generate(minMessageLength, maxMessageLength));
private static StringWritableNetworkPacket<StringDataSslConnection> newMessage(
int minMessageLength,
int maxMessageLength) {
return new StringWritableNetworkPacket<>(StringUtils.generate(minMessageLength, maxMessageLength));
}
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package javasabr.rlib.network;

import java.nio.ByteBuffer;
import org.jspecify.annotations.Nullable;

/**
* The interface to implement a buffer allocator for network things.
Expand Down
34 changes: 25 additions & 9 deletions rlib-network/src/main/java/javasabr/rlib/network/Connection.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,9 @@
*
* @author JavaSaBr
*/
public interface Connection<R extends ReadableNetworkPacket, W extends WritableNetworkPacket> {

record ReceivedPacketEvent<C extends Connection<?, ?>, R extends ReadableNetworkPacket>(
C connection, R packet) {
public interface Connection<C extends Connection<C>> {

record ReceivedPacketEvent<C, R>(C connection, R packet) {
@Override
public String toString() {
return "[" + connection + "|" + packet + ']';
Expand Down Expand Up @@ -45,27 +43,45 @@ public String toString() {
/**
* Send a packet to connection's owner.
*/
void send(W packet);
void send(WritableNetworkPacket<C> packet);

/**
* Send a packet to connection's owner with async feedback of this sending.
*
* @return the async result with true if the packet was sent or false if sending was failed.
*/
CompletableFuture<Boolean> sendWithFeedback(W packet);
CompletableFuture<Boolean> sendWithFeedback(WritableNetworkPacket<C> packet);

/**
* Register a consumer to handle received packets.
*/
void onReceive(BiConsumer<? super Connection<R, W>, ? super R> consumer);
void onReceive(BiConsumer<C, ? super ReadableNetworkPacket<C>> consumer);

/**
* Get a stream of received packet events.
*/
Flux<ReceivedPacketEvent<? extends Connection<R, W>, ? extends R>> receivedEvents();
Flux<ReceivedPacketEvent<C, ? extends ReadableNetworkPacket<C>>> receivedEvents();

/**
* Get a stream of received packet events with expected packet type.
*/
default <R extends ReadableNetworkPacket<C>> Flux<ReceivedPacketEvent<C, R>> receivedEvents(Class<R> packetType) {
return receivedEvents()
.filter(event -> packetType.isInstance(event.packet()))
.map(event -> (ReceivedPacketEvent<C, R>) event);
}

/**
* Get a stream of received packets.
*/
Flux<? extends R> receivedPackets();
Flux<? extends ReadableNetworkPacket<C>> receivedPackets();

/**
* Get a stream of received packets with expected type.
*/
default <R extends ReadableNetworkPacket<C>> Flux<R> receivedPackets(Class<R> packetType) {
return receivedPackets()
.filter(packetType::isInstance)
.map(networkPacket -> (R) networkPacket);
}
}
Original file line number Diff line number Diff line change
@@ -1,11 +1,17 @@
package javasabr.rlib.network;

import java.util.concurrent.ScheduledExecutorService;

/**
* The interface to implement an asynchronous network.
*
* @author JavaSaBr
*/
public interface Network<C extends Connection<?, ?>> {
public interface Network<C extends Connection<C>> {

ScheduledExecutorService scheduledExecutor();

NetworkConfig config();

/**
* Shutdown this network.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ class SimpleNetworkConfig implements NetworkConfig {
private int pendingBufferSize = 4096;
@Builder.Default
private int writeBufferSize = 2048;
@Builder.Default
private int retryDelayInMs = 1000;
}

NetworkConfig DEFAULT_CLIENT = new NetworkConfig() {
Expand All @@ -46,34 +48,53 @@ default String threadGroupName() {
}

/**
* Get size of buffer with used to collect received data from network.
* Get a group name of scheduling network threads.
*/
default String scheduledThreadGroupName() {
return "ScheduledNetworkThread";
}

/**
* Get size of buffer with will be used to collect received data from network.
*/
default int readBufferSize() {
return 2048;
}

/**
* Get size of buffer with pending data. Pending buffer allows to construct a packet with bigger data than
* Gets size of buffer with pending reading data. Pending buffer allows to construct a packet with bigger data part than
* {@link #readBufferSize()}. It should be at least 2x of {@link #readBufferSize()}
*
* @return the pending buffer's size.
*/
default int pendingBufferSize() {
return readBufferSize() * 2;
}

/**
* Get size of buffer which used to serialize packets to bytes.
* Gets a size of buffer which will be used for packet serialization.
*/
default int writeBufferSize() {
return 2048;
}

/**
* Gets a timeout for retry read/write operation.
*/
default int retryDelayInMs() {
return 1000;
}

/**
* Gets a max allowed empty reads from socket channel before closing a connection.
*/
default int maxEmptyReadsBeforeClose() {
return 3;
}

default ByteOrder byteOrder() {
return ByteOrder.BIG_ENDIAN;
}

default boolean isDirectByteBuffer() {
default boolean useDirectByteBuffer() {
return false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,13 @@
@UtilityClass
public final class NetworkFactory {

public static <C extends UnsafeConnection<?, ?>> ClientNetwork<C> clientNetwork(
public static <C extends UnsafeConnection<C>> ClientNetwork<C> clientNetwork(
NetworkConfig networkConfig,
BiFunction<Network<C>, AsynchronousSocketChannel, C> channelToConnection) {
return new DefaultClientNetwork<>(networkConfig, channelToConnection);
}

public static <C extends UnsafeConnection<?, ?>> ServerNetwork<C> serverNetwork(
public static <C extends UnsafeConnection<C>> ServerNetwork<C> serverNetwork(
ServerNetworkConfig networkConfig,
BiFunction<Network<C>, AsynchronousSocketChannel, C> channelToConnection) {
return new DefaultServerNetwork<>(networkConfig, channelToConnection);
Expand Down Expand Up @@ -65,7 +65,7 @@ public static ClientNetwork<StringDataConnection> stringDataClientNetwork(
* Create id based packet default asynchronous client network.
*/
public static ClientNetwork<DefaultConnection> defaultClientNetwork(
ReadableNetworkPacketRegistry<DefaultReadableNetworkPacket> packetRegistry) {
ReadableNetworkPacketRegistry<DefaultReadableNetworkPacket<DefaultConnection>, DefaultConnection> packetRegistry) {
return defaultClientNetwork(
NetworkConfig.DEFAULT_CLIENT,
new DefaultBufferAllocator(NetworkConfig.DEFAULT_CLIENT),
Expand All @@ -78,8 +78,8 @@ public static ClientNetwork<DefaultConnection> defaultClientNetwork(
public static ClientNetwork<DefaultConnection> defaultClientNetwork(
NetworkConfig networkConfig,
BufferAllocator bufferAllocator,
ReadableNetworkPacketRegistry<DefaultReadableNetworkPacket> packetRegistry) {
return clientNetwork(
ReadableNetworkPacketRegistry<DefaultReadableNetworkPacket<DefaultConnection>, DefaultConnection> packetRegistry) {
return NetworkFactory.clientNetwork(
networkConfig,
(network, channel) -> new DefaultConnection(network, channel, bufferAllocator, packetRegistry));
}
Expand Down Expand Up @@ -139,7 +139,7 @@ public static ServerNetwork<StringDataSslConnection> stringDataSslServerNetwork(
* Create id based packet default asynchronous server network.
*/
public static ServerNetwork<DefaultConnection> defaultServerNetwork(
ReadableNetworkPacketRegistry<DefaultReadableNetworkPacket> packetRegistry) {
ReadableNetworkPacketRegistry<DefaultReadableNetworkPacket<DefaultConnection>, DefaultConnection> packetRegistry) {
return defaultServerNetwork(
ServerNetworkConfig.DEFAULT_SERVER,
new DefaultBufferAllocator(ServerNetworkConfig.DEFAULT_SERVER),
Expand All @@ -152,7 +152,7 @@ public static ServerNetwork<DefaultConnection> defaultServerNetwork(
public static ServerNetwork<DefaultConnection> defaultServerNetwork(
ServerNetworkConfig networkConfig,
BufferAllocator bufferAllocator,
ReadableNetworkPacketRegistry<DefaultReadableNetworkPacket> packetRegistry) {
ReadableNetworkPacketRegistry<DefaultReadableNetworkPacket<DefaultConnection>, DefaultConnection> packetRegistry) {
return serverNetwork(
networkConfig,
(network, channel) -> new DefaultConnection(network, channel, bufferAllocator, packetRegistry));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,21 +32,27 @@ class SimpleServerNetworkConfig implements ServerNetworkConfig {
@Builder.Default
private int writeBufferSize = 2048;
@Builder.Default
private int threadGroupSize = 1;
private int retryDelayInMs = 1000;
@Builder.Default
private int threadGroupMinSize = 1;
@Builder.Default
private int threadGroupMaxSize = 1;
@Builder.Default
private int scheduledThreadGroupSize = 1;
@Builder.Default
private int threadPriority = Thread.NORM_PRIORITY;
}

ServerNetworkConfig DEFAULT_SERVER = new ServerNetworkConfig() {

@Override
public int threadGroupMinSize() {
return 1;
public String threadGroupName() {
return "ServerNetworkThread";
}

@Override
public String threadGroupName() {
return "ServerNetworkThread";
public String scheduledThreadGroupName() {
return "ServerScheduledNetworkThread";
}
};

Expand All @@ -64,6 +70,13 @@ default int threadGroupMaxSize() {
return threadGroupMinSize();
}

/**
* Get a size of network scheduled thread executor.
*/
default int scheduledThreadGroupSize() {
return 1;
}

/**
* Get a thread constructor which should be used to create network threads.
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,8 @@
package javasabr.rlib.network;

import javasabr.rlib.network.packet.ReadableNetworkPacket;
import javasabr.rlib.network.packet.WritableNetworkPacket;
public interface UnsafeConnection<C extends UnsafeConnection<C>> extends Connection<C> {

public interface UnsafeConnection<R extends ReadableNetworkPacket, W extends WritableNetworkPacket>
extends Connection<R, W> {
Network<?> network();

void onConnected();
}
Loading
Loading