Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 30 additions & 7 deletions Client/Client.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,13 @@
using DotNetty.Transport.Channels;
using DotNetty.Transport.Channels.Sockets;
using rpcx.net.Client.Generator;
using rpcx.net.Shared;
using rpcx.net.Shared.Codecs;
using rpcx.net.Shared.Protocol;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Net;
using System.Net.Security;
using System.Security.Authentication;
using System.Security.Cryptography.X509Certificates;
using System.Threading;
using System.Threading.Tasks;
Expand Down Expand Up @@ -95,17 +94,19 @@ public bool Connect(string network, string address) {
return true;
}

public async Task<TReply> Go<TArgs, TReply>(string servicePath, string serviceMethod, TArgs args, CancellationToken cancellationToken = default) {
public async Task<TReply> Go<TArgs, TReply>(IContext ctx, string servicePath, string serviceMethod, TArgs args, CancellationToken cancellationToken = default) {
var header = Header.NewRequest(_option.Types);

var msg = new Message(header) {
ServicePath = servicePath,
ServiceMethod = serviceMethod,
Metadata = args is WithMetadata a ? a._metadata : null,
Context = ctx,
Payload = GetSerializer(header.SerializeType).Serialize(args),
};

msg.Header.Seq = IdGen.Next();
var tcs = new TaskCompletionSource(typeof(TReply));
// 附加原始调用参数数据
var tcs = new TaskCompletionSource(typeof(TReply), msg);
_pending.TryAdd(msg.Header.Seq, tcs);
_ = _chan.WriteAndFlushAsync(msg);
return (TReply)await tcs.Task.ConfigureAwait(false);
Expand Down Expand Up @@ -139,12 +140,34 @@ protected override void ChannelRead0(IChannelHandlerContext ctx, Message msg) {
tcs.TrySetResult(msg.Payload);
} else {
var obj = GetSerializer(msg.Header.SerializeType).Deserialize(tcs.ResultType, msg.Payload);
if (obj is WithMetadata o)
o._metadata = msg.Metadata;

// 使用附加数据处理
//if (obj is WithMetadata o)
// o._metadata = msg.Metadata;

var msgCtx = msg.Context;
if (msgCtx == null) {
return;
}
var tcsMsg = tcs.Message;
if (tcsMsg == null) {
return;
}
var tcsCtx = tcsMsg.Context;
if (tcsCtx == null) {
return;
}
while (msgCtx != null) {
var key = msgCtx.Key();
tcsCtx.SetValue(key, msgCtx.Value(key));
msgCtx = msgCtx.Parent();
}

tcs.TrySetResult(obj);
}
}
}

#endregion
}
}
9 changes: 4 additions & 5 deletions Client/IRPCClient.cs
Original file line number Diff line number Diff line change
@@ -1,16 +1,15 @@
using DotNetty.Transport.Channels;
using System;
using System.Net;
using System.Threading;
using rpcx.net.Shared.Protocol;
using System.Threading.Tasks;
using rpcx.net.Shared;

