#nullable enable using Cysharp.Threading.Tasks; using Newtonsoft.Json.Linq; using SampleProject.Config; using System; using System.Collections.Concurrent; using System.Collections.Generic; using System.Threading; using UnityEngine; using UVC.Data.Core; using UVC.Data.Http; using UVC.Log; using UVC.Tests; namespace UVC.Data.Mqtt { /// /// MQTT 통신을 통해 데이터를 수신하고 처리하는 파이프라인을 관리하는 클래스입니다. /// /// /// 이 클래스는 MQTT 브로커에 연결하여 등록된 토픽으로 들어오는 메시지를 수신하고, /// 해당 메시지를 지정된 데이터 매퍼를 통해 변환한 후 핸들러에게 전달합니다. /// 여러 MQTT 토픽을 동시에 관리하고 각각에 대한 처리 방식을 개별적으로 설정할 수 있습니다. /// /// /// /// // 1. 데이터 구조를 정의하는 DataMask 생성 /// var dataMask = new DataMask(); /// dataMask["deviceId"] = ""; /// dataMask["temperature"] = 0.0; /// dataMask["humidity"] = 0.0; /// dataMask["timestamp"] = DateTime.Now; /// /// // 2. DataMapper 생성 /// var dataMapper = new DataMapper(dataMask); /// /// // 3. 데이터 처리 핸들러 정의 /// Action dataHandler = (data) => /// { /// if (data != null) /// { /// // 데이터 처리 로직 /// Console.WriteLine($"Received data: {data.ToJson()}"); /// } /// }; /// /// // 4. MqttSubscriptionConfig 생성 및 설정 /// var pipelineInfo = new MqttSubscriptionConfig("sensor/+/data") /// .SetDataMapper(dataMapper) /// .SetHandler(dataHandler); /// /// // 5. MqttDataReceiver 인스턴스 생성 /// var mqttReceiver = new MqttDataReceiver("mqtt.eclipseprojects.io", 1883); /// /// // 6. 파이프라인 정보 추가 /// mqttReceiver.Add(pipelineInfo); /// /// // 7. 파이프라인 실행 /// mqttReceiver.Start(); /// /// // ... 애플리케이션 로직 수행 ... /// /// // 8. 파이프라인 중지 및 리소스 해제 /// mqttReceiver.Exit(); /// mqttReceiver.Dispose(); /// /// public class MqttDataReceiver { /// /// 테스트를 위한 목업 모드 활성화 여부를 설정하거나 가져옵니다. /// /// /// true로 설정하면 실제 MQTT 요청 대신 MockMQTTService를 사용합니다. /// 테스트 환경에서 외부 의존성 없이 MQTT 통신을 시뮬레이션할 때 유용합니다. /// public bool UseMockup { get; set; } = false; /// /// MQTT 브로커의 도메인 주소 /// private string domain = "localhost"; /// /// MQTT 브로커의 포트 번호 /// private int port = 1883; private List topics = new List(); /// /// 토픽별 파이프라인 정보를 저장하는 딕셔너리 /// private ConcurrentDictionary configList; private MqttWorker mqttWorker; //private MockMQTTService? mockupMQTT; public string LastMessage { get; private set; } = string.Empty; //mqtt 연결 후 topic 별 첫 번째 메시지를 수신 했는지 여부를 저장하는 딕셔너리 private ConcurrentDictionary firstMessageReceived = new ConcurrentDictionary(); /// /// MqttDataReceiver 인스턴스를 생성합니다. /// public MqttDataReceiver() { mqttWorker = new MqttWorker(); configList = new ConcurrentDictionary(); } /// /// MQTT 연결의 도메인과 포트를 설정합니다. /// /// MQTT 브로커의 도메인 주소, 기본값은 "localhost"입니다. /// MQTT 브로커의 포트 번호, 기본값은 1883입니다. public void SetDomainPort(string domain, int port) { this.domain = string.IsNullOrEmpty(domain) ? Constants.MQTT_DOMAIN : domain; this.port = port; mqttWorker.SetDomainPort(this.domain, this.port); } /// /// 컬렉션에 주제가 없으면 추가합니다. /// /// 이 메서드는 컬렉션에 중복된 주제가 추가되지 않도록 합니다. 지정된 주제가 이미 있으면 /// 컬렉션은 변경되지 않습니다. /// 컬렉션에 추가할 주제입니다. null이거나 비어 있을 수 없습니다. public void AddTopic(string topic) { if (!topics.Contains(topic)) { topics.Add(topic); } } /// /// 토픽에 대한 파이프라인 정보를 추가합니다. /// /// 추가할 MqttSubscriptionConfig 객체 /// /// 동일한 토픽에 대한 정보가 이미 존재하는 경우 덮어씁니다. /// public void Add(MqttSubscriptionConfig info) { configList[info.Topic] = info; } /// /// 지정된 토픽에 대한 파이프라인 정보를 제거합니다. /// /// 제거할 토픽 이름 public void Remove(string topic) { configList.TryRemove(topic, out _); } /// /// 파이프라인을 실행하여 MQTT 브로커에 연결하고 등록된 모든 토픽을 구독합니다. /// public void Start() { if (!UseMockup) { if (mqttWorker.IsRunning) return; foreach (var topic in topics) { mqttWorker.AddListener(topic, OnTopicPacketMessage); } mqttWorker.Start(); } else { // Mockup 모드인 경우 MockMQTTService를 사용하여 테스트 환경을 설정합니다. //mockupMQTT = new MockMQTTService(); //foreach (var topic in configList.Keys) //{ // mockupMQTT.AddTopicHandler(topic, OnTopicMessage); //} //mockupMQTT.Connect(); } } /// /// 토픽에서 수신된 MQTT 데이터 패킷 목록을 처리합니다. /// /// 이 메서드는 구독된 MQTT 토픽에서 메시지가 수신될 때 호출됩니다. /// 이 메서드를 호출하기 전에 매개변수가 null이 아닌지 확인하십시오. /// 처리할 객체 목록입니다. null일 수 없습니다. private void OnTopicPacketMessage(string topic, List packets) { OnTopicMessage(topic, MqttDataPacket.ToJsonFromList(packets)); } private void OnTopicMessage(string topic, string message) { bool isMainThread = PlayerLoopHelper.IsMainThread; //Debug.Log($"OnTopicMessage isMainThread: {isMainThread}, topic: {topic}"); if (isMainThread) { // 메시지 처리를 백그라운드 스레드에서 실행하여 메인 스레드 부하를 줄입니다. UniTask.RunOnThreadPool(() => { OnTopicMessageLogic(topic, message); }).Forget(); } else { OnTopicMessageLogic(topic, message); } } /// /// MQTT 토픽으로 메시지가 수신되었을 때 호출되는 콜백 메서드입니다. /// /// 수신된 메시지의 토픽 /// 수신된 메시지 내용 /// /// 이 메서드는 수신된 메시지의 형식(JSON 객체 또는 배열)에 따라 적절한 파싱을 수행하고, /// 등록된 데이터 매퍼를 통해 메시지를 변환한 후, 해당 토픽에 등록된 핸들러에게 전달합니다. /// 'UpdatedDataOnly' 설정에 따라 데이터가 변경된 경우에만 핸들러를 호출할 수도 있습니다. /// private void OnTopicMessageLogic(string topic, string message) { //Debug.Log($"OnTopicMessageLogic topic: {topic}, configList.ContainsKey(topic): {configList.ContainsKey(topic)}"); // 토픽이 infoList와 readyHandlerList에 존재하고, 준비 상태가 true인 경우에만 처리합니다. if (configList.TryGetValue(topic, out var config)) { bool isFirstMessage = false; // 첫 번째 메시지 수신 여부를 확인하고, 아직 수신하지 않았다면 처리합니다. if (!firstMessageReceived.ContainsKey(topic) || !firstMessageReceived[topic]) { firstMessageReceived[topic] = true; // 첫 번째 메시지를 수신했음을 기록합니다. isFirstMessage = true; // 첫 번째 메시지로 처리합니다. } IDataObject? mappedObject = null; message = message.Trim(); LastMessage = message; // 마지막 메시지를 저장하여 나중에 사용할 수 있습니다. if (!string.IsNullOrEmpty(message)) { try { if (message.StartsWith("{")) { mappedObject = HttpDataProcessor.ProcessObjectResponse(message, null, config.DataMapper, config.Validator, null); } else if (message.StartsWith("[")) { mappedObject = HttpDataProcessor.ProcessArrayResponse(message, null, config.DataMapper, config.Validator, null); } //Debug.Log($"OnTopicMessageLogic topic: {topic}, mappedObject == null: {mappedObject == null}, config.DataMapper:{config.DataMapper == null}, config.Validator:{config.Validator == null}"); if (mappedObject == null) return; //즉시 업데이트 여부를 설정합니다. if (mappedObject is DataObject dataObj) { dataObj.IsUpdateImmediately = isFirstMessage; } else if (mappedObject is DataArray dataArray) { dataArray.IsUpdateImmediately = isFirstMessage; } // DataRepository는 내부적으로 데이터를 복사/업데이트하므로, mappedObject는 여기서 임시 객체가 됩니다. var repoObject = DataRepository.Instance.AddOrUpdateData(topic, mappedObject, config.UpdatedDataOnly); if (repoObject == mappedObject) repoObject = mappedObject.Clone(fromPool: false); // 핸들러 호출이 필요한지 확인 bool shouldInvoke = !config.UpdatedDataOnly || (repoObject != null && repoObject.UpdatedCount > 0); if (shouldInvoke && config.Handler != null) { var handlerData = repoObject; // 핸들러를 메인 스레드에서 안전하게 호출 UniTask.Post(() => config.Handler?.Invoke(handlerData)); } } catch (Exception ex) { // 예외 발생 시 로깅 또는 처리 // 예외 로깅도 메인 스레드에서 처리하여 Unity API 호출에 대한 스레드 안정성 확보 //UniTask.Post(() => ULog.Error($"Error processing message for topic '{topic}': {ex.Message}", ex)); ULog.Error($"Error processing message for topic '{topic}': {ex.Message}", ex); } finally { // DataMapper가 생성한 임시 객체를 풀에 반환합니다. if (mappedObject != null) { mappedObject.ReturnToPool(); } } } } } /// /// 파이프라인을 중지하고 모든 토픽 구독을 해제한 후 MQTT 브로커와의 연결을 종료합니다. /// public void Stop() { if (!UseMockup) { if (!mqttWorker.IsRunning) return; foreach (var topic in configList.Keys) { mqttWorker.RemoveListener(topic, OnTopicPacketMessage); } mqttWorker.Stop(); firstMessageReceived.Clear(); } else { // Mockup 모드인 경우 MockMQTTService를 사용하여 연결을 종료합니다. //mockupMQTT?.Disconnect(); } } /// /// 현재 인스턴스에서 사용되는 모든 리소스를 해제하고 진행 중인 모든 작업을 중지합니다. /// /// 이 메서드는 모든 반복 작업을 중단하고, 내부 상태를 지우고, 관련 리소스를 삭제합니다. /// 를 호출한 후에는 해당 인스턴스를 더 이상 사용할 수 없습니다. public void Dispose() { if (!UseMockup) mqttWorker.Dispose(); //else mockupMQTT?.Disconnect(); configList.Clear(); firstMessageReceived.Clear(); } } }