159 lines
5.1 KiB
C#
159 lines
5.1 KiB
C#
using UnityEngine;
|
|
using Best.MQTT;
|
|
using Best.MQTT.Packets;
|
|
using Best.MQTT.Packets.Builders;
|
|
using Newtonsoft.Json;
|
|
using System;
|
|
using System.Collections.Generic;
|
|
using System.Text;
|
|
|
|
namespace Octopus.Simulator.Networks
|
|
{
|
|
/// <summary>
/// Scene-level entry point for MQTT traffic: builds a Best.MQTT client from a JSON
/// configuration, subscribes to the default topic filter on the first successful
/// connection and forwards incoming component payloads to <see cref="MQTTDataBase"/>
/// and <see cref="MQTTDataDistributer"/>.
/// </summary>
public class MQTTManager : MonoBehaviour
{
    /// <summary>Topic filter subscribed automatically on the first successful connection.</summary>
    private const string DefaultTopicFilter = "middleware/#";

    /// <summary>Most recently awakened instance; cleared again when that instance is destroyed.</summary>
    public static MQTTManager mqttManager;

    /// <summary>Connected clients keyed by the client name taken from the configuration.</summary>
    public Dictionary<string, MQTTClient> clientTable = new Dictionary<string, MQTTClient>();

    /// <summary>Last successfully deserialized message that carried a <c>component_id</c>.</summary>
    public BaseSimulationMessage basemessage;

    /// <summary>Receiver of per-component payloads; assigned in <see cref="Start"/> and lazily in <see cref="OnMessage"/>.</summary>
    public MQTTDataDistributer distributer;

    // NOTE(review): nothing in this class raises this event, so subscribers are never called.
    public event Action<string, string> onMessageReceived;

    /// <summary>Raised for every processed message whose topic name contains "middleware".</summary>
    public event Action onLogicUpdated;

    /// <summary>Raised each time a client reports a successful connection.</summary>
    public event Action<MQTTClient> onConnected;

    // Resources-relative path of the JSON TextAsset read by SetMqttConfig().
    // The field name is kept as-is because Unity serializes fields by name.
    [SerializeField]
    private string MQTTpath = "";

    // Client created by the most recent Connect() call; null until then.
    private MQTTClient client;

    // True until the default topic filter has been subscribed once.
    private bool defaultTopicPending = true;

#if !UNITY_EDITOR
    // Cached so the config subscription made in Awake can be removed in OnDestroy.
    private WebReceiver webReceiver;
#endif

    /// <summary>
    /// Publishes this instance through <see cref="mqttManager"/> and obtains the MQTT
    /// configuration: from Resources in the editor, from the WebReceiver event otherwise.
    /// </summary>
    void Awake()
    {
        mqttManager = this;
#if UNITY_EDITOR
        SetMqttConfig();
#else
        webReceiver = FindAnyObjectByType<WebReceiver>();
        if (webReceiver == null)
        {
            Debug.LogError("MQTTManager: no WebReceiver found; cannot subscribe to onMqttConfigReceived.");
            return;
        }

        webReceiver.onMqttConfigReceived += SetMqttConfig;
#endif
    }

    /// <summary>Caches the distributer singleton.</summary>
    private void Start()
    {
        distributer = MQTTDataDistributer.Instance;
    }

    /// <summary>
    /// Loads the configuration TextAsset found at the serialized Resources path and
    /// connects using its first entry. Logs an error and does nothing when the asset is missing.
    /// </summary>
    public void SetMqttConfig()
    {
        TextAsset asset = Resources.Load<TextAsset>(MQTTpath);
        if (asset == null)
        {
            Debug.LogError($"MQTTManager: no TextAsset found in Resources at '{MQTTpath}'.");
            return;
        }

        ConnectUsingFirstConfig(asset.text);
    }

    /// <summary>Connects using the first entry of the configuration list contained in <paramref name="json"/>.</summary>
    /// <param name="json">JSON text that deserializes to an <c>MQTTConfigList</c>.</param>
    public void SetMqttConfig(string json)
    {
        ConnectUsingFirstConfig(json);
    }

    /// <summary>
    /// Shared implementation of both SetMqttConfig overloads: parses the JSON, validates
    /// that at least one configuration exists and calls <see cref="Connect"/> with it.
    /// Every failure is logged instead of thrown.
    /// </summary>
    private void ConnectUsingFirstConfig(string json)
    {
        if (string.IsNullOrWhiteSpace(json))
        {
            Debug.LogError("MQTTManager: MQTT configuration JSON is empty.");
            return;
        }

        MQTTConfigList configList;
        try
        {
            configList = JsonConvert.DeserializeObject<MQTTConfigList>(json);
        }
        catch (JsonException ex)
        {
            Debug.LogError($"MQTTManager: MQTT configuration JSON could not be parsed: {ex.Message}");
            return;
        }

        if (configList == null || configList.configs == null)
        {
            Debug.LogError("MQTTManager: MQTT configuration JSON contains no 'configs' list.");
            return;
        }

        // Arrays and List<T> both implement the non-generic ICollection, so this detects an
        // empty list without depending on the concrete collection type declared by MQTTConfigList.
        if (configList.configs is System.Collections.ICollection entries && entries.Count == 0)
        {
            Debug.LogError("MQTTManager: MQTT configuration list is empty.");
            return;
        }

        var first = configList.configs[0];
        Connect(first.name, first.host, first.port);
    }

    /// <summary>Subscribes the current client to <paramref name="topic"/>, routing messages to <see cref="OnMessage"/>.</summary>
    /// <param name="topic">Topic filter to subscribe to.</param>
    public void SubscribeTopic(string topic)
    {
        Debug.Log($"Try Subscribe {topic}");

        if (client == null)
        {
            Debug.LogWarning($"MQTTManager: cannot subscribe to '{topic}' before Connect has created a client.");
            return;
        }

        // NOTE(review): the alias is registered with the same string as the subscription filter,
        // wildcards included — confirm against the Best.MQTT documentation that this is intended.
        client.AddTopicAlias(topic);

        client.CreateSubscriptionBuilder(topic)
            .WithMessageCallback(OnMessage)
            .WithMaximumQoS(QoSLevels.ExactlyOnceDelivery)
            .BeginSubscribe();
    }

    /// <summary>Unsubscribes the current client from <paramref name="topic"/>.</summary>
    /// <param name="topic">Topic filter to unsubscribe from.</param>
    public void UnsubscribeTopic(string topic)
    {
        if (client == null)
        {
            Debug.LogWarning($"MQTTManager: cannot unsubscribe from '{topic}' before Connect has created a client.");
            return;
        }

        client.CreateUnsubscribePacketBuilder(topic).BeginUnsubscribe();
    }

    /// <summary>
    /// Builds a new client (WebSocket options with path "/mqtt" and TLS requested on the builder),
    /// stores it as the current client and starts connecting.
    /// </summary>
    /// <param name="clientName">Key under which the client is stored in <see cref="clientTable"/> once connected.</param>
    /// <param name="host">Host passed to the WebSocket connection options.</param>
    /// <param name="port">Port passed to the WebSocket connection options.</param>
    public void Connect(string clientName, string host, int port)
    {
        // NOTE(review): a client created by an earlier call is replaced here without being disconnected.
        var options = new ConnectionOptionsBuilder()
            .WithWebSocket(host, port)
            .WithPath("/mqtt")
            .WithTLS();

        client = new MQTTClientBuilder()
            .WithOptions(options)
            .WithEventHandler(connectedClient => OnConnected(connectedClient, clientName))
            .WithEventHandler(OnStateChange)
            .WithEventHandler(OnDisconnected)
            .WithEventHandler(OnError)
            .CreateClient();

        client.BeginConnect(ConnectPacketBuilderCallback);
    }

    /// <summary>
    /// Registers the connected client, notifies <see cref="onConnected"/> listeners and
    /// subscribes to the default topic filter the first time any client connects.
    /// </summary>
    private void OnConnected(MQTTClient client, string clientName)
    {
        // Indexer instead of Add: the same client name connecting again must not throw.
        clientTable[clientName] = client;
        onConnected?.Invoke(client);

        if (!defaultTopicPending)
        {
            return;
        }

        SubscribeTopic(DefaultTopicFilter);
        defaultTopicPending = false;
    }

    /// <summary>Returns the connect packet builder unchanged.</summary>
    private ConnectPacketBuilder ConnectPacketBuilderCallback(MQTTClient client, ConnectPacketBuilder builder)
    {
        return builder;
    }

    /// <summary>Logs the error text reported by the client.</summary>
    private void OnError(MQTTClient client, string error)
    {
        Debug.LogError($"OnError! :{error}");
    }

    /// <summary>
    /// Handles one incoming message: topics containing "completed" are ignored, payloads
    /// mentioning "component_id" are deserialized, stored and distributed, and topics
    /// containing "middleware" raise <see cref="onLogicUpdated"/>.
    /// </summary>
    private void OnMessage(MQTTClient client, SubscriptionTopic topic, string topicName, ApplicationMessage message)
    {
        // Cheap topic filter first, so ignored messages are never decoded.
        if (topicName.Contains("completed"))
        {
            return;
        }

        var segment = message.Payload;
        // Guard the buffer: Encoding.GetString throws on a null array.
        string payload = segment.Data == null || segment.Count == 0
            ? string.Empty
            : Encoding.UTF8.GetString(segment.Data, segment.Offset, segment.Count);

        if (payload.Contains("component_id") && !TryDispatchComponentPayload(topicName, payload))
        {
            // Malformed payload: already logged, skip the rest of the handling for this message.
            return;
        }

        if (topicName.Contains("middleware"))
        {
            onLogicUpdated?.Invoke();
            Debug.Log("logicupdated");
        }
    }

    /// <summary>
    /// Deserializes a component payload, stores it in <see cref="MQTTDataBase"/> and hands it
    /// to the distributer. Returns false when the payload is not valid JSON for the message type.
    /// </summary>
    private bool TryDispatchComponentPayload(string topicName, string payload)
    {
        try
        {
            basemessage = JsonConvert.DeserializeObject<BaseSimulationMessage>(payload);
        }
        catch (JsonException ex)
        {
            // The payload comes from the network; a bad message must not throw into the MQTT callback.
            Debug.LogWarning($"MQTTManager: ignoring malformed payload on '{topicName}': {ex.Message}");
            return false;
        }

        MQTTDataBase.Instance.AddDict(basemessage.component_id, payload);

        // Start may not have run yet (for example while this component is disabled).
        if (distributer == null)
        {
            distributer = MQTTDataDistributer.Instance;
        }

        if (distributer == null)
        {
            Debug.LogWarning("MQTTManager: no MQTTDataDistributer available; payload was stored but not distributed.");
            return true;
        }

        distributer.Distribute(basemessage.component_id, payload);
        return true;
    }

    /// <summary>Logs the reason message supplied with the disconnect notification.</summary>
    private void OnDisconnected(MQTTClient client, DisconnectReasonCodes reasonCode, string reasonMessage)
    {
        Debug.Log(reasonMessage);
    }

    /// <summary>State transitions are currently ignored.</summary>
    private void OnStateChange(MQTTClient client, ClientStates oldState, ClientStates newState)
    {
    }

    /// <summary>
    /// Removes the config subscription made in Awake, clears the static instance reference
    /// if it still points at this object and sends a normal disconnect on the current client.
    /// </summary>
    void OnDestroy()
    {
#if !UNITY_EDITOR
        if (webReceiver != null)
        {
            webReceiver.onMqttConfigReceived -= SetMqttConfig;
        }
#endif

        if (ReferenceEquals(mqttManager, this))
        {
            mqttManager = null;
        }

        client?.CreateDisconnectPacketBuilder()
            .WithReasonCode(DisconnectReasonCodes.NormalDisconnection)
            .WithReasonString("Bye")
            .BeginDisconnect();
    }
}
|
|
}
|