Skip to content

Commit 8125ee0

Browse files
committed
Topics are stores per connection
1 parent ed2a1c5 commit 8125ee0

File tree

2 files changed

+93
-55
lines changed

2 files changed

+93
-55
lines changed

src/MQTTLib/MqttClient.cs

Lines changed: 78 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ namespace MQTTLib
2020
public class MqttClient
2121
{
2222
static Dictionary<Guid, MqttClient> s_instances = new Dictionary<Guid, MqttClient>();
23-
static Dictionary<string, string> s_topicProcs = new Dictionary<string, string>();
23+
Dictionary<string, string> s_topicProcs = new Dictionary<string, string>();
2424

2525
readonly IMqttClient m_mqttClient;
2626
readonly MqttConfig m_config;
@@ -43,6 +43,67 @@ static Dictionary<Guid, MqttClient> Connections
4343
}
4444
}
4545

46+
void AddTopic(string topic, string gxProc)
47+
{
48+
s_topicProcs[topic] = gxProc;
49+
}
50+
51+
void RemoveTopic(string topic)
52+
{
53+
if (s_topicProcs.ContainsKey(topic))
54+
s_topicProcs.Remove(topic);
55+
}
56+
57+
string GetProc(string topic)
58+
{
59+
if (s_topicProcs.ContainsKey(topic))
60+
return s_topicProcs[topic];
61+
62+
return null;
63+
}
64+
65+
public void ProcessMessage(MqttApplicationMessageReceivedEventArgs msg)
66+
{
67+
if (msg == null || msg.ApplicationMessage == null || msg.ApplicationMessage.Payload == null)
68+
return;
69+
#if DEBUG
70+
Console.WriteLine("");
71+
Console.WriteLine($"Message arrived! Topic:{msg.ApplicationMessage.Topic} Payload:{Encoding.UTF8.GetString(msg.ApplicationMessage.Payload)}");
72+
Console.WriteLine("");
73+
#endif
74+
75+
string fullPath = GetProc(msg.ApplicationMessage.Topic);
76+
try
77+
{
78+
FileInfo info = new FileInfo(fullPath);
79+
string className = info.Name.Substring(1).Replace(info.Extension, string.Empty); //it's a main. There's a stub (a)
80+
81+
Assembly asm = Assembly.LoadFrom(fullPath);
82+
Type procType = asm.GetTypes().FirstOrDefault(t => t.FullName.EndsWith(className, StringComparison.InvariantCultureIgnoreCase));
83+
84+
if (procType == null)
85+
throw new InvalidDataException("Data type not found");
86+
87+
var methodInfo = procType.GetMethod("execute", new Type[] { typeof(string), typeof(string) });
88+
if (methodInfo == null)
89+
throw new NotImplementedException("Method 'execute' not found");
90+
91+
var obj = Activator.CreateInstance(procType);
92+
methodInfo.Invoke(obj, new object[] { msg.ApplicationMessage.Topic, Encoding.UTF8.GetString(msg.ApplicationMessage.Payload) });
93+
}
94+
catch (Exception ex)
95+
{
96+
Console.WriteLine($"Error executing the procedure at '{fullPath}'");
97+
Console.WriteLine(ex.Message);
98+
Exception inner = ex.InnerException;
99+
while (inner != null)
100+
{
101+
Console.WriteLine(inner.Message);
102+
inner = inner.InnerException;
103+
}
104+
}
105+
}
106+
46107
public static MqttStatus Connect(string url, MqttConfig config)
47108
{
48109
Guid key = Guid.NewGuid();
@@ -194,9 +255,9 @@ private static void SubscribePreviousConnection(Guid key, MqttClientAuthenticate
194255
foreach (PreviousSubscription subscription in list)
195256
{
196257
MqttClient client = GetClient(key);
197-
string procKey = GetProcKey(client, subscription.topic);
198-
if (s_topicProcs.ContainsKey(procKey))
199-
Subscribe(key, subscription.topic, s_topicProcs[procKey], subscription.qos);
258+
string gxproc = client.GetProc(subscription.topic);
259+
if (!string.IsNullOrEmpty(gxproc))
260+
Subscribe(key, subscription.topic, gxproc, subscription.qos);
200261
}
201262
}
202263
}
@@ -210,11 +271,16 @@ public static MqttStatus Subscribe(Guid key, string topic, string gxproc, int qo
210271
if (string.IsNullOrEmpty(gxproc))
211272
throw new ArgumentNullException(nameof(gxproc), "GeneXus procedure parameter cannot be null");
212273

213-
string fileName = $"a{gxproc}.dll";
214-
string baseDir = !string.IsNullOrEmpty(AppDomain.CurrentDomain.RelativeSearchPath) ? AppDomain.CurrentDomain.RelativeSearchPath : AppDomain.CurrentDomain.BaseDirectory;
215-
string fullPath = Path.Combine(baseDir, fileName);
274+
string fullPath = gxproc;
275+
if (!Path.IsPathRooted(gxproc))
276+
{
277+
string fileName = $"a{gxproc}.dll";
278+
string baseDir = !string.IsNullOrEmpty(AppDomain.CurrentDomain.RelativeSearchPath) ? AppDomain.CurrentDomain.RelativeSearchPath : AppDomain.CurrentDomain.BaseDirectory;
279+
fullPath = Path.Combine(baseDir, fileName);
280+
}
281+
216282
if (!File.Exists(fullPath))
217-
throw new FileNotFoundException($"File not found at {fullPath}", fileName);
283+
throw new FileNotFoundException($"File not found at {fullPath}", fullPath);
218284

219285
MqttClient mqtt = GetClient(key);
220286

@@ -229,46 +295,11 @@ public static MqttStatus Subscribe(Guid key, string topic, string gxproc, int qo
229295
UserProperties = new List<MqttUserProperty> { new MqttUserProperty("gxrocedure", $"{{\"gxrocedure\":\"{gxproc}\"}}") }
230296
};
231297

232-
mqtt.m_mqttClient.UseApplicationMessageReceivedHandler(msg =>
233-
{
234-
if (msg == null || msg.ApplicationMessage == null || msg.ApplicationMessage.Payload == null)
235-
return;
236-
#if DEBUG
237-
Console.WriteLine("");
238-
Console.WriteLine($"Message arrived! Topic:{msg.ApplicationMessage.Topic} Payload:{Encoding.UTF8.GetString(msg.ApplicationMessage.Payload)}");
239-
Console.WriteLine("");
240-
#endif
241-
try
242-
{
243-
Assembly asm = Assembly.LoadFrom(fullPath);
244-
Type procType = asm.GetTypes().FirstOrDefault(t => t.FullName.EndsWith(gxproc, StringComparison.InvariantCultureIgnoreCase));
245-
246-
if (procType == null)
247-
throw new InvalidDataException("Data type not found");
248-
249-
var methodInfo = procType.GetMethod("execute", new Type[] { typeof(string), typeof(string) });
250-
if (methodInfo == null)
251-
throw new NotImplementedException("Method 'execute' not found");
252-
253-
var obj = Activator.CreateInstance(procType);
254-
methodInfo.Invoke(obj, new object[] { msg.ApplicationMessage.Topic, Encoding.UTF8.GetString(msg.ApplicationMessage.Payload) });
255-
}
256-
catch (Exception ex)
257-
{
258-
Console.WriteLine($"Error executing the procedure at '{fullPath}'");
259-
Console.WriteLine(ex.Message);
260-
Exception inner = ex.InnerException;
261-
while (inner != null)
262-
{
263-
Console.WriteLine(inner.Message);
264-
inner = inner.InnerException;
265-
}
266-
}
267-
268-
});
298+
if (mqtt.m_mqttClient.ApplicationMessageReceivedHandler == null)
299+
mqtt.m_mqttClient.UseApplicationMessageReceivedHandler(mqtt.ProcessMessage);
269300

270301
MqttClientSubscribeResult result = mqtt.m_mqttClient.SubscribeAsync(options).GetAwaiter().GetResult();
271-
s_topicProcs[GetProcKey(mqtt, topic)] = gxproc;
302+
mqtt.AddTopic(topic, fullPath);// GetClient s_topicProcs[GetProcKey(mqtt, topic)] = gxproc;
272303

273304
}
274305
catch (Exception ex)
@@ -285,9 +316,7 @@ public static MqttStatus Unsubscribe(Guid key, string topic)
285316
{
286317
MqttClient mqtt = GetClient(key);
287318
mqtt.m_mqttClient.UnsubscribeAsync(topic).GetAwaiter().GetResult();
288-
string procKey = GetProcKey(mqtt, topic);
289-
if (s_topicProcs.ContainsKey(procKey))
290-
s_topicProcs.Remove(procKey);
319+
mqtt.RemoveTopic(topic);
291320
}
292321
catch (Exception ex)
293322
{
@@ -345,7 +374,5 @@ static MqttClient GetClient(Guid key)
345374

346375
return Connections[key];
347376
}
348-
349-
static string GetProcKey(MqttClient client, string topic) => $"{client.m_mqttClient.Options.ClientId}:{topic}";
350377
}
351378
}

