From 6bac7d53b1900731c4c62c4689fb2bba7c6ab7f7 Mon Sep 17 00:00:00 2001 From: logonkhi Date: Mon, 14 Jul 2025 20:08:04 +0900 Subject: [PATCH] =?UTF-8?q?=EB=8D=B0=EC=9D=B4=ED=84=B0=20=EB=B2=84?= =?UTF-8?q?=ED=8D=BC=20=ED=81=B4=EB=9E=98=EC=8A=A4=20=EC=9E=91=EC=84=B1=20?= =?UTF-8?q?=EC=A4=91?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- Assets/Scripts/UVC/Data/DataRepository.cs | 14 +- Assets/Scripts/UVC/Data/HttpPipeLine.cs | 2 +- Assets/Scripts/UVC/Data/MQTTPipeLine.cs | 2 +- Assets/Scripts/UVC/Data/MqttDataPacket.cs | 65 ++++ .../Scripts/UVC/Data/MqttDataPacket.cs.meta | 2 + Assets/Scripts/UVC/Data/MqttWorker.cs | 312 ++++++++++++++++++ Assets/Scripts/UVC/Data/MqttWorker.cs.meta | 2 + Assets/Scripts/UVC/Network/MQTTService.cs | 16 +- 8 files changed, 402 insertions(+), 13 deletions(-) create mode 100644 Assets/Scripts/UVC/Data/MqttDataPacket.cs create mode 100644 Assets/Scripts/UVC/Data/MqttDataPacket.cs.meta create mode 100644 Assets/Scripts/UVC/Data/MqttWorker.cs create mode 100644 Assets/Scripts/UVC/Data/MqttWorker.cs.meta diff --git a/Assets/Scripts/UVC/Data/DataRepository.cs b/Assets/Scripts/UVC/Data/DataRepository.cs index e999ec5d..d2a4f86b 100644 --- a/Assets/Scripts/UVC/Data/DataRepository.cs +++ b/Assets/Scripts/UVC/Data/DataRepository.cs @@ -1,9 +1,7 @@ -using Cysharp.Threading.Tasks; -using Newtonsoft.Json; +using Newtonsoft.Json; using Newtonsoft.Json.Linq; using System; using System.Collections.Generic; -using UnityEngine; using UVC.Log; namespace UVC.Data @@ -57,7 +55,7 @@ namespace UVC.Data /// 저장할 데이터 객체 /// true인 경우 업데이트된 속성만 반환, false인 경우 전체 객체 반환 /// 새로 추가된 객체 또는 업데이트된 기존 객체 - public IDataObject AddData(string key, IDataObject dataObject, bool updatedDataOnly = true) + public IDataObject AddOrUpdateData(string key, IDataObject dataObject, bool updatedDataOnly = true) { if (string.IsNullOrEmpty(key)) throw new ArgumentNullException(nameof(key), "키는 null이거나 빈 문자열일 수 없습니다."); @@ -70,7 +68,7 @@ namespace UVC.Data if (!dataObjects.ContainsKey(key)) { var newData = dataObject.Clone(fromPool: false); - + dataObjects.Add(key, newData); dataObject.MarkAllAsUpdated(); //UniTask.Post(() => NotifyDataUpdate(key, newData)); @@ -212,7 +210,7 @@ namespace UVC.Data /// 업데이트된 데이터의 키 /// 업데이트된 데이터 객체 /// - /// 이 메서드는 주로 내부적으로 AddData 메서드에서 호출되어 + /// 이 메서드는 주로 내부적으로 AddOrUpdateData 메서드에서 호출되어 /// 특정 키의 데이터가 변경되었을 때 등록된 핸들러들에게 알립니다. /// //private void NotifyDataUpdate(string key, IDataObject dataObject) @@ -393,12 +391,12 @@ namespace UVC.Data { DataObject dataObject = DataObjectPool.Get(); dataObject.FromJObject(jObject); - AddData(key, dataObject, false); + AddOrUpdateData(key, dataObject, false); } else if (value is JArray jArray) { DataArray dataArray = DataArrayPool.Get().FromJArray(jArray); - AddData(key, dataArray, false); + AddOrUpdateData(key, dataArray, false); } } } diff --git a/Assets/Scripts/UVC/Data/HttpPipeLine.cs b/Assets/Scripts/UVC/Data/HttpPipeLine.cs index aa2f0c3e..2a7aafc5 100644 --- a/Assets/Scripts/UVC/Data/HttpPipeLine.cs +++ b/Assets/Scripts/UVC/Data/HttpPipeLine.cs @@ -430,7 +430,7 @@ namespace UVC.Data var repoObject = mappedObject; if (mappedObject != null) { - repoObject = DataRepository.Instance.AddData(key, mappedObject, info.UpdatedDataOnly); + repoObject = DataRepository.Instance.AddOrUpdateData(key, mappedObject, info.UpdatedDataOnly); if (repoObject == mappedObject) repoObject = mappedObject.Clone(fromPool: false); } diff --git a/Assets/Scripts/UVC/Data/MQTTPipeLine.cs b/Assets/Scripts/UVC/Data/MQTTPipeLine.cs index 5291e0b6..e504c0f1 100644 --- a/Assets/Scripts/UVC/Data/MQTTPipeLine.cs +++ b/Assets/Scripts/UVC/Data/MQTTPipeLine.cs @@ -244,7 +244,7 @@ namespace UVC.Data if (mappedObject == null) return; // DataRepository는 내부적으로 데이터를 복사/업데이트하므로, mappedObject는 여기서 임시 객체가 됩니다. - var repoObject = DataRepository.Instance.AddData(topic, mappedObject, info.UpdatedDataOnly); + var repoObject = DataRepository.Instance.AddOrUpdateData(topic, mappedObject, info.UpdatedDataOnly); if(repoObject == mappedObject) repoObject = mappedObject.Clone(fromPool: false); // 핸들러 호출이 필요한지 확인 bool shouldInvoke = !info.UpdatedDataOnly || (repoObject != null && repoObject.UpdatedCount > 0); diff --git a/Assets/Scripts/UVC/Data/MqttDataPacket.cs b/Assets/Scripts/UVC/Data/MqttDataPacket.cs new file mode 100644 index 00000000..9bd012fe --- /dev/null +++ b/Assets/Scripts/UVC/Data/MqttDataPacket.cs @@ -0,0 +1,65 @@ +using System; + +namespace UVC.Data +{ + /// + /// 수신된 단일 MQTT 메시지에 대한 모든 정보를 담는 데이터 컨테이너 클래스입니다. + /// 이 객체는 MqttWorker가 메시지를 수신했을 때 생성되며, MqttDataManager를 통해 + /// 최종적으로 데이터 소비자에게 전달됩니다. + /// + /// + /// + /// // MqttDataPacket 객체 생성 예시 + /// var packet = new MqttDataPacket("sensor/temp", "25.5"); + /// + /// // 데이터 접근 예시 + /// Debug.Log($"수신 시간: {packet.Timestamp}"); + /// Debug.Log($"토픽: {packet.Topic}"); + /// Debug.Log($"내용: {packet.Payload}"); + /// + /// // MqttDataManager가 이 패킷을 리스너에게 전달한 후, + /// // IsPropagated를 true로 설정하여 중복 전송을 방지합니다. + /// packet.IsPropagated = true; + /// Debug.Log($"처리 완료 여부: {packet.IsPropagated}"); + /// + /// + public class MqttDataPacket + { + /// + /// 데이터가 수신된 시간 (UTC 기준)입니다. + /// 전 세계 어디서든 동일한 시간 기록을 보장하기 위해 협정 세계시(UTC)를 사용합니다. + /// 'init' 키워드는 객체가 처음 생성될 때만 값을 할당할 수 있도록 하여, 데이터의 불변성을 보장합니다. + /// + public DateTime Timestamp { get; private set; } + + /// + /// 메시지가 발행된 MQTT 토픽입니다. (예: "home/livingroom/light") + /// + public string Topic { get; private set; } + + /// + /// 메시지의 실제 내용(데이터)입니다. 보통 JSON 형식의 문자열이 담깁니다. + /// + public string Payload { get; private set; } + + /// + /// 이 데이터가 리스너에게 전파(전달)되었는지 여부를 나타내는 플래그입니다. + /// MqttDataManager가 이 값을 사용하여 동일한 데이터를 중복으로 전달하는 것을 방지합니다. + /// + public bool IsPropagated { get; set; } + + /// + /// 새로운 MqttDataPacket 인스턴스를 생성합니다. + /// + /// 메시지가 수신된 토픽 + /// 메시지의 내용 + public MqttDataPacket(string topic, string payload) + { + this.Timestamp = DateTime.UtcNow; + this.Topic = topic; + this.Payload = payload; + // 모든 패킷은 생성 시점에는 아직 리스너에게 전달되지 않았으므로 'false'로 초기화됩니다. + this.IsPropagated = false; + } + } +} diff --git a/Assets/Scripts/UVC/Data/MqttDataPacket.cs.meta b/Assets/Scripts/UVC/Data/MqttDataPacket.cs.meta new file mode 100644 index 00000000..7d7490e5 --- /dev/null +++ b/Assets/Scripts/UVC/Data/MqttDataPacket.cs.meta @@ -0,0 +1,2 @@ +fileFormatVersion: 2 +guid: cea10eb46c464f14c9a7ac8a38b4d73e \ No newline at end of file diff --git a/Assets/Scripts/UVC/Data/MqttWorker.cs b/Assets/Scripts/UVC/Data/MqttWorker.cs new file mode 100644 index 00000000..b9ffc164 --- /dev/null +++ b/Assets/Scripts/UVC/Data/MqttWorker.cs @@ -0,0 +1,312 @@ +using System; +using System.Collections.Concurrent; +using System.Collections.Generic; +using System.Linq; +using System.Threading; +using UnityEngine; +using UVC.network; + +namespace UVC.Data +{ + /// + /// 백그라운드 스레드에서 MQTT 통신, 데이터 처리 및 전파를 모두 담당하는 독립적인 워커 클래스입니다. + /// 생성 시 bufferDurationSec 값에 따라 두 가지 모드로 동작합니다. + /// + /// 1. 버퍼링 모드 (bufferDurationSec > 0): + /// - 수신된 메시지를 일정 주기(propagationIntervalSec)마다 모아서 리스너에게 전달합니다. + /// - 오래된 데이터는 자동으로 폐기됩니다. + /// + /// 2. 직접 전파 모드 (bufferDurationSec <= 0): + /// - 메시지를 수신하는 즉시 리스너에게 전달합니다. 버퍼링 과정이 생략됩니다. + /// + /// + /// 이 클래스에서 등록된 리스너(콜백)는 Unity의 메인 스레드가 아닌, + /// 별도의 백그라운드 스레드에서 호출됩니다. 따라서 리스너 내부에서 Unity API를 직접 호출하면 안 됩니다. + /// + public class MqttWorker + { + /// + /// 버퍼링 모드에서 사용되는 스레드 안전 큐입니다. + /// 네트워크 스레드에서 수신된 메시지를 워커의 처리 루프로 안전하게 전달하는 역할을 합니다. + /// + private ConcurrentQueue IncomingMessageQueue { get; set; } + + private Thread workerThread; + private volatile bool isRunning = false; + + /// + /// 스레드를 안전하게 종료시키기 위한 CancellationTokenSource 입니다. + /// Stop() 메서드가 호출되면 취소 신호를 보냅니다. + /// + private readonly CancellationTokenSource cancellationTokenSource = new CancellationTokenSource(); + + private readonly Dictionary> topicBuffers = new Dictionary>(); + private readonly Dictionary>> listeners = new Dictionary>>(); + private readonly object bufferAndListenerLock = new object(); + + /// + /// 데이터를 버퍼에 보관할 최대 시간(초)입니다. 이 시간이 지난 데이터는 폐기됩니다. + /// 이 값이 0보다 크면 버퍼링 모드로 동작합니다. + /// + private float bufferDurationSec; + + /// + /// [버퍼링 모드 전용] 버퍼링된 데이터를 리스너에게 전파하는 주기(초)입니다. + /// + private float propagationIntervalSec; + + private string domain; + private int port; + + /// + /// MqttWorker의 생성자입니다. + /// + /// 접속할 MQTT 브로커의 도메인 주소입니다. + /// 접속할 MQTT 브로커의 포트 번호입니다. + /// 데이터 버퍼링 시간(초). 0 이하로 설정 시 버퍼링 없이 직접 전파 모드로 동작합니다. + /// 버퍼링 모드에서 데이터를 전파할 주기(초)입니다. + public MqttWorker(string domain = "localhost", int port = 1883, float bufferDurationSec = 0f, float propagationIntervalSec = 1f) + { + this.domain = domain; + this.port = port; + this.bufferDurationSec = bufferDurationSec; + this.propagationIntervalSec = propagationIntervalSec; + this.IncomingMessageQueue = new ConcurrentQueue(); + } + + /// + /// 백그라운드 워커 스레드를 시작합니다. + /// + public void Start() + { + if (isRunning) return; + isRunning = true; + workerThread = new Thread(Run); + workerThread.IsBackground = true; // 메인 앱 종료 시 스레드 자동 종료 + workerThread.Start(); + } + + /// + /// 백그라운드 워커 스레드를 중지합니다. + /// + public void Stop() + { + if (!isRunning) return; + isRunning = false; + cancellationTokenSource.Cancel(); + workerThread?.Join(); // 스레드가 완전히 종료될 때까지 대기 + } + + /// + /// 워커를 중지하고 관련된 리소스를 모두 해제합니다. + /// + public void Dispose() + { + Stop(); + cancellationTokenSource.Dispose(); + } + + /// + /// 특정 토픽에 대한 데이터 수신을 시작합니다. + /// 중요: 여기서 등록된 리스너(콜백)는 백그라운드 스레드에서 호출됩니다. + /// Unity API(GameObject, Transform 등)에 직접 접근하면 안 됩니다. + /// + public void AddListener(string topic, Action> listener) + { + lock (bufferAndListenerLock) + { + if (!listeners.ContainsKey(topic)) + { + listeners[topic] = listener; + topicBuffers[topic] = new List(); + } + else + { + listeners[topic] += listener; + } + } + } + + + /// + /// 지정된 토픽에서 리스너를 제거합니다. + /// + /// 지정된 리스너가 토픽의 마지막 리스너인 경우, 토픽은 내부 컬렉션에서 제거됩니다. + /// + /// 리스너를 제거할 토픽입니다. null이거나 비어 있을 수 없습니다. + /// 제거할 리스너로, 으로 표현됩니다. null일 수 없습니다. + public void RemoveListener(string topic, Action> listener) + { + lock (bufferAndListenerLock) + { + if (listeners.ContainsKey(topic)) + { + listeners[topic] -= listener; + if (listeners[topic] == null || listeners[topic].GetInvocationList().Length == 0) + { + listeners.Remove(topic); + topicBuffers.Remove(topic); + } + } + } + } + + /// + /// 백그라운드 스레드에서 실행되는 메인 루프입니다. + /// + private void Run() + { + Debug.Log("[Worker] 백그라운드 스레드 시작."); + // MQTT 서비스 객체를 생성하고 설정합니다. + MQTTService mqtt = new MQTTService(domain, port); + + // AddListener를 통해 미리 등록된 토픽들을 구독합니다. + lock (bufferAndListenerLock) + { + foreach (var topic in listeners.Keys) + { + mqtt.AddTopicHandler(topic, OnMqttMessageReceived); + } + } + mqtt.Connect(); + + // 설정된 bufferDurationSec 값에 따라 동작 모드를 결정합니다. + if (bufferDurationSec > 0) + { + // 버퍼링 모드: 주기적으로 메시지를 처리하고 전파합니다. + RunBufferingLoop(); + } + else + { + // 직접 전파 모드: 스레드를 대기 상태로 두고, 메시지 수신 시 직접 리스너를 호출합니다. + RunDirectPropagationLoop(); + } + + // MQTT 연결을 해제하고 관련 리소스를 정리합니다. + mqtt.Dispose(); + // ------------------------------------ + Debug.Log("[Worker] 백그라운드 스레드 종료."); + } + + /// + /// 버퍼링 모드의 메인 루프입니다. + /// + private void RunBufferingLoop() + { + var cancellationToken = cancellationTokenSource.Token; + var propagationIntervalMs = (int)(propagationIntervalSec * 1000); + + while (!cancellationToken.IsCancellationRequested) + { + ProcessIncomingMessages(); + PropagateBufferedData(); + + try + { + // 다음 전파 주기까지 대기합니다. + if (cancellationToken.WaitHandle.WaitOne(propagationIntervalMs)) + { + break; // Stop() 호출 시 루프 종료 + } + } + catch (ObjectDisposedException) { break; } + } + } + + /// + /// 직접 전파 모드의 메인 루프입니다. Stop()이 호출될 때까지 스레드를 대기시킵니다. + /// + private void RunDirectPropagationLoop() + { + try + { + cancellationTokenSource.Token.WaitHandle.WaitOne(); + } + catch (ObjectDisposedException) + { + // 정상적인 종료 과정이므로 예외를 무시합니다. + } + } + + /// + /// MQTT 메시지가 수신될 때마다 MQTTService에 의해 호출되는 콜백 메서드입니다. + /// 이 메서드는 워커 스레드에서 실행됩니다. + /// + /// 메시지가 수신된 토픽 + /// 수신된 메시지 내용 + private void OnMqttMessageReceived(string topic, string message) + { + if (bufferDurationSec > 0) + { + // 버퍼링 모드: 메시지를 큐에 넣어 워커 스레드로 전달합니다. + IncomingMessageQueue.Enqueue(new MqttDataPacket(topic, message)); + } + else + { + // 직접 전파 모드: 즉시 리스너를 호출합니다. + lock (bufferAndListenerLock) + { + listeners[topic]?.Invoke(new List() { new MqttDataPacket(topic, message) }); + } + } + } + + /// + /// [버퍼링 모드 전용] 큐의 메시지를 토픽별 버퍼로 옮깁니다. + /// + private void ProcessIncomingMessages() + { + // 큐에서 메시지를 가져와 처리 + while (IncomingMessageQueue.TryDequeue(out MqttDataPacket packet)) + { + lock (bufferAndListenerLock) + { + if (topicBuffers.ContainsKey(packet.Topic)) + { + topicBuffers[packet.Topic].Add(packet); + } + } + } + } + + /// + /// [버퍼링 모드 전용] 버퍼링된 데이터를 리스너에게 전파합니다. + /// + private void PropagateBufferedData() + { + DateTime cutoffTime = DateTime.UtcNow.AddSeconds(-bufferDurationSec); + + lock (bufferAndListenerLock) + { + foreach (var topic in listeners.Keys.ToList()) + { + if (!topicBuffers.ContainsKey(topic)) continue; + + var buffer = topicBuffers[topic]; + // 오래된 데이터 제거 + buffer.RemoveAll(p => p.Timestamp < cutoffTime); + + var newPackets = buffer.Where(p => !p.IsPropagated).ToList(); + + if (newPackets.Count > 0) + { + try + { + // 중요: 이 콜백은 Worker 스레드에서 직접 호출됩니다. + listeners[topic]?.Invoke(newPackets); + } + catch (Exception ex) + { + Debug.LogError($"[Worker] 리스너 실행 중 오류 발생 (Topic: {topic}): {ex.Message}"); + } + + foreach (var packet in newPackets) + { + packet.IsPropagated = true; + } + } + } + } + } + } +} diff --git a/Assets/Scripts/UVC/Data/MqttWorker.cs.meta b/Assets/Scripts/UVC/Data/MqttWorker.cs.meta new file mode 100644 index 00000000..bdccc173 --- /dev/null +++ b/Assets/Scripts/UVC/Data/MqttWorker.cs.meta @@ -0,0 +1,2 @@ +fileFormatVersion: 2 +guid: 4031acfadc3ee0a4a852aa2b0681f696 \ No newline at end of file diff --git a/Assets/Scripts/UVC/Network/MQTTService.cs b/Assets/Scripts/UVC/Network/MQTTService.cs index da766cfe..a6a1068a 100644 --- a/Assets/Scripts/UVC/Network/MQTTService.cs +++ b/Assets/Scripts/UVC/Network/MQTTService.cs @@ -79,7 +79,6 @@ namespace UVC.network client.OnStateChanged += OnStateChangedMQTT; client.OnDisconnect += OnDisconnectedMQTT; client.OnError += OnErrorMQTT; - } /// @@ -327,8 +326,19 @@ namespace UVC.network { if (topicHandler.TryGetValue(topic.Filter.OriginalFilter, out var handler)) { - // 등록된 핸들러가 있으면 호출 - handler.Invoke(topic.Filter.OriginalFilter, payload); + // 메인 스레드에서 실행 중인지 확인합니다. + bool isMainThread = UniTask.SwitchToMainThread().GetAwaiter().IsCompleted; + Debug.Log($"MQTT OnTopic {topic.Filter.OriginalFilter} => {payload} isMainThread={isMainThread}"); + if (isMainThread) + { + // 메인 스레드이므로 핸들러를 직접 호출합니다. + handler.Invoke(topic.Filter.OriginalFilter, payload); + } + else + { + // 백그라운드 스레드이므로 UniTask.Post를 사용하여 메인 스레드로 작업을 보냅니다. + UniTask.Post(() => handler.Invoke(topic.Filter.OriginalFilter, payload)); + } } } }