#nullable enable using Newtonsoft.Json.Linq; using SampleProject.Config; using System; using System.Collections.Generic; using UVC.Log; using UVC.network; using UVC.Tests; namespace UVC.Data { /// /// MQTT 통신을 통해 데이터를 수신하고 처리하는 파이프라인을 관리하는 클래스입니다. /// /// /// 이 클래스는 MQTT 브로커에 연결하여 등록된 토픽으로 들어오는 메시지를 수신하고, /// 해당 메시지를 지정된 데이터 매퍼를 통해 변환한 후 핸들러에게 전달합니다. /// 여러 MQTT 토픽을 동시에 관리하고 각각에 대한 처리 방식을 개별적으로 설정할 수 있습니다. /// public class MQTTPipeLine { /// /// 테스트를 위한 목업 모드 활성화 여부를 설정하거나 가져옵니다. /// /// /// true로 설정하면 실제 MQTT 요청 대신 MQTTPipeLine를 사용합니다. /// 테스트 환경에서 외부 의존성 없이 MQTT 통신을 시뮬레이션할 때 유용합니다. /// public bool UseMockup { get; internal set; } = false; /// /// MQTT 브로커의 도메인 주소 /// private string domain; /// /// MQTT 브로커의 포트 번호 /// public int port; /// /// 토픽별 파이프라인 정보를 저장하는 딕셔너리 /// private Dictionary infoList; /// /// MQTT 통신을 처리하는 서비스 객체 /// private MQTTService mqtt; private MockMQTTService? mockupMQTT; /// /// MQTTPipeLine 인스턴스를 생성합니다. /// /// MQTT 브로커의 도메인 주소, 기본값은 "localhost"입니다. /// MQTT 브로커의 포트 번호, 기본값은 1883입니다. public MQTTPipeLine(string domain = "localhost", int port = 1883) { this.domain = string.IsNullOrEmpty(domain) ? Constants.MQTT_DOMAIN : domain; this.port = port; mqtt = new MQTTService(Constants.MQTT_DOMAIN, Constants.MQTT_PORT); infoList = new Dictionary(); } /// /// 토픽에 대한 파이프라인 정보를 추가합니다. /// /// 추가할 MQTTPipeLineInfo 객체 /// /// 동일한 토픽에 대한 정보가 이미 존재하는 경우 덮어씁니다. /// public void Add(MQTTPipeLineInfo info) { if (!infoList.ContainsKey(info.Topic)) { infoList.Add(info.Topic, info); } else { infoList[info.Topic] = info; } } /// /// 지정된 토픽에 대한 파이프라인 정보를 제거합니다. /// /// 제거할 토픽 이름 public void Remove(string topic) { if (infoList.ContainsKey(topic)) { infoList.Remove(topic); } } /// /// 파이프라인을 실행하여 MQTT 브로커에 연결하고 등록된 모든 토픽을 구독합니다. /// public void Execute() { if (!UseMockup) { foreach (var topic in infoList.Keys) { mqtt.AddTopicHandler(topic, OnTopicMessage); } mqtt.Connect(); } else { // Mockup 모드인 경우 MockMQTTService를 사용하여 테스트 환경을 설정합니다. mockupMQTT = new MockMQTTService(); foreach (var topic in infoList.Keys) { mockupMQTT.AddTopicHandler(topic, OnTopicMessage); } mockupMQTT.Connect(); } } /// /// MQTT 토픽으로 메시지가 수신되었을 때 호출되는 콜백 메서드입니다. /// /// 수신된 메시지의 토픽 /// 수신된 메시지 내용 /// /// 이 메서드는 수신된 메시지의 형식(JSON 객체 또는 배열)에 따라 적절한 파싱을 수행하고, /// 등록된 데이터 매퍼를 통해 메시지를 변환한 후, 해당 토픽에 등록된 핸들러에게 전달합니다. /// 'UpdatedDataOnly' 설정에 따라 데이터가 변경된 경우에만 핸들러를 호출할 수도 있습니다. /// private void OnTopicMessage(string topic, string message) { if (infoList.ContainsKey(topic)) { MQTTPipeLineInfo info = infoList[topic]; IDataObject? dataObject = null; message = message.Trim(); if (!string.IsNullOrEmpty(message)) { try { if (message.StartsWith("{")) { JObject source = JObject.Parse(message); if (info.DataMapper != null) dataObject = info.DataMapper.Map(source); } else if (message.StartsWith("[")) { JArray source = JArray.Parse(message); if (info.DataMapper != null) dataObject = info.DataMapper.Map(source); } if (dataObject != null) dataObject = DataRepository.Instance.AddData(topic, dataObject, info.UpdatedDataOnly); // 갱신 된 데이터가 있는 경우 핸들러 호출 if (info.UpdatedDataOnly) { if (dataObject != null && dataObject.UpdatedCount > 0) info.Handler?.Invoke(dataObject); } else { info.Handler?.Invoke(dataObject); } } catch (Exception ex) { // 예외 발생 시 로깅 또는 처리 ULog.Error($"Error processing message for topic '{topic}': {ex.Message}", ex); } } } } /// /// 파이프라인을 중지하고 모든 토픽 구독을 해제한 후 MQTT 브로커와의 연결을 종료합니다. /// public void Stop() { if (!UseMockup) { foreach (var topic in infoList.Keys) { mqtt.RemoveTopicHandler(topic, OnTopicMessage); } mqtt.Disconnect(); } else { // Mockup 모드인 경우 MockMQTTService를 사용하여 연결을 종료합니다. mockupMQTT?.Disconnect(); } } /// /// 현재 인스턴스에서 사용되는 모든 리소스를 해제하고 진행 중인 모든 작업을 중지합니다. /// /// 이 메서드는 모든 반복 작업을 중단하고, 내부 상태를 지우고, 관련 리소스를 삭제합니다. /// 를 호출한 후에는 해당 인스턴스를 더 이상 사용할 수 없습니다. public void Dispose() { if (!UseMockup) mqtt.Disconnect(); else mockupMQTT?.Disconnect(); infoList.Clear(); } } }