tests/Subscriber/Program.cs

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,8 @@ static void Main(string[] args)
1515
//Guid key = Common.CommonConnection.ConnectTLS();
1616

1717

18-
config.CleanSession = false;
19-
config.ClientId = "MyTest";
18+
//config.CleanSession = false;
19+
//config.ClientId = "MyTest";
2020

2121
Guid key = Common.CommonConnection.Connect(config);
2222

@@ -26,12 +26,23 @@ static void Main(string[] args)
2626
Console.WriteLine($"Press <enter> to subscribe...");
2727
Console.ReadLine();
2828

29-
MqttStatus status = MqttClient.Subscribe(key, topic, "SaveMessage", 1);
29+
MqttStatus status = MqttClient.Subscribe(key, "/test", "SaveMessage", 1);
3030

3131
if (status.Error)
3232
throw new Exception(status.ErrorMessage);
3333

34-
Console.WriteLine($"Subscribed to topic:{topic}");
34+
Console.WriteLine($"Subscribed to topic:/test");
35+
Console.WriteLine($"Press <enter> to subscribe (again)...");
36+
37+
Console.ReadLine();
38+
39+
status = MqttClient.Subscribe(key, "/test2", "SaveMessage2", 1);
40+
41+
if (status.Error)
42+
throw new Exception(status.ErrorMessage);
43+
44+
Console.WriteLine($"Subscribed to topic:/test2");
45+
3546
Console.WriteLine($"Press <enter> to exit...");
3647
Console.ReadLine();
3748
MqttClient.Disconnect(key);

0 commit comments

Comments
 (0)