diff --git a/Assets/Scripts/UVC/Data/DataRepository.cs b/Assets/Scripts/UVC/Data/DataRepository.cs
index e999ec5d..d2a4f86b 100644
--- a/Assets/Scripts/UVC/Data/DataRepository.cs
+++ b/Assets/Scripts/UVC/Data/DataRepository.cs
@@ -1,9 +1,7 @@
-using Cysharp.Threading.Tasks;
-using Newtonsoft.Json;
+using Newtonsoft.Json;
using Newtonsoft.Json.Linq;
using System;
using System.Collections.Generic;
-using UnityEngine;
using UVC.Log;
namespace UVC.Data
@@ -57,7 +55,7 @@ namespace UVC.Data
/// 저장할 데이터 객체
/// true인 경우 업데이트된 속성만 반환, false인 경우 전체 객체 반환
/// 새로 추가된 객체 또는 업데이트된 기존 객체
- public IDataObject AddData(string key, IDataObject dataObject, bool updatedDataOnly = true)
+ public IDataObject AddOrUpdateData(string key, IDataObject dataObject, bool updatedDataOnly = true)
{
if (string.IsNullOrEmpty(key))
throw new ArgumentNullException(nameof(key), "키는 null이거나 빈 문자열일 수 없습니다.");
@@ -70,7 +68,7 @@ namespace UVC.Data
if (!dataObjects.ContainsKey(key))
{
var newData = dataObject.Clone(fromPool: false);
-
+
dataObjects.Add(key, newData);
dataObject.MarkAllAsUpdated();
//UniTask.Post(() => NotifyDataUpdate(key, newData));
@@ -212,7 +210,7 @@ namespace UVC.Data
/// 업데이트된 데이터의 키
/// 업데이트된 데이터 객체
///
- /// 이 메서드는 주로 내부적으로 AddData 메서드에서 호출되어
+ /// 이 메서드는 주로 내부적으로 AddOrUpdateData 메서드에서 호출되어
/// 특정 키의 데이터가 변경되었을 때 등록된 핸들러들에게 알립니다.
///
//private void NotifyDataUpdate(string key, IDataObject dataObject)
@@ -393,12 +391,12 @@ namespace UVC.Data
{
DataObject dataObject = DataObjectPool.Get();
dataObject.FromJObject(jObject);
- AddData(key, dataObject, false);
+ AddOrUpdateData(key, dataObject, false);
}
else if (value is JArray jArray)
{
DataArray dataArray = DataArrayPool.Get().FromJArray(jArray);
- AddData(key, dataArray, false);
+ AddOrUpdateData(key, dataArray, false);
}
}
}
diff --git a/Assets/Scripts/UVC/Data/HttpPipeLine.cs b/Assets/Scripts/UVC/Data/HttpPipeLine.cs
index aa2f0c3e..2a7aafc5 100644
--- a/Assets/Scripts/UVC/Data/HttpPipeLine.cs
+++ b/Assets/Scripts/UVC/Data/HttpPipeLine.cs
@@ -430,7 +430,7 @@ namespace UVC.Data
var repoObject = mappedObject;
if (mappedObject != null)
{
- repoObject = DataRepository.Instance.AddData(key, mappedObject, info.UpdatedDataOnly);
+ repoObject = DataRepository.Instance.AddOrUpdateData(key, mappedObject, info.UpdatedDataOnly);
if (repoObject == mappedObject) repoObject = mappedObject.Clone(fromPool: false);
}
diff --git a/Assets/Scripts/UVC/Data/MQTTPipeLine.cs b/Assets/Scripts/UVC/Data/MQTTPipeLine.cs
index 5291e0b6..e504c0f1 100644
--- a/Assets/Scripts/UVC/Data/MQTTPipeLine.cs
+++ b/Assets/Scripts/UVC/Data/MQTTPipeLine.cs
@@ -244,7 +244,7 @@ namespace UVC.Data
if (mappedObject == null) return;
// DataRepository는 내부적으로 데이터를 복사/업데이트하므로, mappedObject는 여기서 임시 객체가 됩니다.
- var repoObject = DataRepository.Instance.AddData(topic, mappedObject, info.UpdatedDataOnly);
+ var repoObject = DataRepository.Instance.AddOrUpdateData(topic, mappedObject, info.UpdatedDataOnly);
if(repoObject == mappedObject) repoObject = mappedObject.Clone(fromPool: false);
// 핸들러 호출이 필요한지 확인
bool shouldInvoke = !info.UpdatedDataOnly || (repoObject != null && repoObject.UpdatedCount > 0);
diff --git a/Assets/Scripts/UVC/Data/MqttDataPacket.cs b/Assets/Scripts/UVC/Data/MqttDataPacket.cs
new file mode 100644
index 00000000..9bd012fe
--- /dev/null
+++ b/Assets/Scripts/UVC/Data/MqttDataPacket.cs
@@ -0,0 +1,65 @@
+using System;
+
+namespace UVC.Data
+{
+ ///
+ /// 수신된 단일 MQTT 메시지에 대한 모든 정보를 담는 데이터 컨테이너 클래스입니다.
+ /// 이 객체는 MqttWorker가 메시지를 수신했을 때 생성되며, MqttDataManager를 통해
+ /// 최종적으로 데이터 소비자에게 전달됩니다.
+ ///
+ ///
+ ///
+ /// // MqttDataPacket 객체 생성 예시
+ /// var packet = new MqttDataPacket("sensor/temp", "25.5");
+ ///
+ /// // 데이터 접근 예시
+ /// Debug.Log($"수신 시간: {packet.Timestamp}");
+ /// Debug.Log($"토픽: {packet.Topic}");
+ /// Debug.Log($"내용: {packet.Payload}");
+ ///
+ /// // MqttDataManager가 이 패킷을 리스너에게 전달한 후,
+ /// // IsPropagated를 true로 설정하여 중복 전송을 방지합니다.
+ /// packet.IsPropagated = true;
+ /// Debug.Log($"처리 완료 여부: {packet.IsPropagated}");
+ ///
+ ///
+ public class MqttDataPacket
+ {
+ ///
+ /// 데이터가 수신된 시간 (UTC 기준)입니다.
+ /// 전 세계 어디서든 동일한 시간 기록을 보장하기 위해 협정 세계시(UTC)를 사용합니다.
+ /// 'init' 키워드는 객체가 처음 생성될 때만 값을 할당할 수 있도록 하여, 데이터의 불변성을 보장합니다.
+ ///
+ public DateTime Timestamp { get; private set; }
+
+ ///
+ /// 메시지가 발행된 MQTT 토픽입니다. (예: "home/livingroom/light")
+ ///
+ public string Topic { get; private set; }
+
+ ///
+ /// 메시지의 실제 내용(데이터)입니다. 보통 JSON 형식의 문자열이 담깁니다.
+ ///
+ public string Payload { get; private set; }
+
+ ///
+ /// 이 데이터가 리스너에게 전파(전달)되었는지 여부를 나타내는 플래그입니다.
+ /// MqttDataManager가 이 값을 사용하여 동일한 데이터를 중복으로 전달하는 것을 방지합니다.
+ ///
+ public bool IsPropagated { get; set; }
+
+ ///
+ /// 새로운 MqttDataPacket 인스턴스를 생성합니다.
+ ///
+ /// 메시지가 수신된 토픽
+ /// 메시지의 내용
+ public MqttDataPacket(string topic, string payload)
+ {
+ this.Timestamp = DateTime.UtcNow;
+ this.Topic = topic;
+ this.Payload = payload;
+ // 모든 패킷은 생성 시점에는 아직 리스너에게 전달되지 않았으므로 'false'로 초기화됩니다.
+ this.IsPropagated = false;
+ }
+ }
+}
diff --git a/Assets/Scripts/UVC/Data/MqttDataPacket.cs.meta b/Assets/Scripts/UVC/Data/MqttDataPacket.cs.meta
new file mode 100644
index 00000000..7d7490e5
--- /dev/null
+++ b/Assets/Scripts/UVC/Data/MqttDataPacket.cs.meta
@@ -0,0 +1,2 @@
+fileFormatVersion: 2
+guid: cea10eb46c464f14c9a7ac8a38b4d73e
\ No newline at end of file
diff --git a/Assets/Scripts/UVC/Data/MqttWorker.cs b/Assets/Scripts/UVC/Data/MqttWorker.cs
new file mode 100644
index 00000000..b9ffc164
--- /dev/null
+++ b/Assets/Scripts/UVC/Data/MqttWorker.cs
@@ -0,0 +1,312 @@
+using System;
+using System.Collections.Concurrent;
+using System.Collections.Generic;
+using System.Linq;
+using System.Threading;
+using UnityEngine;
+using UVC.network;
+
+namespace UVC.Data
+{
+ ///
+ /// 백그라운드 스레드에서 MQTT 통신, 데이터 처리 및 전파를 모두 담당하는 독립적인 워커 클래스입니다.
+ /// 생성 시 bufferDurationSec 값에 따라 두 가지 모드로 동작합니다.
+ ///
+ /// 1. 버퍼링 모드 (bufferDurationSec > 0):
+ /// - 수신된 메시지를 일정 주기(propagationIntervalSec)마다 모아서 리스너에게 전달합니다.
+ /// - 오래된 데이터는 자동으로 폐기됩니다.
+ ///
+ /// 2. 직접 전파 모드 (bufferDurationSec <= 0):
+ /// - 메시지를 수신하는 즉시 리스너에게 전달합니다. 버퍼링 과정이 생략됩니다.
+ ///
+ ///
+ /// 이 클래스에서 등록된 리스너(콜백)는 Unity의 메인 스레드가 아닌,
+ /// 별도의 백그라운드 스레드에서 호출됩니다. 따라서 리스너 내부에서 Unity API를 직접 호출하면 안 됩니다.
+ ///
+ public class MqttWorker
+ {
+ ///
+ /// 버퍼링 모드에서 사용되는 스레드 안전 큐입니다.
+ /// 네트워크 스레드에서 수신된 메시지를 워커의 처리 루프로 안전하게 전달하는 역할을 합니다.
+ ///
+ private ConcurrentQueue IncomingMessageQueue { get; set; }
+
+ private Thread workerThread;
+ private volatile bool isRunning = false;
+
+ ///
+ /// 스레드를 안전하게 종료시키기 위한 CancellationTokenSource 입니다.
+ /// Stop() 메서드가 호출되면 취소 신호를 보냅니다.
+ ///
+ private readonly CancellationTokenSource cancellationTokenSource = new CancellationTokenSource();
+
+ private readonly Dictionary> topicBuffers = new Dictionary>();
+ private readonly Dictionary>> listeners = new Dictionary>>();
+ private readonly object bufferAndListenerLock = new object();
+
+ ///
+ /// 데이터를 버퍼에 보관할 최대 시간(초)입니다. 이 시간이 지난 데이터는 폐기됩니다.
+ /// 이 값이 0보다 크면 버퍼링 모드로 동작합니다.
+ ///
+ private float bufferDurationSec;
+
+ ///
+ /// [버퍼링 모드 전용] 버퍼링된 데이터를 리스너에게 전파하는 주기(초)입니다.
+ ///
+ private float propagationIntervalSec;
+
+ private string domain;
+ private int port;
+
+ ///
+ /// MqttWorker의 생성자입니다.
+ ///
+ /// 접속할 MQTT 브로커의 도메인 주소입니다.
+ /// 접속할 MQTT 브로커의 포트 번호입니다.
+ /// 데이터 버퍼링 시간(초). 0 이하로 설정 시 버퍼링 없이 직접 전파 모드로 동작합니다.
+ /// 버퍼링 모드에서 데이터를 전파할 주기(초)입니다.
+ public MqttWorker(string domain = "localhost", int port = 1883, float bufferDurationSec = 0f, float propagationIntervalSec = 1f)
+ {
+ this.domain = domain;
+ this.port = port;
+ this.bufferDurationSec = bufferDurationSec;
+ this.propagationIntervalSec = propagationIntervalSec;
+ this.IncomingMessageQueue = new ConcurrentQueue();
+ }
+
+ ///
+ /// 백그라운드 워커 스레드를 시작합니다.
+ ///
+ public void Start()
+ {
+ if (isRunning) return;
+ isRunning = true;
+ workerThread = new Thread(Run);
+ workerThread.IsBackground = true; // 메인 앱 종료 시 스레드 자동 종료
+ workerThread.Start();
+ }
+
+ ///
+ /// 백그라운드 워커 스레드를 중지합니다.
+ ///
+ public void Stop()
+ {
+ if (!isRunning) return;
+ isRunning = false;
+ cancellationTokenSource.Cancel();
+ workerThread?.Join(); // 스레드가 완전히 종료될 때까지 대기
+ }
+
+ ///
+ /// 워커를 중지하고 관련된 리소스를 모두 해제합니다.
+ ///
+ public void Dispose()
+ {
+ Stop();
+ cancellationTokenSource.Dispose();
+ }
+
+ ///
+ /// 특정 토픽에 대한 데이터 수신을 시작합니다.
+ /// 중요: 여기서 등록된 리스너(콜백)는 백그라운드 스레드에서 호출됩니다.
+ /// Unity API(GameObject, Transform 등)에 직접 접근하면 안 됩니다.
+ ///
+ public void AddListener(string topic, Action> listener)
+ {
+ lock (bufferAndListenerLock)
+ {
+ if (!listeners.ContainsKey(topic))
+ {
+ listeners[topic] = listener;
+ topicBuffers[topic] = new List();
+ }
+ else
+ {
+ listeners[topic] += listener;
+ }
+ }
+ }
+
+
+ ///
+ /// 지정된 토픽에서 리스너를 제거합니다.
+ ///
+ /// 지정된 리스너가 토픽의 마지막 리스너인 경우, 토픽은 내부 컬렉션에서 제거됩니다.
+ ///
+ /// 리스너를 제거할 토픽입니다. null이거나 비어 있을 수 없습니다.
+ /// 제거할 리스너로, 으로 표현됩니다. null일 수 없습니다.
+ public void RemoveListener(string topic, Action> listener)
+ {
+ lock (bufferAndListenerLock)
+ {
+ if (listeners.ContainsKey(topic))
+ {
+ listeners[topic] -= listener;
+ if (listeners[topic] == null || listeners[topic].GetInvocationList().Length == 0)
+ {
+ listeners.Remove(topic);
+ topicBuffers.Remove(topic);
+ }
+ }
+ }
+ }
+
+ ///
+ /// 백그라운드 스레드에서 실행되는 메인 루프입니다.
+ ///
+ private void Run()
+ {
+ Debug.Log("[Worker] 백그라운드 스레드 시작.");
+ // MQTT 서비스 객체를 생성하고 설정합니다.
+ MQTTService mqtt = new MQTTService(domain, port);
+
+ // AddListener를 통해 미리 등록된 토픽들을 구독합니다.
+ lock (bufferAndListenerLock)
+ {
+ foreach (var topic in listeners.Keys)
+ {
+ mqtt.AddTopicHandler(topic, OnMqttMessageReceived);
+ }
+ }
+ mqtt.Connect();
+
+ // 설정된 bufferDurationSec 값에 따라 동작 모드를 결정합니다.
+ if (bufferDurationSec > 0)
+ {
+ // 버퍼링 모드: 주기적으로 메시지를 처리하고 전파합니다.
+ RunBufferingLoop();
+ }
+ else
+ {
+ // 직접 전파 모드: 스레드를 대기 상태로 두고, 메시지 수신 시 직접 리스너를 호출합니다.
+ RunDirectPropagationLoop();
+ }
+
+ // MQTT 연결을 해제하고 관련 리소스를 정리합니다.
+ mqtt.Dispose();
+ // ------------------------------------
+ Debug.Log("[Worker] 백그라운드 스레드 종료.");
+ }
+
+ ///
+ /// 버퍼링 모드의 메인 루프입니다.
+ ///
+ private void RunBufferingLoop()
+ {
+ var cancellationToken = cancellationTokenSource.Token;
+ var propagationIntervalMs = (int)(propagationIntervalSec * 1000);
+
+ while (!cancellationToken.IsCancellationRequested)
+ {
+ ProcessIncomingMessages();
+ PropagateBufferedData();
+
+ try
+ {
+ // 다음 전파 주기까지 대기합니다.
+ if (cancellationToken.WaitHandle.WaitOne(propagationIntervalMs))
+ {
+ break; // Stop() 호출 시 루프 종료
+ }
+ }
+ catch (ObjectDisposedException) { break; }
+ }
+ }
+
+ ///
+ /// 직접 전파 모드의 메인 루프입니다. Stop()이 호출될 때까지 스레드를 대기시킵니다.
+ ///
+ private void RunDirectPropagationLoop()
+ {
+ try
+ {
+ cancellationTokenSource.Token.WaitHandle.WaitOne();
+ }
+ catch (ObjectDisposedException)
+ {
+ // 정상적인 종료 과정이므로 예외를 무시합니다.
+ }
+ }
+
+ ///
+ /// MQTT 메시지가 수신될 때마다 MQTTService에 의해 호출되는 콜백 메서드입니다.
+ /// 이 메서드는 워커 스레드에서 실행됩니다.
+ ///
+ /// 메시지가 수신된 토픽
+ /// 수신된 메시지 내용
+ private void OnMqttMessageReceived(string topic, string message)
+ {
+ if (bufferDurationSec > 0)
+ {
+ // 버퍼링 모드: 메시지를 큐에 넣어 워커 스레드로 전달합니다.
+ IncomingMessageQueue.Enqueue(new MqttDataPacket(topic, message));
+ }
+ else
+ {
+ // 직접 전파 모드: 즉시 리스너를 호출합니다.
+ lock (bufferAndListenerLock)
+ {
+ listeners[topic]?.Invoke(new List() { new MqttDataPacket(topic, message) });
+ }
+ }
+ }
+
+ ///
+ /// [버퍼링 모드 전용] 큐의 메시지를 토픽별 버퍼로 옮깁니다.
+ ///
+ private void ProcessIncomingMessages()
+ {
+ // 큐에서 메시지를 가져와 처리
+ while (IncomingMessageQueue.TryDequeue(out MqttDataPacket packet))
+ {
+ lock (bufferAndListenerLock)
+ {
+ if (topicBuffers.ContainsKey(packet.Topic))
+ {
+ topicBuffers[packet.Topic].Add(packet);
+ }
+ }
+ }
+ }
+
+ ///
+ /// [버퍼링 모드 전용] 버퍼링된 데이터를 리스너에게 전파합니다.
+ ///
+ private void PropagateBufferedData()
+ {
+ DateTime cutoffTime = DateTime.UtcNow.AddSeconds(-bufferDurationSec);
+
+ lock (bufferAndListenerLock)
+ {
+ foreach (var topic in listeners.Keys.ToList())
+ {
+ if (!topicBuffers.ContainsKey(topic)) continue;
+
+ var buffer = topicBuffers[topic];
+ // 오래된 데이터 제거
+ buffer.RemoveAll(p => p.Timestamp < cutoffTime);
+
+ var newPackets = buffer.Where(p => !p.IsPropagated).ToList();
+
+ if (newPackets.Count > 0)
+ {
+ try
+ {
+ // 중요: 이 콜백은 Worker 스레드에서 직접 호출됩니다.
+ listeners[topic]?.Invoke(newPackets);
+ }
+ catch (Exception ex)
+ {
+ Debug.LogError($"[Worker] 리스너 실행 중 오류 발생 (Topic: {topic}): {ex.Message}");
+ }
+
+ foreach (var packet in newPackets)
+ {
+ packet.IsPropagated = true;
+ }
+ }
+ }
+ }
+ }
+ }
+}
diff --git a/Assets/Scripts/UVC/Data/MqttWorker.cs.meta b/Assets/Scripts/UVC/Data/MqttWorker.cs.meta
new file mode 100644
index 00000000..bdccc173
--- /dev/null
+++ b/Assets/Scripts/UVC/Data/MqttWorker.cs.meta
@@ -0,0 +1,2 @@
+fileFormatVersion: 2
+guid: 4031acfadc3ee0a4a852aa2b0681f696
\ No newline at end of file
diff --git a/Assets/Scripts/UVC/Network/MQTTService.cs b/Assets/Scripts/UVC/Network/MQTTService.cs
index da766cfe..a6a1068a 100644
--- a/Assets/Scripts/UVC/Network/MQTTService.cs
+++ b/Assets/Scripts/UVC/Network/MQTTService.cs
@@ -79,7 +79,6 @@ namespace UVC.network
client.OnStateChanged += OnStateChangedMQTT;
client.OnDisconnect += OnDisconnectedMQTT;
client.OnError += OnErrorMQTT;
-
}
///
@@ -327,8 +326,19 @@ namespace UVC.network
{
if (topicHandler.TryGetValue(topic.Filter.OriginalFilter, out var handler))
{
- // 등록된 핸들러가 있으면 호출
- handler.Invoke(topic.Filter.OriginalFilter, payload);
+ // 메인 스레드에서 실행 중인지 확인합니다.
+ bool isMainThread = UniTask.SwitchToMainThread().GetAwaiter().IsCompleted;
+ Debug.Log($"MQTT OnTopic {topic.Filter.OriginalFilter} => {payload} isMainThread={isMainThread}");
+ if (isMainThread)
+ {
+ // 메인 스레드이므로 핸들러를 직접 호출합니다.
+ handler.Invoke(topic.Filter.OriginalFilter, payload);
+ }
+ else
+ {
+ // 백그라운드 스레드이므로 UniTask.Post를 사용하여 메인 스레드로 작업을 보냅니다.
+ UniTask.Post(() => handler.Invoke(topic.Filter.OriginalFilter, payload));
+ }
}
}
}