Files
XRLib/Assets/Scripts/UVC/Data/MQTTPipeLine.cs

75 lines
2.1 KiB
C#
Raw Normal View History

#nullable enable
using Newtonsoft.Json.Linq;
using SampleProject.Config;
using System.Collections.Generic;
using UnityEngine.InputSystem;
using UVC.network;
2025-06-05 20:09:28 +09:00
namespace UVC.Data
{
/// <summary>
/// Connects MQTT topics to per-topic pipelines. For every registered
/// <see cref="MQTTPipeLineInfo"/>, an incoming JSON payload is parsed, passed
/// through the pipeline's data mapper, stored in <see cref="DataRepository"/>,
/// and finally handed to the pipeline's handler.
/// </summary>
/// <remarks>
/// NOTE(review): the registry is a plain <see cref="Dictionary{TKey,TValue}"/>.
/// If <c>MQTTService</c> raises its callbacks on a different thread than the one
/// calling <see cref="Add"/> / <see cref="Remove"/>, access needs external
/// synchronization — confirm against <c>MQTTService</c>.
/// </remarks>
public class MQTTPipeLine
{
    /// <summary>Registered pipelines, keyed by MQTT topic.</summary>
    private readonly Dictionary<string, MQTTPipeLineInfo> infoList = new Dictionary<string, MQTTPipeLineInfo>();

    /// <summary>MQTT service created from the configured domain and port.</summary>
    private readonly MQTTService mqtt = new MQTTService(Constants.MQTT_DOMAIN, Constants.MQTT_PORT);

    /// <summary>
    /// Registers a pipeline under its topic. If the topic is already registered,
    /// the previous entry is replaced.
    /// </summary>
    /// <param name="info">Pipeline description; its <c>topic</c> is used as the key.</param>
    public void Add(MQTTPipeLineInfo info)
    {
        // The indexer setter inserts or overwrites in a single lookup.
        infoList[info.topic] = info;
    }

    /// <summary>
    /// Removes the pipeline registered for <paramref name="topic"/>; does nothing
    /// when the topic is not registered.
    /// </summary>
    /// <param name="topic">Topic whose pipeline should be removed.</param>
    public void Remove(string topic)
    {
        // Dictionary.Remove is already a no-op for a missing key.
        infoList.Remove(topic);
    }

    /// <summary>
    /// Registers <see cref="OnTopicMessage"/> as the handler for every topic that is
    /// currently in the registry, then connects the MQTT service.
    /// </summary>
    /// <remarks>
    /// Topics added through <see cref="Add"/> after this call are not registered
    /// with the MQTT service by this class.
    /// </remarks>
    public void Execute()
    {
        foreach (string topic in infoList.Keys)
        {
            mqtt.AddTopicHandler(topic, OnTopicMessage);
        }

        mqtt.Connect();
    }

    /// <summary>
    /// Callback for a message on a subscribed topic. Maps the payload, stores the
    /// mapped object in the repository, and invokes the pipeline's handler.
    /// </summary>
    /// <param name="topic">Topic the message arrived on.</param>
    /// <param name="message">Raw message payload.</param>
    private void OnTopicMessage(string topic, string message)
    {
        // Messages for topics that are not (or no longer) registered are ignored.
        if (!infoList.TryGetValue(topic, out var info))
        {
            return;
        }

        IDataObject? dataObject = MapMessage(info, message);
        if (dataObject != null)
        {
            // Pass on whatever instance the repository returns for this topic.
            dataObject = DataRepository.Instance.AddData(topic, dataObject);
        }

        // The handler is always invoked; it receives null when nothing was mapped.
        info.handler?.Invoke(dataObject);
    }

    /// <summary>
    /// Parses <paramref name="message"/> as a JSON object or array and runs it
    /// through the pipeline's data mapper.
    /// </summary>
    /// <param name="info">Pipeline whose <c>dataMapper</c> performs the mapping.</param>
    /// <param name="message">Raw message payload.</param>
    /// <returns>
    /// The mapper's result, or <c>null</c> when there is no mapper, the payload is
    /// empty, does not start with <c>{</c> or <c>[</c>, or is not valid JSON.
    /// </returns>
    private static IDataObject? MapMessage(MQTTPipeLineInfo info, string message)
    {
        var mapper = info.dataMapper;

        // Without a mapper the parsed JSON would be discarded, so skip parsing.
        if (mapper == null || string.IsNullOrEmpty(message))
        {
            return null;
        }

        string payload = message.Trim();
        if (payload.Length == 0)
        {
            return null;
        }

        // Ordinal check of the first character decides between object and array.
        char first = payload[0];
        if (first != '{' && first != '[')
        {
            return null;
        }

        JToken parsed;
        try
        {
            if (first == '{')
            {
                parsed = JObject.Parse(payload);
            }
            else
            {
                parsed = JArray.Parse(payload);
            }
        }
        catch (Newtonsoft.Json.JsonException ex)
        {
            // A malformed payload from the broker must not throw out of the MQTT
            // callback; treat it like any other unusable payload.
            UnityEngine.Debug.LogWarning(
                $"[MQTTPipeLine] Ignoring malformed JSON payload on topic '{info.topic}': {ex.Message}");
            return null;
        }

        if (parsed is JObject jsonObject)
        {
            return mapper.Mapping(jsonObject);
        }

        return mapper.Mapping((JArray)parsed);
    }
}
}