namespace rpcx.net.Client
{
public interface IRPCClient
namespace rpcx.net.Client {
public interface IRPCClient
{
bool Connect(string network, string address);
Task<TReply> Go<TArgs, TReply>(string servicePath, string serviceMethod, TArgs args, CancellationToken cancellationToken = default);
Task<TReply> Go<TArgs, TReply>(IContext ctx, string servicePath, string serviceMethod, TArgs args, CancellationToken cancellationToken = default);
Task SendRaw(Message message, CancellationToken cancellationToken = default);
void Close();

Expand Down
16 changes: 9 additions & 7 deletions Client/TaskCompletionSource.cs
Original file line number Diff line number Diff line change
@@ -1,14 +1,16 @@
using System;
using rpcx.net.Shared.Protocol;
using System;
using System.Threading.Tasks;

namespace rpcx.net.Client
{
public class TaskCompletionSource : TaskCompletionSource<object>
{
namespace rpcx.net.Client {
public class TaskCompletionSource : TaskCompletionSource<object> {
public Type ResultType { get; }

public TaskCompletionSource(Type resultType) : this()
{
// 增加原始参数数据
public Message Message { get; }

public TaskCompletionSource(Type resultType, Message message) : this() {
Message = message;
ResultType = resultType;
}

Expand Down
24 changes: 0 additions & 24 deletions Client/WithMetadata.cs

This file was deleted.

34 changes: 16 additions & 18 deletions Shared/Codecs/RpcxMessageDecoder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,13 @@
using System.Text;
using static rpcx.net.Shared.Utils;

namespace rpcx.net.Shared.Codecs
{
public class RpcxMessageDecoder : LengthFieldBasedFrameDecoder
{
namespace rpcx.net.Shared.Codecs {
public class RpcxMessageDecoder : LengthFieldBasedFrameDecoder {
public RpcxMessageDecoder(int maxFrameLength = 10240) :
base(maxFrameLength, 12, 4) { }

protected override void Decode(IChannelHandlerContext context, IByteBuffer input, List<object> output)
{
if (base.Decode(context, input) is IByteBuffer buf)
{
protected override void Decode(IChannelHandlerContext context, IByteBuffer input, List<object> output) {
if (base.Decode(context, input) is IByteBuffer buf) {
int nLen;

var byHeader = new byte[16];
Expand All @@ -34,34 +30,36 @@ protected override void Decode(IChannelHandlerContext context, IByteBuffer input
Dictionary<string, string> metadata = null;
if (0 != (nLen = buf.ReadInt()))
metadata = DecodeMetadata(buf, nLen, Encoding.UTF8);

byte[] payload = null;
if (0 != (nLen = buf.ReadInt()))
{
if (0 != (nLen = buf.ReadInt())) {
payload = new byte[nLen];
buf.ReadBytes(payload, 0, nLen);
var compressor = GetCompressor(header.CompressType);
if (compressor != null)
payload = compressor.Unzip(payload);
}

output.Add(new Message(header) {
// 处理 Message
var msg = new Message(header) {
ServicePath = servicePath,
ServiceMethod = serviceMethod,
Metadata = metadata,
Payload = payload,
});
};
// todo: 现在只处理 ResMetaDataKey, 以后可能会出现其他的上下文处理
if (metadata != null) {
msg.Context = Context.WithValue(msg.Context, Context.ResMetaDataKey, metadata);
};
output.Add(msg);
}
}

protected Dictionary<string, string> DecodeMetadata(IByteBuffer buf, int lenght, Encoding encoding)
{
protected Dictionary<string, string> DecodeMetadata(IByteBuffer buf, int lenght, Encoding encoding) {
var res = new Dictionary<string, string>(10);
var endIdx = buf.ReaderIndex + lenght;
int len;
string k, v;
while(endIdx > buf.ReaderIndex)
{
while (endIdx > buf.ReaderIndex) {
len = buf.ReadInt();
k = buf.ReadString(len, encoding);
len = buf.ReadInt();
Expand Down
26 changes: 13 additions & 13 deletions Shared/Codecs/RpcxMessageEncoder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,19 +6,15 @@
using System.Text;
using static rpcx.net.Shared.Utils;

namespace rpcx.net.Shared.Codecs
{
public class RpcxMessageEncoder : MessageToByteEncoder<Message>
{
protected override void Encode(IChannelHandlerContext context, Message message, IByteBuffer output)
{
namespace rpcx.net.Shared.Codecs {
public class RpcxMessageEncoder : MessageToByteEncoder<Message> {
protected override void Encode(IChannelHandlerContext context, Message message, IByteBuffer output) {
var byHeader = message.Header.GetBytes();
var byServicePath = Encoding.UTF8.GetBytes(message.ServicePath);
var byServiceMethod = Encoding.UTF8.GetBytes(message.ServiceMethod);
//var byMetadata = message.Metadata.GetBytes();
var bufMetadata = context.Allocator.Buffer();
var nLenMetadata = EncodeMetadata(bufMetadata, message.Metadata, Encoding.UTF8);

var nLenMetadata = EncodeMetadata(bufMetadata, message.Context, Encoding.UTF8);

var compressor = GetCompressor(message.Header.CompressType);
var byPayload = compressor is null ? message.Payload : compressor.Zip(message.Payload);
Expand All @@ -37,13 +33,17 @@ protected override void Encode(IChannelHandlerContext context, Message message,
output.WriteBytes(byPayload);
}

protected int EncodeMetadata(IByteBuffer buf, Dictionary<string, string> metadata, Encoding encoding)
{
if (metadata is null) return 0;
protected int EncodeMetadata(IByteBuffer buf, IContext ctx, Encoding encoding) {
if (ctx == null) { return 0; }

// todo: 如果ReqMetaDataKey对应的类型不是IDictionary<string, string>, 应当怎样处理
var metadata = ctx.Value(Context.ReqMetaDataKey) as IDictionary<string, string>;
if (metadata == null) { return 0; }

var startIdx = buf.WriterIndex;

byte[] k, v;
foreach(var kv in metadata)
{
foreach (var kv in metadata) {
k = encoding.GetBytes(kv.Key);
v = encoding.GetBytes(kv.Value);
buf.WriteInt(k.Length);
Expand Down
26 changes: 11 additions & 15 deletions Shared/Codecs/Serializer/JsonSerializer.cs
Original file line number Diff line number Diff line change
@@ -1,28 +1,24 @@
using System;

namespace rpcx.net.Shared.Codecs.Serializer
{
public class JsonSerializer : ISerializer
{
namespace rpcx.net.Shared.Codecs.Serializer {
public class JsonSerializer : ISerializer {
private static JsonSerializer _default;
public static JsonSerializer Default
{
get
{
public static JsonSerializer Default {
get {
if (_default is null)
_default = new JsonSerializer();
return _default;
}
}

public object Deserialize(Type type, ReadOnlyMemory<byte> bytes)
{
var json = System.Text.Encoding.UTF8.GetString(bytes.ToArray());
return System.Text.Json.JsonSerializer.Deserialize(json, type);
public object Deserialize(Type type, ReadOnlyMemory<byte> bytes) {
return Utf8Json.JsonSerializer.NonGeneric.Deserialize(type, bytes.ToArray());
}
//System.Text.Json.JsonSerializer.Deserialize(bytes.Span, type);

public byte[] Serialize(object value) =>
System.Text.Json.JsonSerializer.SerializeToUtf8Bytes(value);
// https://michaelscodingspot.com/the-battle-of-c-to-json-serializers-in-net-core-3/
// 替换成 Utf8Json 序列化
// 因为 在序列化 Message WithMetadata 时, System.Text.Json 会崩溃

public byte[] Serialize(object value) => Utf8Json.JsonSerializer.Serialize(value);
}
}
87 changes: 87 additions & 0 deletions Shared/Context.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
using System;
using System.Collections.Generic;

namespace rpcx.net.Shared {
public interface IContext {
object Key();
object Value(object key);
void SetValue(object key, object val);

IContext Parent();
IContext FindParent(object key);
}

public sealed class Context {
/// <summary>
/// ReqMetaDataKey is used to set metatdata in context of requests.
/// refrence: rpcx -> ReqMetaDataKey
/// </summary>
public const string ReqMetaDataKey = "__req_metadata";

/// <summary>
/// ResMetaDataKey is used to set metatdata in context of responses.
/// refrence: rpcx -> ResMetaDataKey
/// </summary>
public const string ResMetaDataKey = "__res_metadata";

public static IContext WithValue(IContext parent, object key, object val = null) {
return new valueCtx {
_parent = parent,
_key = key,
_val = val
};
}

/// <summary>
/// 设置 medadata
/// </summary>
/// <param name="parent"></param>
/// <param name="val">Medatada</param>
/// <returns></returns>
public static IContext WithMetadata(IContext parent, IDictionary<string, string> val) {
return WithValue(parent, ReqMetaDataKey, val);
}
}

class valueCtx : IContext {
internal IContext _parent;

internal object _key;
internal object _val;

public object Key() => _key;

public object Value(object key) {
if (_key == key) {
return _val;
}
if (_parent == null) {
return null;
}
return _parent.Value(key);
}

public void SetValue(object key, object val) {
var ctx = FindParent(key) as valueCtx;
if (ctx == null) {
throw new Exception($"当前对象上下文中,无法找到 [Key = {key} ] 对应的节点。");
}
ctx._val = val;
}

public IContext Parent() => _parent;

public IContext FindParent(object key) {
if (this._key == key) {
return this;
}

var parent = this._parent;
if (parent == null) {
this._parent = Context.WithValue(null, key);
return this._parent;
}
return parent.FindParent(key);
}
}
}
Loading