430 lines
18 KiB
C#
430 lines
18 KiB
C#
#nullable enable
|
|
|
|
using Cysharp.Threading.Tasks;
|
|
using Newtonsoft.Json.Linq;
|
|
using SampleProject.Config;
|
|
using System;
|
|
using System.Collections.Concurrent;
|
|
using System.Collections.Generic;
|
|
using System.Threading;
|
|
using UnityEngine;
|
|
using UVC.Data.Core;
|
|
using UVC.Data.Http;
|
|
using UVC.Log;
|
|
using UVC.network;
|
|
using UVC.Tests;
|
|
|
|
namespace UVC.Data.Mqtt
|
|
{
|
|
/// <summary>
|
|
/// MQTT 통신을 통해 데이터를 수신하고 처리하는 파이프라인을 관리하는 클래스입니다.
|
|
/// </summary>
|
|
/// <remarks>
|
|
/// 이 클래스는 MQTT 브로커에 연결하여 등록된 토픽으로 들어오는 메시지를 수신하고,
|
|
/// 해당 메시지를 지정된 데이터 매퍼를 통해 변환한 후 핸들러에게 전달합니다.
|
|
/// 여러 MQTT 토픽을 동시에 관리하고 각각에 대한 처리 방식을 개별적으로 설정할 수 있습니다.
|
|
/// </remarks>
|
|
/// <example>
|
|
/// <code>
|
|
/// // 1. 데이터 구조를 정의하는 DataMask 생성
|
|
/// var dataMask = new DataMask();
|
|
/// dataMask["deviceId"] = "";
|
|
/// dataMask["temperature"] = 0.0;
|
|
/// dataMask["humidity"] = 0.0;
|
|
/// dataMask["timestamp"] = DateTime.Now;
|
|
///
|
|
/// // 2. DataMapper 생성
|
|
/// var dataMapper = new DataMapper(dataMask);
|
|
///
|
|
/// // 3. 데이터 처리 핸들러 정의
|
|
/// Action<IDataObject?> dataHandler = (data) =>
|
|
/// {
|
|
/// if (data != null)
|
|
/// {
|
|
/// // 데이터 처리 로직
|
|
/// Console.WriteLine($"Received data: {data.ToJson()}");
|
|
/// }
|
|
/// };
|
|
///
|
|
/// // 4. MqttSubscriptionConfig 생성 및 설정
|
|
/// var pipelineInfo = new MqttSubscriptionConfig("sensor/+/data")
|
|
/// .SetDataMapper(dataMapper)
|
|
/// .SetHandler(dataHandler);
|
|
///
|
|
/// // 5. MqttDataReceiver 인스턴스 생성
|
|
/// var mqttReceiver = new MqttDataReceiver("mqtt.eclipseprojects.io", 1883);
|
|
///
|
|
/// // 6. 파이프라인 정보 추가
|
|
/// mqttReceiver.AddChild(pipelineInfo);
|
|
///
|
|
/// // 7. 파이프라인 실행
|
|
/// mqttReceiver.Start();
|
|
///
|
|
/// // ... 애플리케이션 로직 수행 ...
|
|
///
|
|
/// // 8. 파이프라인 중지 및 리소스 해제
|
|
/// mqttReceiver.Exit();
|
|
/// mqttReceiver.Dispose();
|
|
/// </code>
|
|
/// </example>
|
|
public class MqttDataReceiver
|
|
{
|
|
|
|
/// <summary>
|
|
/// 테스트를 위한 목업 모드 활성화 여부를 설정하거나 가져옵니다.
|
|
/// </summary>
|
|
/// <remarks>
|
|
/// true로 설정하면 실제 MQTT 요청 대신 MockMQTTService를 사용합니다.
|
|
/// 테스트 환경에서 외부 의존성 없이 MQTT 통신을 시뮬레이션할 때 유용합니다.
|
|
/// </remarks>
|
|
public bool UseMockup { get; set; } = false;
|
|
|
|
/// <summary>
|
|
/// MQTT 브로커의 도메인 주소
|
|
/// </summary>
|
|
private string domain = "localhost";
|
|
|
|
/// <summary>
|
|
/// MQTT 브로커의 포트 번호
|
|
/// </summary>
|
|
private int port = 1883;
|
|
|
|
private List<string> topics = new List<string>();
|
|
|
|
/// <summary>
|
|
/// 토픽별 파이프라인 정보를 저장하는 딕셔너리
|
|
/// </summary>
|
|
private ConcurrentDictionary<string, MqttSubscriptionConfig> configList;
|
|
|
|
private MqttWorker mqttWorker;
|
|
|
|
//private MockMQTTService? mockupMQTT;
|
|
|
|
|
|
public string LastMessage { get; private set; } = string.Empty;
|
|
|
|
//mqtt 연결 후 topic 별 첫 번째 메시지를 수신 했는지 여부를 저장하는 딕셔너리
|
|
private ConcurrentDictionary<string, bool> firstMessageReceived = new ConcurrentDictionary<string, bool>();
|
|
|
|
private MqttDataPicker? defaultDataPicker;
|
|
|
|
#if UNITY_WEBGL && !UNITY_EDITOR
|
|
// WebGL: Thread 기반 MqttWorker 대체
|
|
private MQTTService? webGlMqttService;
|
|
// WebGL 대용량(JSON 배열) 처리 시 프레임 분할 청크 크기
|
|
private const int WebGlArrayChunkSize = 256;
|
|
#endif
|
|
|
|
/// <summary>
|
|
/// MqttDataReceiver 인스턴스를 생성합니다.
|
|
/// </summary>
|
|
public MqttDataReceiver()
|
|
{
|
|
mqttWorker = new MqttWorker();
|
|
configList = new ConcurrentDictionary<string, MqttSubscriptionConfig>();
|
|
}
|
|
|
|
/// <summary>
|
|
/// MQTT 연결의 도메인과 포트를 설정합니다.
|
|
/// </summary>
|
|
/// <param name="domain">MQTT 브로커의 도메인 주소, 기본값은 "localhost"입니다.</param>
|
|
/// <param name="port">MQTT 브로커의 포트 번호, 기본값은 1883입니다.</param>
|
|
public void SetDomainPort(string domain, int port)
|
|
{
|
|
this.domain = string.IsNullOrEmpty(domain) ? Constants.MQTT_DOMAIN : domain;
|
|
this.port = port;
|
|
#if UNITY_WEBGL && !UNITY_EDITOR
|
|
// WebGL에서는 mqttWorker 사용 안 함
|
|
#else
|
|
mqttWorker.SetDomainPort(this.domain, this.port);
|
|
#endif
|
|
}
|
|
|
|
/// <summary>
|
|
/// MqttDataPicker를 구성하고 MessagePack 직렬화를 활성화 또는 비활성화합니다.
|
|
/// </summary>
|
|
/// <param name="dataPicker">데이터 선택 및 직렬화 설정을 정의하는 <see cref="MqttDataPicker"/> 인스턴스입니다.
|
|
/// <see langword="null"/>일 수 없습니다.</param>
|
|
public void SetDataPicker(MqttDataPicker dataPicker)
|
|
{
|
|
defaultDataPicker = dataPicker;
|
|
#if UNITY_WEBGL && !UNITY_EDITOR
|
|
// MessagePack 설정은 필요 시 MQTTService 확장
|
|
#else
|
|
mqttWorker.SetEnableMessagePack(dataPicker.EnableMessagePack);
|
|
#endif
|
|
}
|
|
|
|
/// <summary>
|
|
/// 컬렉션에 주제가 없으면 추가합니다.
|
|
/// </summary>
|
|
/// <remarks>이 메서드는 컬렉션에 중복된 주제가 추가되지 않도록 합니다. 지정된 주제가 이미 있으면
|
|
/// 컬렉션은 변경되지 않습니다.</remarks>
|
|
/// <param name="topic">컬렉션에 추가할 주제입니다. null이거나 비어 있을 수 없습니다.</param>
|
|
public void AddTopic(string topic)
|
|
{
|
|
if (!topics.Contains(topic))
|
|
{
|
|
topics.Add(topic);
|
|
}
|
|
}
|
|
|
|
/// <summary>
|
|
/// 토픽에 대한 파이프라인 정보를 추가합니다.
|
|
/// </summary>
|
|
/// <param name="info">추가할 MqttSubscriptionConfig 객체</param>
|
|
/// <remarks>
|
|
/// 동일한 토픽에 대한 정보가 이미 존재하는 경우 덮어씁니다.
|
|
/// </remarks>
|
|
public void Add(MqttSubscriptionConfig info)
|
|
{
|
|
configList[info.Topic] = info;
|
|
}
|
|
|
|
/// <summary>
|
|
/// 지정된 토픽에 대한 파이프라인 정보를 제거합니다.
|
|
/// </summary>
|
|
/// <param name="topic">제거할 토픽 이름</param>
|
|
public void Remove(string topic)
|
|
{
|
|
configList.TryRemove(topic, out _);
|
|
}
|
|
|
|
/// <summary>
|
|
/// 파이프라인을 실행하여 MQTT 브로커에 연결하고 등록된 모든 토픽을 구독합니다.
|
|
/// </summary>
|
|
public void Start()
|
|
{
|
|
#if UNITY_WEBGL && !UNITY_EDITOR
|
|
if (UseMockup)
|
|
{
|
|
Debug.LogWarning("WebGL Mockup 모드는 아직 구현되지 않았습니다.");
|
|
return;
|
|
}
|
|
if (webGlMqttService != null) return;
|
|
webGlMqttService = new MQTTService(domain, port, autoReconnect: true, onBackground: false);
|
|
foreach (var topic in topics)
|
|
{
|
|
webGlMqttService.AddTopicHandler(topic, OnWebGlRawTopicMessage);
|
|
}
|
|
webGlMqttService.Connect();
|
|
#else
|
|
if (!UseMockup)
|
|
{
|
|
if (mqttWorker.IsRunning) return;
|
|
foreach (var topic in topics)
|
|
{
|
|
mqttWorker.AddListener(topic, OnTopicPacketMessage);
|
|
}
|
|
mqttWorker.Start();
|
|
}
|
|
else
|
|
{
|
|
// Mockup 모드인 경우 MockMQTTService를 사용하여 테스트 환경을 설정합니다.
|
|
//mockupMQTT = new MockMQTTService();
|
|
//foreach (var topic in configList.Keys)
|
|
//{
|
|
// mockupMQTT.AddTopicHandler(topic, OnTopicMessage);
|
|
//}
|
|
//mockupMQTT.Connect();
|
|
}
|
|
#endif
|
|
}
|
|
|
|
/// <summary>
|
|
/// WebGL 환경에서 MQTTService가 직접 호출하는 Raw 메시지 처리 진입점
|
|
/// </summary>
|
|
private void OnWebGlRawTopicMessage(string topic, string message)
|
|
{
|
|
// WebGL: 스레드풀 없음 → 메인 스레드에서 부하 분산을 위해 한 프레임 양보
|
|
RunBackground(() => OnTopicMessageLogic(topic, message));
|
|
}
|
|
|
|
/// <summary>
|
|
/// 플랫폼별 백그라운드 실행 셔임
|
|
/// </summary>
|
|
private static void RunBackground(Action action)
|
|
{
|
|
#if UNITY_WEBGL && !UNITY_EDITOR
|
|
UniTask.Void(async () =>
|
|
{
|
|
await UniTask.NextFrame();
|
|
action();
|
|
});
|
|
#else
|
|
UniTask.RunOnThreadPool(action).Forget();
|
|
#endif
|
|
}
|
|
|
|
|
|
/// <summary>
|
|
/// 토픽에서 수신된 MQTT 데이터 패킷 목록을 처리합니다.
|
|
/// </summary>
|
|
/// <remarks>이 메서드는 구독된 MQTT 토픽에서 메시지가 수신될 때 호출됩니다.
|
|
/// 이 메서드를 호출하기 전에 <paramref name="packets"/> 매개변수가 null이 아닌지 확인하십시오.</remarks>
|
|
/// <param name="topic">수신된 메시지의 토픽입니다. null일 수 없습니다.</param>
|
|
/// <param name="packets">처리할 <see cref="MqttDataPacket"/> 객체 목록입니다. null일 수 없습니다.</param>
|
|
/// <param name="isObject">수신된 메시지가 List에 한개의 객체인지 여부를 나타내는 부울 값입니다.</param>
|
|
private void OnTopicPacketMessage(string topic, List<MqttDataPacket> packets, bool isObject)
|
|
{
|
|
OnTopicMessage(topic, MqttDataPacket.ToJsonFromList(packets));
|
|
}
|
|
|
|
|
|
|
|
private void OnTopicMessage(string topic, string message)
|
|
{
|
|
bool isMainThread = PlayerLoopHelper.IsMainThread;
|
|
//Debug.Log($"OnTopicMessage isMainThread: {isMainThread}, topic: {topic}");
|
|
if (isMainThread)
|
|
{
|
|
// 메시지 처리를 백그라운드 스레드에서 실행하여 메인 스레드 부하를 줄입니다.
|
|
UniTask.RunOnThreadPool(() =>
|
|
{
|
|
OnTopicMessageLogic(topic, message);
|
|
}).Forget();
|
|
}
|
|
else
|
|
{
|
|
OnTopicMessageLogic(topic, message);
|
|
}
|
|
}
|
|
|
|
|
|
/// <summary>
|
|
/// MQTT 토픽으로 메시지가 수신되었을 때 호출되는 콜백 메서드입니다.
|
|
/// </summary>
|
|
/// <param name="topic">수신된 메시지의 토픽</param>
|
|
/// <param name="message">수신된 메시지 내용</param>
|
|
/// <remarks>
|
|
/// 이 메서드는 수신된 메시지의 형식(JSON 객체 또는 배열)에 따라 적절한 파싱을 수행하고,
|
|
/// 등록된 데이터 매퍼를 통해 메시지를 변환한 후, 해당 토픽에 등록된 핸들러에게 전달합니다.
|
|
/// 'UpdatedDataOnly' 설정에 따라 데이터가 변경된 경우에만 핸들러를 호출할 수도 있습니다.
|
|
/// </remarks>
|
|
private void OnTopicMessageLogic(string topic, string message)
|
|
{
|
|
//Debug.Log($"OnTopicMessageLogic topic: {topic}, configList.ContainsKey(topic): {configList.ContainsKey(topic)}");
|
|
// 토픽이 infoList와 readyHandlerList에 존재하고, 준비 상태가 true인 경우에만 처리합니다.
|
|
if (configList.TryGetValue(topic, out var config))
|
|
{
|
|
if (defaultDataPicker != null)
|
|
{
|
|
string? dataMessage = defaultDataPicker.PickData(message);
|
|
if(dataMessage != null) message = dataMessage;
|
|
}
|
|
|
|
bool isFirstMessage = false;
|
|
// 첫 번째 메시지 수신 여부를 확인하고, 아직 수신하지 않았다면 처리합니다.
|
|
if (!firstMessageReceived.ContainsKey(topic) || !firstMessageReceived[topic])
|
|
{
|
|
firstMessageReceived[topic] = true; // 첫 번째 메시지를 수신했음을 기록합니다.
|
|
isFirstMessage = true; // 첫 번째 메시지로 처리합니다.
|
|
}
|
|
|
|
IDataObject? mappedObject = null;
|
|
message = message.Trim();
|
|
LastMessage = message; // 마지막 메시지를 저장하여 나중에 사용할 수 있습니다.
|
|
if (!string.IsNullOrEmpty(message))
|
|
{
|
|
try
|
|
{
|
|
if (message.StartsWith("{"))
|
|
{
|
|
mappedObject = HttpDataProcessor.ProcessObjectResponse(message, null, config.DataMapper, config.Validator, null);
|
|
}
|
|
else if (message.StartsWith("["))
|
|
{
|
|
mappedObject = HttpDataProcessor.ProcessArrayResponse(message, null, config.DataMapper, config.Validator, null);
|
|
}
|
|
//Debug.Log($"OnTopicMessageLogic topic: {topic}, mappedObject == null: {mappedObject == null}, config.DataMapper:{config.DataMapper == null}, config.Validator:{config.Validator == null}");
|
|
if (mappedObject == null) return;
|
|
|
|
//즉시 업데이트 여부를 설정합니다.
|
|
mappedObject.IsUpdateImmediately = isFirstMessage;
|
|
|
|
// DataRepository는 내부적으로 데이터를 복사/업데이트하므로, mappedObject는 여기서 임시 객체가 됩니다.
|
|
var repoObject = DataRepository.Instance.AddOrUpdateData(topic, mappedObject, config.UpdatedDataOnly);
|
|
if (repoObject == mappedObject) repoObject = mappedObject.Clone(fromPool: false);
|
|
// 핸들러 호출이 필요한지 확인
|
|
bool shouldInvoke = !config.UpdatedDataOnly || (repoObject != null && repoObject.UpdatedCount > 0);
|
|
if (shouldInvoke && config.Handler != null)
|
|
{
|
|
var handlerData = repoObject;
|
|
handlerData.IsUpdateImmediately = isFirstMessage; // 첫 번째 메시지 여부를 핸들러 데이터에 설정합니다.
|
|
// 핸들러를 메인 스레드에서 안전하게 호출
|
|
UniTask.Post(() => config.Handler?.Invoke(handlerData));
|
|
}
|
|
}
|
|
catch (Exception ex)
|
|
{
|
|
// 예외 발생 시 로깅 또는 처리
|
|
// 예외 로깅도 메인 스레드에서 처리하여 Unity API 호출에 대한 스레드 안정성 확보
|
|
//UniTask.Post(() => ULog.Error($"Error processing message for topic '{topic}': {ex.Message}", ex));
|
|
ULog.Error($"Error processing message for topic '{topic}': {ex.Message}", ex);
|
|
}
|
|
finally
|
|
{
|
|
// DataMapper가 생성한 임시 객체를 풀에 반환합니다.
|
|
if (mappedObject != null)
|
|
{
|
|
mappedObject.ReturnToPool();
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
/// <summary>
|
|
/// 파이프라인을 중지하고 모든 토픽 구독을 해제한 후 MQTT 브로커와의 연결을 종료합니다.
|
|
/// </summary>
|
|
public void Stop()
|
|
{
|
|
#if UNITY_WEBGL && !UNITY_EDITOR
|
|
if (webGlMqttService != null)
|
|
{
|
|
webGlMqttService.Disconnect();
|
|
webGlMqttService.ClearTopicHandlers();
|
|
webGlMqttService = null;
|
|
}
|
|
firstMessageReceived.Clear();
|
|
#else
|
|
if (!UseMockup)
|
|
{
|
|
if (!mqttWorker.IsRunning) return;
|
|
foreach (var topic in configList.Keys)
|
|
{
|
|
mqttWorker.RemoveListener(topic, OnTopicPacketMessage);
|
|
}
|
|
mqttWorker.Stop();
|
|
firstMessageReceived.Clear();
|
|
}
|
|
else
|
|
{
|
|
// Mockup 모드인 경우 MockMQTTService를 사용하여 연결을 종료합니다.
|
|
//mockupMQTT?.Disconnect();
|
|
}
|
|
#endif
|
|
}
|
|
|
|
/// <summary>
|
|
/// 현재 인스턴스에서 사용되는 모든 리소스를 해제하고 진행 중인 모든 작업을 중지합니다.
|
|
/// </summary>
|
|
/// <remarks>이 메서드는 모든 반복 작업을 중단하고, 내부 상태를 지우고, 관련 리소스를 삭제합니다.
|
|
/// <see cref="Dispose"/>를 호출한 후에는 해당 인스턴스를 더 이상 사용할 수 없습니다.</remarks>
|
|
public void Dispose()
|
|
{
|
|
#if UNITY_WEBGL && !UNITY_EDITOR
|
|
webGlMqttService?.Dispose();
|
|
webGlMqttService = null;
|
|
#else
|
|
if (!UseMockup) mqttWorker.Dispose();
|
|
#endif
|
|
configList.Clear();
|
|
firstMessageReceived.Clear();
|
|
topics.Clear();
|
|
}
|
|
|
|
}
|
|
}
|