Skip to content

Commit ed2a1c5

Browse files
committed
Fixed way of reconnecting lost connections
CleanSession default value must be true
1 parent 10b48bc commit ed2a1c5

File tree

5 files changed

+25
-10
lines changed

5 files changed

+25
-10
lines changed

src/MQTTLib/MqttClient.cs

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -132,7 +132,8 @@ public static MqttStatus Connect(string url, MqttConfig config)
132132
if (Connections.ContainsKey(key))
133133
{
134134
await Task.Delay(TimeSpan.FromSeconds(config.AutoReconnectDelay));
135-
await client.ReconnectAsync();
135+
MqttClientAuthenticateResult reconnectResult = await client.ConnectAsync(b.Build());
136+
SubscribePreviousConnection(key, reconnectResult);
136137
}
137138
}
138139
catch
@@ -232,9 +233,11 @@ public static MqttStatus Subscribe(Guid key, string topic, string gxproc, int qo
232233
{
233234
if (msg == null || msg.ApplicationMessage == null || msg.ApplicationMessage.Payload == null)
234235
return;
235-
236+
#if DEBUG
237+
Console.WriteLine("");
236238
Console.WriteLine($"Message arrived! Topic:{msg.ApplicationMessage.Topic} Payload:{Encoding.UTF8.GetString(msg.ApplicationMessage.Payload)}");
237-
239+
Console.WriteLine("");
240+
#endif
238241
try
239242
{
240243
Assembly asm = Assembly.LoadFrom(fullPath);
@@ -248,8 +251,6 @@ public static MqttStatus Subscribe(Guid key, string topic, string gxproc, int qo
248251
throw new NotImplementedException("Method 'execute' not found");
249252

250253
var obj = Activator.CreateInstance(procType);
251-
252-
253254
methodInfo.Invoke(obj, new object[] { msg.ApplicationMessage.Topic, Encoding.UTF8.GetString(msg.ApplicationMessage.Payload) });
254255
}
255256
catch (Exception ex)

src/MQTTLib/MqttConfig.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ public class MqttConfig
2020
public string PrivateKey { get; set; }
2121
public string ClientCerificatePassphrase { get; set; }
2222
public int ProtocolVersion { get; set; } = 500;
23-
public bool CleanSession { get; set; }
23+
public bool CleanSession { get; set; } = true;
2424
public bool AllowWildcardsInTopicFilters { get; set; }
2525
public int AutoReconnectDelay { get; set; } = 5;
2626
public int SessionExpiryInterval { get; set; } = 0;

tests/Publisher/Program.cs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,13 +21,15 @@ static void Main(string[] args)
2121
string command = Console.ReadLine();
2222
while (command != "exit")
2323
{
24-
MqttStatus status= MqttClient.Publish(key, topic, command, 2, true, 1);
24+
MqttStatus status= MqttClient.Publish(key, topic, command, 1, false, 60);
2525
if (status.Error)
2626
throw new Exception(status.ErrorMessage);
2727

2828
command = Console.ReadLine();
2929
}
3030

31+
MqttClient.Disconnect(key);
32+
3133
}
3234
catch (Exception ex)
3335
{

tests/Subscriber/CommonConnection.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ public static Guid Connect()
4040
return Connect(config);
4141
}
4242

43-
static Guid Connect(MqttConfig config)
43+
public static Guid Connect(MqttConfig config)
4444
{
4545
string url = ConfigurationManager.AppSettings["url"];
4646

tests/Subscriber/Program.cs

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,19 +10,31 @@ static void Main(string[] args)
1010
{
1111
try
1212
{
13+
MqttConfig config = Common.CommonConnection.GetConfig();
14+
1315
//Guid key = Common.CommonConnection.ConnectTLS();
14-
Guid key = Common.CommonConnection.Connect();
16+
17+
18+
config.CleanSession = false;
19+
config.ClientId = "MyTest";
20+
21+
Guid key = Common.CommonConnection.Connect(config);
1522

1623
string topic = ConfigurationManager.AppSettings["topic"];
1724

18-
MqttStatus status = MqttClient.Subscribe(key, topic, "SaveMessage", 2);
25+
26+
Console.WriteLine($"Press <enter> to subscribe...");
27+
Console.ReadLine();
28+
29+
MqttStatus status = MqttClient.Subscribe(key, topic, "SaveMessage", 1);
1930

2031
if (status.Error)
2132
throw new Exception(status.ErrorMessage);
2233

2334
Console.WriteLine($"Subscribed to topic:{topic}");
2435
Console.WriteLine($"Press <enter> to exit...");
2536
Console.ReadLine();
37+
MqttClient.Disconnect(key);
2638
}
2739
catch (Exception ex)
2840
{

0 commit comments

Comments
 (0)