data package 정리

This commit is contained in:
logonkhi
2025-07-15 15:25:17 +09:00
parent 6bac7d53b1
commit cf97c6b61b
81 changed files with 909 additions and 466 deletions

View File

@@ -857,7 +857,7 @@ namespace SQLite4Unity3d
/// <example>
/// <code>
/// // SQL 쿼리 직접 실행
/// int rowsAffected = db.Execute("UPDATE Person SET Name = ? WHERE Id = ?", "홍길동", 1);
/// int rowsAffected = db.Start("UPDATE Person SET Name = ? WHERE Id = ?", "홍길동", 1);
/// Console.WriteLine($"업데이트된 행 수: {rowsAffected}");
/// </code>
/// </example>

View File

@@ -0,0 +1,8 @@
fileFormatVersion: 2
guid: 0a05b40cec113104b822ea961797e6bf
folderAsset: yes
DefaultImporter:
externalObjects: {}
userData:
assetBundleName:
assetBundleVariant:

View File

@@ -5,7 +5,7 @@ using System.Collections.ObjectModel;
using System.Linq;
using UnityEngine;
namespace UVC.Data
namespace UVC.Data.Core
{
/// <summary>
/// DataObject 객체 컬렉션의 변경사항을 추적하는 데이터 배열 클래스
@@ -116,6 +116,35 @@ namespace UVC.Data
FromJArray(jArray);
}
/// <summary>
/// 배열의 JSON 문자열 표현을 사용하여 현재 <see cref="DataArray"/> 인스턴스를 채웁니다.
/// </summary>
/// <remarks>이 메서드는 제공된 JSON 문자열을 구문 분석하고 각 요소를 데이터 객체로 변환한 다음
/// 현재 <see cref="DataArray"/>에 추가합니다. <paramref name="jsonString"/>이 null이거나
/// 비어 있는 경우, 메서드는 수정 없이 현재 인스턴스를 반환합니다.</remarks>
/// <param name="jsonString">구문 분석하여 데이터 객체로 변환할 JSON 문자열입니다. 유효한 JSON 배열을 나타내야 합니다.</param>
/// <returns>JSON 문자열에서 파싱된 데이터 객체로 채워진 현재 <see cref="DataArray"/> 인스턴스입니다.</returns>
/// <exception cref="ArgumentException"> <paramref name="jsonString"/>이 유효한 JSON 배열 형식이 아닌 경우 발생합니다.</exception>
public DataArray FromJsonString(string jsonString)
{
if (!string.IsNullOrEmpty(jsonString))
{
try
{
JArray jArray = JArray.Parse(jsonString);
foreach (var item in jArray)
{
Add(ConvertToDataObject(item));
}
}
catch (Exception ex)
{
throw new ArgumentException("Invalid JSON string format.", nameof(jsonString), ex);
}
}
return this;
}
public DataArray FromCapacity(int capacity)
{
Capacity = capacity;

View File

@@ -0,0 +1,2 @@
fileFormatVersion: 2
guid: 7e41ece4170d1944c8fd17dbc49e39c7

View File

@@ -2,7 +2,7 @@
using System.Collections.Concurrent;
using UnityEngine;
namespace UVC.Data
namespace UVC.Data.Core
{
/// <summary>
/// DataArray 객체의 재사용을 위한 풀링 클래스입니다.
@@ -73,12 +73,15 @@ namespace UVC.Data
{
if (array == null || array.IsInPool) return;
bool shouldReturnToPool;
lock (_statsLock)
{
_inUseCount--;
shouldReturnToPool = _pool.Count < _maxPoolSize;
}
if (_pool.Count < _maxPoolSize)
if (shouldReturnToPool)
{
array.Reset();
array.IsInPool = true;
@@ -93,7 +96,10 @@ namespace UVC.Data
/// <returns>풀 통계 (최대 사용량, 현재 사용량, 풀 비어있을 때 생성 횟수, 현재 풀 크기)</returns>
public static string GetStats()
{
return $"최대 사용량: {_peakUsage}, 현재 사용량: {_inUseCount}, 풀 비어있을 때 생성 횟수: {_poolMisses}, 현재 풀 크기: {_pool.Count}, Max Size: {_maxPoolSize}";
lock (_statsLock)
{
return $"최대 사용량: {_peakUsage}, 현재 사용량: {_inUseCount}, 풀 비어있을 때 생성 횟수: {_poolMisses}, 현재 풀 크기: {_pool.Count}, Max Size: {_maxPoolSize}";
}
}
/// <summary>

View File

@@ -0,0 +1,2 @@
fileFormatVersion: 2
guid: e0e552efbedfd6844ad5e456c56bd6c2

View File

@@ -1,6 +1,6 @@
using System.Collections.Generic;
namespace UVC.Data
namespace UVC.Data.Core
{
/// <summary>
/// DataMapper에서 사용되는 데이터 값 매핑을 위한 클래스입니다.

View File

@@ -0,0 +1,2 @@
fileFormatVersion: 2
guid: 25291682863149443812c3e18313c553

View File

@@ -5,7 +5,7 @@ using System.Linq;
using System.Threading.Tasks;
using UVC.Log;
namespace UVC.Data
namespace UVC.Data.Core
{
/// <summary>
/// 서로 다른 JSON 데이터 구조 간에 매핑 기능을 제공하는 클래스입니다.

View File

@@ -0,0 +1,2 @@
fileFormatVersion: 2
guid: 12218b58d1612044a8398ce54d327b67

View File

@@ -5,7 +5,7 @@ using System;
using System.Collections.Generic;
using System.Linq;
namespace UVC.Data
namespace UVC.Data.Core
{
/// <summary>
/// JSON 데이터의 구조와 변환 규칙을 정의하는 마스크 클래스입니다.

View File

@@ -0,0 +1,2 @@
fileFormatVersion: 2
guid: c926ffeb681fcfd42a737feadd09bb28

View File

@@ -4,10 +4,9 @@ using Newtonsoft.Json.Linq;
using System;
using System.Collections.Generic;
using System.Linq;
using UnityEngine;
using UVC.Extention;
namespace UVC.Data
namespace UVC.Data.Core
{
/// <summary>
/// 키-값 쌍의 데이터를 관리하고 변경 사항을 추적하는 동적 데이터 객체입니다.
@@ -62,11 +61,11 @@ namespace UVC.Data
isInPool = value;
foreach (var item in this)
{
if(item.Value is DataObject dataObject)
if (item.Value is DataObject dataObject)
{
dataObject.isInPool = value; // 내부 DataObject도 풀에 있다고 표시합니다.
}
else if(item.Value is DataArray dataArray)
else if (item.Value is DataArray dataArray)
{
dataArray.IsInPool = value; // 내부 DataArray도 풀에 있다고 표시합니다.
}
@@ -122,7 +121,7 @@ namespace UVC.Data
/// </summary>
public DataObject() { }
public DataObject(string jsonString): this(JObject.Parse(jsonString))
public DataObject(string jsonString) : this(JObject.Parse(jsonString))
{
}
@@ -142,7 +141,7 @@ namespace UVC.Data
var sourceObject = serializer.Deserialize<JObject>(reader);
// 수정된 코드: 생성자를 호출하는 대신 FromJObject 메서드를 사용
if(sourceObject != null) FromJObject(sourceObject);
if (sourceObject != null) FromJObject(sourceObject);
}
}
@@ -315,7 +314,7 @@ namespace UVC.Data
// 기존에 풀링 가능한 객체가 있었다면 풀에 반환합니다.
if (oldValue is DataObject oldDataObject)
{
if(oldDataObject.IsInPool) oldDataObject.ReturnToPool();
if (oldDataObject.IsInPool) oldDataObject.ReturnToPool();
}
else if (oldValue is DataArray oldDataArray)
{
@@ -633,7 +632,7 @@ namespace UVC.Data
// 제거된 객체가 풀링 가능한 경우 풀에 반환합니다.
if (oldValue is DataObject dataObject)
{
if(dataObject.IsInPool) dataObject.ReturnToPool();
if (dataObject.IsInPool) dataObject.ReturnToPool();
}
else if (oldValue is DataArray dataArray)
{
@@ -653,7 +652,7 @@ namespace UVC.Data
{
foreach (var value in Values.ToList())
{
if(value is DataObject dataObject)
if (value is DataObject dataObject)
{
dataObject.ReturnToPool(); // DataObject를 풀에 반환합니다.
}
@@ -700,8 +699,8 @@ namespace UVC.Data
/// <returns>현재 객체의 깊은 복사본인 새로운 <see cref="DataObject"/> 인스턴스를 반환합니다.</returns>
public DataObject Copy(bool fromPool = true)
{
DataObject clone = DataObjectPool.Get();
if(fromPool) clone = DataObjectPool.Get();
DataObject clone;
if (fromPool) clone = DataObjectPool.Get();
else clone = new DataObject();
clone.Name = Name;
clone.IdKey = IdKey;
@@ -826,7 +825,7 @@ namespace UVC.Data
{
if (this.ContainsKey(key))
{
if(this[key] is DataObject dataObject)
if (this[key] is DataObject dataObject)
{
updated[key] = dataObject.Copy(fromPool); // DataObject는 복사합니다.
}
@@ -878,6 +877,15 @@ namespace UVC.Data
return string.Join(", ", this.Select(kvp => $"{kvp.Key}:{kvp.Value}"));
}
/// <summary>
/// 현재 객체를 JSON 문자열 표현으로 변환합니다.
/// </summary>
/// <returns>현재 객체를 나타내는 JSON 문자열입니다.</returns>
public string ToJson()
{
return ToJObject().ToString(Newtonsoft.Json.Formatting.None);
}
}
}

View File

@@ -0,0 +1,2 @@
fileFormatVersion: 2
guid: 3b57ffd33c60d9343b3f5ca203f1b793

View File

@@ -1,8 +1,6 @@
using System;
using System.Collections.Concurrent;
using UnityEngine;
using System.Collections.Concurrent;
namespace UVC.Data
namespace UVC.Data.Core
{
/// <summary>
/// DataObject 인스턴스를 재사용하기 위한 객체 풀 클래스입니다.
@@ -104,12 +102,15 @@ namespace UVC.Data
{
if (obj == null || obj.IsInPool) return;
bool shouldReturnToPool;
lock (_statsLock)
{
_inUseCount--;
shouldReturnToPool = pool.Count < maxPoolSize;
}
if (pool.Count < maxPoolSize)
if (shouldReturnToPool)
{
obj.Reset(); // 재사용 전 완벽한 초기화
obj.IsInPool = true;
@@ -124,7 +125,10 @@ namespace UVC.Data
/// <returns>풀 통계 (최대 사용량, 현재 사용량, 풀 비어있을 때 생성 횟수, 현재 풀 크기)</returns>
public static string GetStats()
{
return $"최대 사용량: {_peakUsage}, 현재 사용량: {_inUseCount}, 풀 비어있을 때 생성 횟수: {_poolMisses}, 현재 풀 크기: {pool.Count}, Max Size: {maxPoolSize}";
lock (_statsLock)
{
return $"최대 사용량: {_peakUsage}, 현재 사용량: {_inUseCount}, 풀 비어있을 때 생성 횟수: {_poolMisses}, 현재 풀 크기: {pool.Count}, Max Size: {maxPoolSize}";
}
}
/// <summary>

View File

@@ -0,0 +1,2 @@
fileFormatVersion: 2
guid: ff0561495cf616e4aa661b30f75d4651

View File

@@ -3,7 +3,7 @@ using Newtonsoft.Json.Linq;
using System;
using System.Collections.Generic;
namespace UVC.Data
namespace UVC.Data.Core
{
/// <summary>
/// JSON 데이터의 유효성을 검사하는 클래스입니다.

View File

@@ -0,0 +1,2 @@
fileFormatVersion: 2
guid: 1464a011e89176348ac6c4438b23b5e1

View File

@@ -1,4 +1,4 @@
namespace UVC.Data
namespace UVC.Data.Core
{
/// <summary>
/// 서로 다른 JSON 데이터 구조 간에 매핑 기능을 제공하는 클래스입니다.

View File

@@ -0,0 +1,2 @@
fileFormatVersion: 2
guid: 35540da191ee95e48a0205c46ec34499

View File

@@ -1,2 +0,0 @@
fileFormatVersion: 2
guid: 5ea1772f89851a04fa8a65b43169561b

View File

@@ -1,2 +0,0 @@
fileFormatVersion: 2
guid: b61a9a50cc22ac946aa9e175cb06451c

View File

@@ -1,2 +0,0 @@
fileFormatVersion: 2
guid: 0a87f36aa32645a40927e6e3ddfc1fc8

View File

@@ -1,2 +0,0 @@
fileFormatVersion: 2
guid: f9f3db6ee7bbb234982df4138828fe94

View File

@@ -1,2 +0,0 @@
fileFormatVersion: 2
guid: 505d9da15c2b309419069d612aa06f15

View File

@@ -1,2 +0,0 @@
fileFormatVersion: 2
guid: ca43f1365775ee443b6290a4079de628

View File

@@ -1,2 +0,0 @@
fileFormatVersion: 2
guid: ccfcb80fa0900584b8569695e4fb8f21

View File

@@ -2,6 +2,9 @@
using Newtonsoft.Json.Linq;
using System;
using System.Collections.Generic;
using UVC.Data.Core;
using UVC.Data.Http;
using UVC.Data.Mqtt;
using UVC.Log;
namespace UVC.Data
@@ -24,7 +27,11 @@ namespace UVC.Data
/// <summary>
/// 외부에서의 인스턴스 생성을 방지하는 보호된 생성자입니다.
/// </summary>
protected DataRepository() { }
protected DataRepository()
{
// Best MQTT 초기화 작업을 Main 스레드에서 호출 해야 한다.
Best.HTTP.Shared.HTTPManager.Setup();
}
/// <summary>
/// DataRepository의 단일 인스턴스에 대한 접근자입니다.
/// </summary>
@@ -48,6 +55,13 @@ namespace UVC.Data
private Dictionary<string, Action<IDataObject>> dataUpdateHandlers = new Dictionary<string, Action<IDataObject>>();
private HttpDataFetcher httpFetcher = new HttpDataFetcher();
public HttpDataFetcher HttpFetcher => httpFetcher;
private MqttDataReceiver mqttReceiver = new MqttDataReceiver();
public MqttDataReceiver MqttReceiver => mqttReceiver;
/// <summary>
/// 저장소에 데이터 객체를 추가하거나 기존 객체를 업데이트합니다.
/// </summary>
@@ -55,7 +69,7 @@ namespace UVC.Data
/// <param name="dataObject">저장할 데이터 객체</param>
/// <param name="updatedDataOnly">true인 경우 업데이트된 속성만 반환, false인 경우 전체 객체 반환</param>
/// <returns>새로 추가된 객체 또는 업데이트된 기존 객체</returns>
public IDataObject AddOrUpdateData(string key, IDataObject dataObject, bool updatedDataOnly = true)
internal IDataObject AddOrUpdateData(string key, IDataObject dataObject, bool updatedDataOnly = true)
{
if (string.IsNullOrEmpty(key))
throw new ArgumentNullException(nameof(key), "키는 null이거나 빈 문자열일 수 없습니다.");

View File

@@ -1,2 +0,0 @@
fileFormatVersion: 2
guid: 74866795406b7c74a86591446c9b455a

View File

@@ -0,0 +1,8 @@
fileFormatVersion: 2
guid: c9dc193db5888ec429275b344bc14b6c
folderAsset: yes
DefaultImporter:
externalObjects: {}
userData:
assetBundleName:
assetBundleVariant:

View File

@@ -9,19 +9,20 @@ using System.IO;
using System.Text;
using System.Threading;
using UnityEngine;
using UVC.Data.Core;
using UVC.Log;
using UVC.Network;
using UVC.Tests;
using UVC.Threading;
namespace UVC.Data
namespace UVC.Data.Http
{
/// <summary>
/// HTTP 요청 파이프라인을 관리하는 클래스입니다.
/// </summary>
/// <remarks>
/// 이 클래스는 HTTP 요청의 실행 및 반복 요청을 관리합니다.
/// 등록된 요청(HttpPipeLineInfo)을 키 값으로 관리하며,
/// 등록된 요청(HttpRequestConfig)을 키 값으로 관리하며,
/// 주기적 데이터 수집을 위한 반복 요청 기능을 제공합니다.
///
/// 주요 기능:
@@ -40,8 +41,8 @@ namespace UVC.Data
/// </remarks>
/// <example>
/// <code>
/// // HttpPipeLine 인스턴스 생성
/// var httpPipeline = new HttpPipeLine();
/// // HttpDataFetcher 인스턴스 생성
/// var httpFetcher = new HttpDataFetcher();
///
/// // 데이터 매퍼 설정 (응답 데이터 변환용)
/// var dataMask = new DataMask();
@@ -50,36 +51,36 @@ namespace UVC.Data
/// var dataMapper = new DataMapper(dataMask);
///
/// // 단일 요청 설정 및 등록
/// var singleRequest = new HttpPipeLineInfo("https://api.example.com/data")
/// var singleRequest = new HttpRequestConfig("https://api.example.com/data")
/// .setDataMapper(dataMapper)
/// .setHandler(data => {
/// // 데이터 처리 로직
/// ULog.Debug($"데이터 수신: {data?.ToString() ?? "null"}");
/// });
/// httpPipeline.Add("dataRequest", singleRequest);
/// httpFetcher.Add("dataRequest", singleRequest);
///
/// // 반복 요청 설정 및 등록
/// var repeatingRequest = new HttpPipeLineInfo("https://api.example.com/status")
/// var repeatingRequest = new HttpRequestConfig("https://api.example.com/status")
/// .setDataMapper(dataMapper)
/// .setHandler(data => {
/// // 상태 데이터 처리
/// ULog.Debug($"상태 업데이트: {data?.ToString() ?? "null"}");
/// })
/// .setRepeat(true, 0, 5000); // 5초마다 무한 반복
/// httpPipeline.Add("statusMonitor", repeatingRequest);
/// httpFetcher.Add("statusMonitor", repeatingRequest);
///
/// // 요청 실행
/// await httpPipeline.Excute("dataRequest"); // 단일 실행
/// await httpPipeline.Excute("statusMonitor"); // 반복 실행 시작
/// await httpFetcher.Excute("dataRequest"); // 단일 실행
/// await httpFetcher.Excute("statusMonitor"); // 반복 실행 시작
///
/// // 나중에 반복 요청 중지
/// httpPipeline.StopRepeat("statusMonitor");
/// httpFetcher.StopRepeat("statusMonitor");
///
/// // 더 이상 필요없는 요청 제거
/// httpPipeline.Remove("dataRequest");
/// httpFetcher.Remove("dataRequest");
/// </code>
/// </example>
public class HttpPipeLine
public class HttpDataFetcher
{
/// <summary>
/// 테스트를 위한 목업 모드 활성화 여부를 설정하거나 가져옵니다.
@@ -93,7 +94,7 @@ namespace UVC.Data
/// <summary>
/// 등록된 HTTP 파이프라인 정보를 저장하는 사전
/// </summary>
private Dictionary<string, HttpPipeLineInfo> infoList = new Dictionary<string, HttpPipeLineInfo>();
private Dictionary<string, HttpRequestConfig> infoList = new Dictionary<string, HttpRequestConfig>();
/// <summary>
/// 실행 중인 반복 작업의 취소 토큰을 관리하는 사전
/// </summary>
@@ -116,7 +117,7 @@ namespace UVC.Data
/// <remarks>
/// 동일한 키가 이미 존재하는 경우 새로운 정보로 대체됩니다.
/// </remarks>
public void Add(string key, HttpPipeLineInfo info)
public void Add(string key, HttpRequestConfig info)
{
if (!infoList.ContainsKey(key))
{
@@ -165,7 +166,7 @@ namespace UVC.Data
throw new KeyNotFoundException($"No HTTP request found with key '{key}'.");
}
HttpPipeLineInfo info = infoList[key];
HttpRequestConfig info = infoList[key];
// 반복 설정에 관계없이 이전에 실행 중인 반복 작업이 있다면 중지
await StopRepeat(key);
@@ -211,7 +212,7 @@ namespace UVC.Data
/// <exception cref="OperationCanceledException">작업이 취소된 경우 발생</exception>
/// <exception cref="JsonException">JSON 응답 파싱 중 오류가 발생한 경우</exception>
/// <exception cref="Exception">HTTP 요청 중 다른 예외가 발생한 경우</exception>
private async UniTask ExecuteSingle(string key, HttpPipeLineInfo info, CancellationToken cancellationToken = default)
private async UniTask ExecuteSingle(string key, HttpRequestConfig info, CancellationToken cancellationToken = default)
{
int retryCount = 0;
@@ -538,7 +539,7 @@ namespace UVC.Data
throw new KeyNotFoundException($"No HTTP request found with key '{key}'.");
}
HttpPipeLineInfo info = infoList[key];
HttpRequestConfig info = infoList[key];
if (!info.Repeat) return;
// 새 취소 토큰 생성
@@ -677,11 +678,11 @@ namespace UVC.Data
/// </remarks>
/// <example>
/// <code>
/// var httpPipeline = new HttpPipeLine();
/// var httpFetcher = new HttpDataFetcher();
/// // 파이프라인에 요청 추가 후...
///
/// // 모든 활성 요청 확인
/// var activeRequests = httpPipeline.GetActiveRequests();
/// var activeRequests = httpFetcher.GetActiveRequests();
/// foreach (var request in activeRequests)
/// {
/// ULog.Debug($"요청 키: {request.Key}, 활성 상태: {request.Value.IsActive}, " +
@@ -689,13 +690,13 @@ namespace UVC.Data
/// }
/// </code>
/// </example>
public Dictionary<string, HttpPipeLineRequestStatus> GetActiveRequests()
public Dictionary<string, HttpRequestStatus> GetActiveRequests()
{
var result = new Dictionary<string, HttpPipeLineRequestStatus>();
var result = new Dictionary<string, HttpRequestStatus>();
foreach (var key in infoList.Keys)
{
bool isRepeating = repeatTokenSources.ContainsKey(key);
result[key] = new HttpPipeLineRequestStatus
result[key] = new HttpRequestStatus
{
IsActive = isRepeating,
IsRepeating = isRepeating,
@@ -736,7 +737,7 @@ namespace UVC.Data
/// 이 클래스는 HTTP 파이프라인에 등록된 요청의 활성 상태, 반복 설정,
/// 반복 횟수, 반복 간격에 관한 정보를 제공합니다.
/// </remarks>
public class HttpPipeLineRequestStatus
public class HttpRequestStatus
{
/// <summary>
/// 요청이 현재 활성 상태인지 여부를 나타냅니다.

View File

@@ -0,0 +1,2 @@
fileFormatVersion: 2
guid: 6475c685c25ddcd41ba12d969cc60238

View File

@@ -3,8 +3,9 @@
using Newtonsoft.Json.Linq;
using System;
using System.Collections.Generic;
using UVC.Data.Core;
namespace UVC.Data
namespace UVC.Data.Http
{
/// <summary>
/// HTTP 요청 파이프라인 정보를 관리하는 클래스입니다.
@@ -17,7 +18,7 @@ namespace UVC.Data
/// </remarks>
/// <example>
/// <code>
/// var pipelineInfo = new HttpPipeLineInfo("https://api.example.com/data", "GET")
/// var config = new HttpRequestConfig("https://api.example.com/data", "GET")
/// .setDataMapper(new DataMapper(dataMask))
/// .setSuccessHandler(data => Console.WriteLine(data)) // 성공 핸들러 예시
/// .setFailHandler(errorData => Console.Error.WriteLine(errorData)) // 실패 핸들러 예시
@@ -25,7 +26,7 @@ namespace UVC.Data
/// .setRepeat(true, 10, 5000);
/// </code>
/// </example>
public class HttpPipeLineInfo
public class HttpRequestConfig
{
private string _url;
private string _method;
@@ -68,7 +69,7 @@ namespace UVC.Data
/// });
///
/// // 파이프라인에 validator 설정
/// var pipeline = new HttpPipeLineInfo("https://api.weather.com/current")
/// var config = new HttpRequestConfig("https://api.weather.com/current")
/// .setValidator(validator)
/// .setSuccessHandler(data => {
/// // 여기에 전달되는 데이터는 이미 유효성 검사를 통과한 데이터만 포함됩니다.
@@ -160,13 +161,13 @@ namespace UVC.Data
public bool UpdatedDataOnly => _updatedDataOnly;
/// <summary>
/// HttpPipeLineInfo 클래스의 새 인스턴스를 초기화합니다.
/// HttpRequestConfig 클래스의 새 인스턴스를 초기화합니다.
/// </summary>
/// <param name="url">HTTP 요청을 보낼 URL</param>
/// <param name="method">HTTP 요청 메서드 (기본값: "post")</param>
/// <param name="headers">HTTP 요청 헤더 (선택사항)</param>
/// <param name="body">HTTP 요청 본문 (선택사항)</param>
public HttpPipeLineInfo(string url, string method = "post", Dictionary<string, string>? headers = null, string? body = null)
public HttpRequestConfig(string url, string method = "post", Dictionary<string, string>? headers = null, string? body = null)
{
_url = url;
_method = method;
@@ -179,8 +180,8 @@ namespace UVC.Data
/// 데이터 매퍼는 HTTP 응답을 IDataObject로 변환하는 역할을 합니다.
/// </summary>
/// <param name="dataMapper">사용할 데이터 매퍼 객체</param>
/// <returns>현재 HttpPipeLineInfo 인스턴스 (메서드 체이닝용)</returns>
public HttpPipeLineInfo setDataMapper(DataMapper dataMapper)
/// <returns>현재 HttpRequestConfig 인스턴스 (메서드 체이닝용)</returns>
public HttpRequestConfig setDataMapper(DataMapper dataMapper)
{
_dataMapper = dataMapper;
return this;
@@ -190,7 +191,7 @@ namespace UVC.Data
/// HTTP 응답 데이터의 유효성을 검사하는 검사기를 설정합니다.
/// </summary>
/// <param name="validator">HTTP 응답 데이터의 유효성 검사에 사용할 DataValidator 객체</param>
/// <returns>현재 HttpPipeLineInfo 인스턴스 (메서드 체이닝용)</returns>
/// <returns>현재 HttpRequestConfig 인스턴스 (메서드 체이닝용)</returns>
/// <remarks>
/// 이 메서드로 설정된 검사기는 HTTP 응답이 성공적으로 파싱된 후,
/// 성공 핸들러에 전달되기 전에 데이터의 유효성을 검사합니다.
@@ -217,7 +218,7 @@ namespace UVC.Data
/// });
///
/// // 3. 검사기를 HTTP 파이프라인에 설정
/// var pipelineInfo = new HttpPipeLineInfo("https://api.example.com/users", "get")
/// var pipelineInfo = new HttpRequestConfig("https://api.example.com/users", "get")
/// .setDataMapper(userDataMapper)
/// .setValidator(validator)
/// .setSuccessHandler(userData => {
@@ -229,7 +230,7 @@ namespace UVC.Data
/// });
/// </code>
/// </example>
public HttpPipeLineInfo setValidator(DataValidator validator)
public HttpRequestConfig setValidator(DataValidator validator)
{
this._validator = validator;
return this;
@@ -239,8 +240,8 @@ namespace UVC.Data
/// HTTP 파이프라인에 적용할 ResponseMask를 설정하고 업데이트된 파이프라인 구성을 반환합니다.
/// </summary>
/// <param name="responseMask">HTTP response에 적용할 <see cref="HttpResponseMask"/>입니다.</param>
/// <returns>지정된 response 마스크가 적용된 업데이트된 <see cref="HttpPipeLineInfo"/> 인스턴스입니다.</returns>
public HttpPipeLineInfo setResponseMask(HttpResponseMask responseMask)
/// <returns>지정된 response 마스크가 적용된 업데이트된 <see cref="HttpRequestConfig"/> 인스턴스입니다.</returns>
public HttpRequestConfig setResponseMask(HttpResponseMask responseMask)
{
_responseMask = responseMask;
return this;
@@ -252,8 +253,8 @@ namespace UVC.Data
/// UpdatedDataOnly=true일 경우 변경 된 데이터가 없으면 호출 되지 않습니다.
/// </summary>
/// <param name="handler">응답 데이터를 처리할 콜백 함수</param>
/// <returns>현재 HttpPipeLineInfo 인스턴스 (메서드 체이닝용)</returns>
public HttpPipeLineInfo setSuccessHandler(Action<IDataObject?>? handler)
/// <returns>현재 HttpRequestConfig 인스턴스 (메서드 체이닝용)</returns>
public HttpRequestConfig setSuccessHandler(Action<IDataObject?>? handler)
{
_successhandler = handler;
return this;
@@ -264,8 +265,8 @@ namespace UVC.Data
/// 실패 정보 또는 관련 데이터가 string 형태로 전달될 수 있습니다.
/// </summary>
/// <param name="handler">실패 정보를 처리할 콜백 함수입니다. 실패 시 관련 데이터를 인자로 받습니다.</param>
/// <returns>현재 HttpPipeLineInfo 인스턴스 (메서드 체이닝용)</returns>
public HttpPipeLineInfo setFailHandler(Action<string>? handler)
/// <returns>현재 HttpRequestConfig 인스턴스 (메서드 체이닝용)</returns>
public HttpRequestConfig setFailHandler(Action<string>? handler)
{
_failhandler = handler;
return this;
@@ -276,8 +277,8 @@ namespace UVC.Data
/// </summary>
/// <param name="maxRetryCount">최대 재시도 횟수 (기본값: 3)</param>
/// <param name="retryDelay">재시도 간 대기 시간(밀리초) (기본값: 1000)</param>
/// <returns>현재 HttpPipeLineInfo 인스턴스 (메서드 체이닝용)</returns>
public HttpPipeLineInfo setRetry(int maxRetryCount = 3, int retryDelay = 1000)
/// <returns>현재 HttpRequestConfig 인스턴스 (메서드 체이닝용)</returns>
public HttpRequestConfig setRetry(int maxRetryCount = 3, int retryDelay = 1000)
{
_maxRetryCount = maxRetryCount;
_retryDelay = retryDelay;
@@ -291,7 +292,7 @@ namespace UVC.Data
/// <param name="count">반복 횟수 (0은 무한 반복) (기본값: 0)</param>
/// <param name="interval">반복 실행 간 대기 시간(밀리초) (기본값: 1000)</param>
/// <param name="updatedDataOnly">변경된 데이터만 처리할지 여부 (기본값: true)</param>
/// <returns>현재 HttpPipeLineInfo 인스턴스 (메서드 체이닝용)</returns>
/// <returns>현재 HttpRequestConfig 인스턴스 (메서드 체이닝용)</returns>
/// <remarks>
/// 반복 요청 시 updatedDataOnly가 true인 경우, 서버 응답에서 데이터가 변경된 경우에만 핸들러가 호출됩니다.
/// 이는 불필요한 데이터 처리를 방지하고 성능을 향상시키는 데 도움이 됩니다.
@@ -299,17 +300,17 @@ namespace UVC.Data
/// <example>
/// <code>
/// // 5초마다 10번 반복 요청, 변경된 데이터만 처리
/// var pipelineInfo = new HttpPipeLineInfo("https://api.example.com/data", "GET")
/// var pipelineInfo = new HttpRequestConfig("https://api.example.com/data", "GET")
/// .setHandler(data => ProcessData(data))
/// .setRepeat(true, 10, 5000, true);
///
/// // 3초마다 무한 반복, 모든 응답 데이터 처리
/// var pipelineInfo = new HttpPipeLineInfo("https://api.example.com/status", "GET")
/// var pipelineInfo = new HttpRequestConfig("https://api.example.com/status", "GET")
/// .setHandler(data => UpdateStatus(data))
/// .setRepeat(true, 0, 3000, false);
/// </code>
/// </example>
public HttpPipeLineInfo setRepeat(bool repeat, int count = 0, int interval = 1000, bool updatedDataOnly = true)
public HttpRequestConfig setRepeat(bool repeat, int count = 0, int interval = 1000, bool updatedDataOnly = true)
{
_repeat = repeat;
_repeatCount = count;

View File

@@ -0,0 +1,2 @@
fileFormatVersion: 2
guid: 7cf149dd7c328714c80abf40af9bd0b4

View File

@@ -1,7 +1,7 @@
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;
namespace UVC.Data
namespace UVC.Data.Http
{
/// <summary>
/// HTTP 응답 문자열을 파싱하여 성공 여부를 확인하고 실제 데이터를 추출하는 규칙을 정의하는 클래스입니다.

View File

@@ -0,0 +1,2 @@
fileFormatVersion: 2
guid: 4347b10495e6f0f4d952eefd7ad38c4c

View File

@@ -1,4 +1,4 @@
namespace UVC.Data
namespace UVC.Data.Http
{
/// <summary>
/// HttpResponseMask.Apply 메서드의 결과를 나타내는 클래스입니다.

View File

@@ -0,0 +1,2 @@
fileFormatVersion: 2
guid: 7caaf4b007f811743a623cd526bdc35f

View File

@@ -1,2 +0,0 @@
fileFormatVersion: 2
guid: f0780f29b8d71ea46938f4804b5b7ca2

View File

@@ -1,2 +0,0 @@
fileFormatVersion: 2
guid: 15e27eb73804cd548972be4bdd89a058

View File

@@ -1,2 +0,0 @@
fileFormatVersion: 2
guid: 28af8b9febad2954a93c6bebda8d62ee

View File

@@ -1,2 +0,0 @@
fileFormatVersion: 2
guid: 52872327cdcef7540be0704fa181c778

View File

@@ -1,2 +0,0 @@
fileFormatVersion: 2
guid: 73168fc72a50dab44adc23095e0341be

View File

@@ -1,2 +0,0 @@
fileFormatVersion: 2
guid: 7ed1a40f92147e942b095cef6d1263ca

View File

@@ -1,2 +0,0 @@
fileFormatVersion: 2
guid: d808f1610804b40429477dd3193c93e5

View File

@@ -0,0 +1,8 @@
fileFormatVersion: 2
guid: 0f5efde6ce4a9b8409f53a6b056ca6ce
folderAsset: yes
DefaultImporter:
externalObjects: {}
userData:
assetBundleName:
assetBundleVariant:

View File

@@ -0,0 +1,166 @@
#nullable enable
using NUnit.Framework;
using System;
using System.Collections.Generic;
using UVC.Data.Core;
namespace UVC.Data.Mqtt
{
/// <summary>
/// 수신된 단일 MQTT 메시지에 대한 모든 정보를 담는 데이터 컨테이너 클래스입니다.
/// 이 객체는 MqttWorker가 메시지를 수신했을 때 생성되며, MqttDataManager를 통해
/// 최종적으로 데이터 소비자에게 전달됩니다.
/// </summary>
/// <example>
/// <code>
/// // MqttDataPacket 객체 생성 예시
/// var packet = new MqttDataPacket("sensor/temp", "25.5");
///
/// // 데이터 접근 예시
/// Debug.Log($"수신 시간: {packet.Timestamp}");
/// Debug.Log($"토픽: {packet.Topic}");
/// Debug.Log($"내용: {packet.Payload}");
///
/// // MqttDataManager가 이 패킷을 리스너에게 전달한 후,
/// // IsPropagated를 true로 설정하여 중복 전송을 방지합니다.
/// packet.IsPropagated = true;
/// Debug.Log($"처리 완료 여부: {packet.IsPropagated}");
/// </code>
/// </example>
public class MqttDataPacket
{
public bool IsInPool { get; set; } = false;
/// <summary>
/// 데이터가 수신된 시간 (UTC 기준)입니다.
/// 전 세계 어디서든 동일한 시간 기록을 보장하기 위해 협정 세계시(UTC)를 사용합니다.
/// 'init' 키워드는 객체가 처음 생성될 때만 값을 할당할 수 있도록 하여, 데이터의 불변성을 보장합니다.
/// </summary>
public DateTime Timestamp { get; private set; }
/// <summary>
/// 메시지가 발행된 MQTT 토픽입니다. (예: "home/livingroom/light")
/// </summary>
public string Topic { get; private set; }
/// <summary>
/// 메시지의 실제 내용(데이터)입니다. 보통 JSON 형식의 문자열이 담깁니다.
/// </summary>
public string Payload { get; private set; }
/// <summary>
/// 이 데이터가 리스너에게 전파(전달)되었는지 여부를 나타내는 플래그입니다.
/// MqttDataManager가 이 값을 사용하여 동일한 데이터를 중복으로 전달하는 것을 방지합니다.
/// </summary>
public bool IsPropagated { get; set; } = false;
public MqttDataPacket()
{
// 기본 생성자 - 빈 객체 생성 시 사용
this.Timestamp = DateTime.UtcNow;
this.Topic = string.Empty;
this.Payload = string.Empty;
this.IsPropagated = false;
}
/// <summary>
/// DataObject 객체로부터 MqttDataPacket 인스턴스를 생성합니다.
/// TIMESTAMP 필드를 파싱하여 타임스탬프로 사용합니다.
/// </summary>
/// <param name="topic">메시지가 수신된 토픽</param>
/// <param name="dataObject">JSON 데이터를 담고 있는 DataObject</param>
public MqttDataPacket(string topic, DataObject dataObject)
{
this.Topic = topic;
this.Payload = dataObject.ToJson();
this.IsPropagated = false;
// TIMESTAMP 필드에서 시간 값을 파싱합니다. 실패 시 현재 UTC 시간을 사용합니다.
var timestampStr = dataObject.GetString("TIMESTAMP");
if (!string.IsNullOrEmpty(timestampStr) && DateTime.TryParse(timestampStr, null, System.Globalization.DateTimeStyles.AdjustToUniversal, out var parsedTimestamp))
{
this.Timestamp = parsedTimestamp;
}
else
{
this.Timestamp = DateTime.UtcNow;
}
}
/// <summary>
/// DataObject 객체로부터 MqttDataPacket 인스턴스를 설정합니다.
/// TIMESTAMP 필드를 파싱하여 타임스탬프로 사용합니다.
/// </summary>
/// <param name="topic">메시지가 수신된 토픽</param>
/// <param name="dataObject">JSON 데이터를 담고 있는 DataObject</param>
public MqttDataPacket FromDataObject(string topic, DataObject dataObject)
{
this.Topic = topic;
this.Payload = dataObject.ToJson();
this.IsPropagated = false;
// TIMESTAMP 필드에서 시간 값을 파싱합니다. 실패 시 현재 UTC 시간을 사용합니다.
var timestampStr = dataObject.GetString("TIMESTAMP");
if (!string.IsNullOrEmpty(timestampStr) && DateTime.TryParse(timestampStr, null, System.Globalization.DateTimeStyles.AdjustToUniversal, out var parsedTimestamp))
{
this.Timestamp = parsedTimestamp;
}
else
{
this.Timestamp = DateTime.UtcNow;
}
return this;
}
/// <summary>
/// 객체를 초기 상태로 재설정합니다.
/// </summary>
/// <remarks>이 메서드는 <see cref="Timestamp"/>를 현재 UTC 시간으로 설정하고, <see
/// cref="Topic"/> 및 <see cref="Payload"/> 문자열을 지우고, <see cref="IsPropagated"/>를 <see
/// langword="false"/>로 설정합니다.</remarks>
public void Reset()
{
// 객체를 초기 상태로 재설정합니다.
this.Timestamp = DateTime.UtcNow;
this.Topic = string.Empty;
this.Payload = string.Empty;
this.IsPropagated = false;
}
/// <summary>
/// 재사용을 위해 현재 인스턴스를 풀로 반환합니다.
/// </summary>
/// <remarks>이 메서드는 인스턴스가 더 이상 필요하지 않을 때 호출해야 하며, 이를 통해 인스턴스를 재사용하고
/// 새 인스턴스를 생성하는 오버헤드를 줄일 수 있습니다. 인스턴스를 풀로 반환하기 전에
/// 인스턴스가 유효한 상태인지 확인하십시오.</remarks>
public void ReturnToPool()
{
MqttDataPacketPool.Return(this);
}
/// <summary>
/// <see cref="MqttDataPacket"/> 객체 목록을 JSON 배열 문자열로 변환합니다.
/// </summary>
/// <param name="packets">변환할 <see cref="MqttDataPacket"/> 객체 목록입니다. null일 수 없습니다.</param>
/// <returns>제공된 패킷의 페이로드를 나타내는 JSON 배열 문자열입니다. 목록이 비어 있거나 null이면 "[]"를 반환합니다.
///</returns>
public static string ToJsonFromList(List<MqttDataPacket> packets)
{
if (packets == null || packets.Count == 0) return "[]";
var jsonArray = new System.Text.StringBuilder("[");
for (int i = 0; i < packets.Count; i++)
{
var packet = packets[i];
jsonArray.Append(packet.Payload);
if (i < packets.Count - 1)
{
jsonArray.Append(",");
}
}
jsonArray.Append("]");
return jsonArray.ToString();
}
}
}

View File

@@ -0,0 +1,2 @@
fileFormatVersion: 2
guid: 31f9398e9f32c6246967b933160d036c

View File

@@ -0,0 +1,122 @@
using System.Collections.Concurrent;
namespace UVC.Data.Mqtt
{
public class MqttDataPacketPool
{
/// <summary>
/// MqttDataPacket 인스턴스를 저장하는 스레드 안전 큐입니다.
/// </summary>
private static ConcurrentQueue<MqttDataPacket> pool = new ConcurrentQueue<MqttDataPacket>();
/// <summary>
/// 풀의 최대 크기입니다. 이 크기를 초과하는 객체는 풀에 저장되지 않습니다.
/// </summary>
private static int maxPoolSize = 1000;
// --- 통계용 필드 ---
private static int _inUseCount = 0; // 현재 사용 중인 MqttDataPacket 인스턴스의 수
private static int _peakUsage = 0; // 최대 사용량 기록
private static int _poolMisses = 0; // 풀에서 객체를 찾지 못하고 새로 생성한 횟수
private static readonly object _statsLock = new object();
static MqttDataPacketPool()
{
// maxPoolSize만큼의 MqttDataPacket 인스턴스를 미리 생성하여 풀에 추가합니다.
for (int i = 0; i < maxPoolSize; i++)
{
pool.Enqueue(new MqttDataPacket() { IsInPool = true });
}
}
/// <summary>
/// 풀에서 MqttDataPacket 인스턴스를 가져옵니다.
/// 풀이 비어있으면 새 인스턴스를 생성하여 반환합니다.
/// </summary>
/// <returns>재사용 가능한 MqttDataPacket 인스턴스</returns>
public static MqttDataPacket Get()
{
bool fromPool = pool.TryDequeue(out var obj);
lock (_statsLock)
{
_inUseCount++;
if (_inUseCount > _peakUsage)
{
_peakUsage = _inUseCount;
}
// 풀이 비어있어서 새 객체를 만들어야 하는 경우
if (!fromPool)
{
_poolMisses++;
// 현재 사용량이 최대 풀 크기에 도달했는지 확인
if (_inUseCount > maxPoolSize)
{
int oldSize = maxPoolSize;
maxPoolSize += 1000;
//Debug.Log($"MqttDataPacketPool size automatically increased from {oldSize} to {maxPoolSize}. Peak usage: {_peakUsage}");
}
}
}
//if (_peakUsage % 100 == 0) Debug.Log($"MqttDataPacketPool stats: {GetStats()}");
if (fromPool)
{
obj.IsInPool = false;
}
return fromPool ? obj : new MqttDataPacket();
}
/// <summary>
/// 사용이 완료된 MqttDataPacket를 풀에 반환합니다.
/// 객체는 반환 전에 초기화되어 모든 속성이 제거됩니다.
/// </summary>
/// <param name="obj">풀에 반환할 MqttDataPacket 인스턴스</param>
public static void Return(MqttDataPacket obj)
{
if (obj == null || obj.IsInPool) return;
bool shouldReturnToPool;
lock (_statsLock)
{
_inUseCount--;
shouldReturnToPool = pool.Count < maxPoolSize;
}
if (shouldReturnToPool)
{
obj.Reset(); // 재사용 전 완벽한 초기화
obj.IsInPool = true;
pool.Enqueue(obj);
}
// 풀이 가득 차면 객체는 풀에 추가되지 않고 GC 대상이 됩니다.
}
/// <summary>
/// 풀의 현재 성능 통계를 문자열로 반환합니다.
/// </summary>
/// <returns>풀 통계 (최대 사용량, 현재 사용량, 풀 비어있을 때 생성 횟수, 현재 풀 크기)</returns>
public static string GetStats()
{
lock (_statsLock)
{
return $"최대 사용량: {_peakUsage}, 현재 사용량: {_inUseCount}, 풀 비어있을 때 생성 횟수: {_poolMisses}, 현재 풀 크기: {pool.Count}, Max Size: {maxPoolSize}";
}
}
/// <summary>
/// 풀 통계를 초기화합니다.
/// </summary>
public static void ResetStats()
{
lock (_statsLock)
{
_peakUsage = 0;
_poolMisses = 0;
}
}
}
}

View File

@@ -0,0 +1,2 @@
fileFormatVersion: 2
guid: 5b4d67b55be14f54a8f27cc4558b711c

View File

@@ -5,12 +5,13 @@ using Newtonsoft.Json.Linq;
using SampleProject.Config;
using System;
using System.Collections.Generic;
using System.Text;
using UnityEngine;
using UVC.Data.Core;
using UVC.Log;
using UVC.network;
using UVC.Tests;
namespace UVC.Data
namespace UVC.Data.Mqtt
{
/// <summary>
/// MQTT 통신을 통해 데이터를 수신하고 처리하는 파이프라인을 관리하는 클래스입니다.
@@ -42,35 +43,35 @@ namespace UVC.Data
/// }
/// };
///
/// // 4. MQTTPipeLineInfo 생성 및 설정
/// var pipelineInfo = new MQTTPipeLineInfo("sensor/+/data")
/// // 4. MqttSubscriptionConfig 생성 및 설정
/// var pipelineInfo = new MqttSubscriptionConfig("sensor/+/data")
/// .setDataMapper(dataMapper)
/// .setHandler(dataHandler);
///
/// // 5. MQTTPipeLine 인스턴스 생성
/// var mqttPipeline = new MQTTPipeLine("mqtt.eclipseprojects.io", 1883);
/// // 5. MqttDataReceiver 인스턴스 생성
/// var mqttReceiver = new MqttDataReceiver("mqtt.eclipseprojects.io", 1883);
///
/// // 6. 파이프라인 정보 추가
/// mqttPipeline.Add(pipelineInfo);
/// mqttReceiver.Add(pipelineInfo);
///
/// // 7. 파이프라인 실행
/// mqttPipeline.Execute();
/// mqttReceiver.Start();
///
/// // ... 애플리케이션 로직 수행 ...
///
/// // 8. 파이프라인 중지 및 리소스 해제
/// mqttPipeline.Stop();
/// mqttPipeline.Dispose();
/// mqttReceiver.Stop();
/// mqttReceiver.Dispose();
/// </code>
/// </example>
public class MQTTPipeLine
public class MqttDataReceiver
{
/// <summary>
/// 테스트를 위한 목업 모드 활성화 여부를 설정하거나 가져옵니다.
/// </summary>
/// <remarks>
/// true로 설정하면 실제 MQTT 요청 대신 MQTTPipeLine를 사용합니다.
/// true로 설정하면 실제 MQTT 요청 대신 MockMQTTService를 사용합니다.
/// 테스트 환경에서 외부 의존성 없이 MQTT 통신을 시뮬레이션할 때 유용합니다.
/// </remarks>
public bool UseMockup { get; internal set; } = false;
@@ -78,38 +79,43 @@ namespace UVC.Data
/// <summary>
/// MQTT 브로커의 도메인 주소
/// </summary>
private string domain;
private string domain = "localhost";
/// <summary>
/// MQTT 브로커의 포트 번호
/// </summary>
public int port;
private int port = 1883;
private List<string> topics = new List<string>();
/// <summary>
/// 토픽별 파이프라인 정보를 저장하는 딕셔너리
/// </summary>
private Dictionary<string, MQTTPipeLineInfo> infoList;
private Dictionary<string, MqttSubscriptionConfig> infoList;
/// <summary>
/// MQTT 통신을 처리하는 서비스 객체
/// </summary>
private MQTTService mqtt;
private MqttWorker mqttWorker;
private MockMQTTService? mockupMQTT;
/// <summary>
/// MQTTPipeLine 인스턴스를 생성합니다.
/// MqttDataReceiver 인스턴스를 생성합니다.
/// </summary>
public MqttDataReceiver()
{
mqttWorker = new MqttWorker();
infoList = new Dictionary<string, MqttSubscriptionConfig>();
}
/// <summary>
/// MQTT 연결의 도메인과 포트를 설정합니다.
/// </summary>
/// <param name="domain">MQTT 브로커의 도메인 주소, 기본값은 "localhost"입니다.</param>
/// <param name="port">MQTT 브로커의 포트 번호, 기본값은 1883입니다.</param>
public MQTTPipeLine(string domain = "localhost", int port = 1883)
public void SetDomainPort(string domain, int port)
{
this.domain = string.IsNullOrEmpty(domain) ? Constants.MQTT_DOMAIN : domain;
this.port = port;
mqtt = new MQTTService(this.domain, this.port);
infoList = new Dictionary<string, MQTTPipeLineInfo>();
mqttWorker.SetDomainPort(this.domain, this.port);
}
/// <summary>
@@ -129,11 +135,11 @@ namespace UVC.Data
/// <summary>
/// 토픽에 대한 파이프라인 정보를 추가합니다.
/// </summary>
/// <param name="info">추가할 MQTTPipeLineInfo 객체</param>
/// <param name="info">추가할 MqttSubscriptionConfig 객체</param>
/// <remarks>
/// 동일한 토픽에 대한 정보가 이미 존재하는 경우 덮어씁니다.
/// </remarks>
public void Add(MQTTPipeLineInfo info)
public void Add(MqttSubscriptionConfig info)
{
if (!infoList.ContainsKey(info.Topic))
{
@@ -160,15 +166,15 @@ namespace UVC.Data
/// <summary>
/// 파이프라인을 실행하여 MQTT 브로커에 연결하고 등록된 모든 토픽을 구독합니다.
/// </summary>
public void Execute()
public void Start()
{
if (!UseMockup)
{
foreach (var topic in topics)
{
mqtt.AddTopicHandler(topic, OnTopicMessage);
mqttWorker.AddListener(topic, OnTopicPacketMessage);
}
mqtt.Connect();
mqttWorker.Start();
}
else
{
@@ -182,6 +188,38 @@ namespace UVC.Data
}
}
/// <summary>
/// 토픽에서 수신된 MQTT 데이터 패킷 목록을 처리합니다.
/// </summary>
/// <remarks>이 메서드는 구독된 MQTT 토픽에서 메시지가 수신될 때 호출됩니다.
/// 이 메서드를 호출하기 전에 <paramref name="packets"/> 매개변수가 null이 아닌지 확인하십시오.</remarks>
/// <param name="packets">처리할 <see cref="MqttDataPacket"/> 객체 목록입니다. null일 수 없습니다.</param>
private void OnTopicPacketMessage(string topic, List<MqttDataPacket> packets)
{
OnTopicMessage(topic, MqttDataPacket.ToJsonFromList(packets));
}
private void OnTopicMessage(string topic, string message)
{
bool isMainThread = PlayerLoopHelper.IsMainThread;
//Debug.Log($"OnTopicMessage isMainThread: {isMainThread}, topic: {topic}");
if (isMainThread)
{
// 메시지 처리를 백그라운드 스레드에서 실행하여 메인 스레드 부하를 줄입니다.
UniTask.RunOnThreadPool(() =>
{
OnTopicMessageLogic(topic, message);
}).Forget();
}
else
{
OnTopicMessageLogic(topic, message);
}
}
/// <summary>
/// MQTT 토픽으로 메시지가 수신되었을 때 호출되는 콜백 메서드입니다.
/// </summary>
@@ -191,88 +229,84 @@ namespace UVC.Data
/// 이 메서드는 수신된 메시지의 형식(JSON 객체 또는 배열)에 따라 적절한 파싱을 수행하고,
/// 등록된 데이터 매퍼를 통해 메시지를 변환한 후, 해당 토픽에 등록된 핸들러에게 전달합니다.
/// 'UpdatedDataOnly' 설정에 따라 데이터가 변경된 경우에만 핸들러를 호출할 수도 있습니다.
/// 메시지 처리는 백그라운드 스레드에서 수행되며, 핸들러는 메인 스레드에서 호출됩니다.
/// </remarks>
private void OnTopicMessage(string topic, string message)
private void OnTopicMessageLogic(string topic, string message)
{
// 메시지 처리를 백그라운드 스레드에서 실행하여 메인 스레드 부하를 줄입니다.
UniTask.RunOnThreadPool(() =>
// 토픽이 infoList와 readyHandlerList에 존재하고, 준비 상태가 true인 경우에만 처리합니다.
if (infoList.ContainsKey(topic))
{
// 토픽이 infoList와 readyHandlerList에 존재하고, 준비 상태가 true인 경우에만 처리합니다.
if (infoList.ContainsKey(topic))
MqttSubscriptionConfig info = infoList[topic];
IDataObject? mappedObject = null;
message = message.Trim();
if (!string.IsNullOrEmpty(message))
{
MQTTPipeLineInfo info = infoList[topic];
IDataObject? mappedObject = null;
message = message.Trim();
if (!string.IsNullOrEmpty(message))
try
{
try
if (message.StartsWith("{"))
{
if (message.StartsWith("{"))
JObject source = JObject.Parse(message);
if (info.Validator != null && !info.Validator.IsValid(source)) return;
if (info.DataMapper != null)
{
JObject source = JObject.Parse(message);
if (info.Validator != null && !info.Validator.IsValid(source)) return;
if (info.DataMapper != null)
{
mappedObject = info.DataMapper.Map(source);
}
else {
// DataMapper가 없으면 JObject를 IDataObject로 변환
mappedObject = new DataObject(source);
}
mappedObject = info.DataMapper.Map(source);
}
else if (message.StartsWith("["))
else
{
JArray source = JArray.Parse(message);
if (info.Validator != null)
{
JArray? validSource = info.Validator.GetValidData(source);
// 유효성 검사 실패 시 핸들러 호출을 중단
if (validSource == null || validSource.Count == 0) return;
source = validSource;
}
if (info.DataMapper != null)
{
mappedObject = info.DataMapper.Map(source);
}
else
{
// DataMapper가 없으면 JArray를 IDataObject로 변환
mappedObject = new DataArray(source);
}
// DataMapper가 없으면 JObject를 IDataObject로 변환
mappedObject = new DataObject(source);
}
}
else if (message.StartsWith("["))
{
JArray source = JArray.Parse(message);
if (info.Validator != null)
{
JArray? validSource = info.Validator.GetValidData(source);
// 유효성 검사 실패 시 핸들러 호출을 중단
if (validSource == null || validSource.Count == 0) return;
source = validSource;
}
if (info.DataMapper != null)
{
mappedObject = info.DataMapper.Map(source);
}
else
{
// DataMapper가 없으면 JArray를 IDataObject로 변환
mappedObject = new DataArray(source);
}
}
if (mappedObject == null) return;
// DataRepository는 내부적으로 데이터를 복사/업데이트하므로, mappedObject는 여기서 임시 객체가 됩니다.
var repoObject = DataRepository.Instance.AddOrUpdateData(topic, mappedObject, info.UpdatedDataOnly);
if(repoObject == mappedObject) repoObject = mappedObject.Clone(fromPool: false);
// 핸들러 호출이 필요한지 확인
bool shouldInvoke = !info.UpdatedDataOnly || (repoObject != null && repoObject.UpdatedCount > 0);
if (shouldInvoke)
{
var handlerData = repoObject;
// 핸들러를 메인 스레드에서 안전하게 호출
UniTask.Post(() => info.Handler?.Invoke(handlerData));
}
}
catch (Exception ex)
if (mappedObject == null) return;
// DataRepository는 내부적으로 데이터를 복사/업데이트하므로, mappedObject는 여기서 임시 객체가 됩니다.
var repoObject = DataRepository.Instance.AddOrUpdateData(topic, mappedObject, info.UpdatedDataOnly);
if (repoObject == mappedObject) repoObject = mappedObject.Clone(fromPool: false);
// 핸들러 호출이 필요한지 확인
bool shouldInvoke = !info.UpdatedDataOnly || (repoObject != null && repoObject.UpdatedCount > 0);
if (shouldInvoke)
{
// 예외 발생 시 로깅 또는 처리
// 예외 로깅도 메인 스레드에서 처리하여 Unity API 호출에 대한 스레드 안정성 확보
//UniTask.Post(() => ULog.Error($"Error processing message for topic '{topic}': {ex.Message}", ex));
ULog.Error($"Error processing message for topic '{topic}': {ex.Message}", ex);
var handlerData = repoObject;
// 핸들러를 메인 스레드에서 안전하게 호출
UniTask.Post(() => info.Handler?.Invoke(handlerData));
}
finally
}
catch (Exception ex)
{
// 예외 발생 시 로깅 또는 처리
// 예외 로깅도 메인 스레드에서 처리하여 Unity API 호출에 대한 스레드 안정성 확보
//UniTask.Post(() => ULog.Error($"Error processing message for topic '{topic}': {ex.Message}", ex));
ULog.Error($"Error processing message for topic '{topic}': {ex.Message}", ex);
}
finally
{
// DataMapper가 생성한 임시 객체를 풀에 반환합니다.
if (mappedObject != null)
{
// DataMapper가 생성한 임시 객체를 풀에 반환합니다.
if (mappedObject != null)
{
mappedObject.ReturnToPool();
}
mappedObject.ReturnToPool();
}
}
}
}).Forget();
}
}
/// <summary>
@@ -284,9 +318,9 @@ namespace UVC.Data
{
foreach (var topic in infoList.Keys)
{
mqtt.RemoveTopicHandler(topic, OnTopicMessage);
mqttWorker.RemoveListener(topic, OnTopicPacketMessage);
}
mqtt.Disconnect();
mqttWorker.Stop();
}
else
{
@@ -302,7 +336,7 @@ namespace UVC.Data
/// <see cref="Dispose"/>를 호출한 후에는 해당 인스턴스를 더 이상 사용할 수 없습니다.</remarks>
public void Dispose()
{
if (!UseMockup) mqtt.Disconnect();
if (!UseMockup) mqttWorker.Dispose();
else mockupMQTT?.Disconnect();
infoList.Clear();
}

View File

@@ -0,0 +1,2 @@
fileFormatVersion: 2
guid: 4c2ea2ec74a1124499f9407837919de5

View File

@@ -1,8 +1,9 @@
#nullable enable
using System;
using UVC.Data.Core;
namespace UVC.Data
namespace UVC.Data.Mqtt
{
/// <summary>
/// MQTT 파이프라인 정보를 관리하는 클래스입니다.
@@ -20,12 +21,12 @@ namespace UVC.Data
/// dataMask["humidity"] = 0.0;
/// dataMask["timestamp"] = DateTime.Now;
///
/// var pipelineInfo = new MQTTPipeLineInfo("device/status")
/// var config = new MqttSubscriptionConfig("device/status")
/// .setDataMapper(new DataMapper(dataMask))
/// .setHandler(data => Console.WriteLine(data));
/// </code>
/// </example>
public class MQTTPipeLineInfo
public class MqttSubscriptionConfig
{
private string _topic; // MQTT 토픽
private Action<IDataObject?>? _handler = null; // 메시지 핸들러
@@ -55,7 +56,7 @@ namespace UVC.Data
/// value is double humidity && humidity >= 0 && humidity <= 100);
///
/// // 파이프라인에 validator 설정
/// var pipelineInfo = new MQTTPipeLineInfo("sensors/readings")
/// var config = new MqttSubscriptionConfig("sensors/readings")
/// .setDataMapper(dataMapper)
/// .setValidator(validator)
/// .setHandler(data => ProcessValidSensorData(data));
@@ -89,7 +90,7 @@ namespace UVC.Data
public bool UpdatedDataOnly => _updatedDataOnly;
/// <summary>
/// MQTTPipeLineInfo 클래스의 새 인스턴스를 초기화합니다.
/// MqttSubscriptionConfig 클래스의 새 인스턴스를 초기화합니다.
/// </summary>
/// <param name="topic">구독할 MQTT 토픽</param>
/// <param name="updatedDataOnly">변경된 데이터만 처리할지 여부 (기본값: true)</param>
@@ -97,7 +98,7 @@ namespace UVC.Data
/// updatedDataOnly가 true인 경우, 이전 데이터와 동일한 메시지는 핸들러에 전달되지 않습니다.
/// 이는 불필요한 데이터 처리를 방지하고 성능을 향상시킵니다.
/// </remarks>
public MQTTPipeLineInfo(string topic, bool updatedDataOnly = true)
public MqttSubscriptionConfig(string topic, bool updatedDataOnly = true)
{
_topic = topic;
_updatedDataOnly = updatedDataOnly;
@@ -107,12 +108,12 @@ namespace UVC.Data
/// MQTT 메시지를 수신했을 때 호출될 핸들러를 설정합니다.
/// </summary>
/// <param name="handler">메시지 데이터를 처리할 콜백 함수</param>
/// <returns>현재 MQTTPipeLineInfo 인스턴스 (메서드 체이닝용)</returns>
/// <returns>현재 MqttSubscriptionConfig 인스턴스 (메서드 체이닝용)</returns>
/// <remarks>
/// 핸들러는 메시지가 수신되고 DataMapper에 의해 변환된 후 호출됩니다.
/// UpdatedDataOnly 속성이 true인 경우, 데이터가 변경된 경우에만 호출됩니다.
/// </remarks>
public MQTTPipeLineInfo setHandler(Action<IDataObject?> handler)
public MqttSubscriptionConfig setHandler(Action<IDataObject?> handler)
{
_handler = handler;
return this;
@@ -122,7 +123,7 @@ namespace UVC.Data
/// MQTT 메시지 데이터를 처리할 데이터 매퍼를 설정합니다.
/// </summary>
/// <param name="dataMapper">사용할 데이터 매퍼 객체</param>
/// <returns>현재 MQTTPipeLineInfo 인스턴스 (메서드 체이닝용)</returns>
/// <returns>현재 MqttSubscriptionConfig 인스턴스 (메서드 체이닝용)</returns>
/// <remarks>
/// 데이터 매퍼는 수신된 MQTT 메시지(JSON 형식)를 IDataObject로 변환하는 역할을 합니다.
/// DataMask를 사용하여 특정 필드에 대한 타입 변환 및 필드 이름 매핑을 처리할 수 있습니다.
@@ -135,12 +136,12 @@ namespace UVC.Data
/// dataMask["humidity"] = 0; // 정수 타입 지정
/// dataMask["timestamp"] = ""; // 문자열 타입 지정
///
/// var pipelineInfo = new MQTTPipeLineInfo("sensor/data")
/// var pipelineInfo = new MqttSubscriptionConfig("sensor/data")
/// .setDataMapper(new DataMapper(dataMask))
/// .setHandler(data => ProcessSensorData(data));
/// </code>
/// </example>
public MQTTPipeLineInfo setDataMapper(DataMapper dataMapper)
public MqttSubscriptionConfig setDataMapper(DataMapper dataMapper)
{
_dataMapper = dataMapper;
return this;
@@ -151,7 +152,7 @@ namespace UVC.Data
/// MQTT 메시지 데이터의 유효성을 검사하는 데 사용할 Validator를 설정합니다.
/// </summary>
/// <param name="validator">데이터 유효성 검사기</param>
/// <returns>현재 MQTTPipeLineInfo 인스턴스 (메서드 체이닝용)</returns>
/// <returns>현재 MqttSubscriptionConfig 인스턴스 (메서드 체이닝용)</returns>
/// <remarks>
/// 유효성 검사기는 수신된 MQTT 메시지 데이터가 유효한지 확인하는 역할을 합니다.
/// 데이터가 특정 조건을 만족하는지 확인하고, 유효하지 않은 메시지는 필터링할 수 있습니다.
@@ -175,7 +176,7 @@ namespace UVC.Data
/// value is string id && !string.IsNullOrEmpty(id));
///
/// // Validator를 파이프라인에 설정
/// var pipelineInfo = new MQTTPipeLineInfo("sensors/data")
/// var pipelineInfo = new MqttSubscriptionConfig("sensors/data")
/// .setDataMapper(dataMapper)
/// .setValidator(_validator)
/// .setHandler(data => {
@@ -184,7 +185,7 @@ namespace UVC.Data
/// });
/// </code>
/// </example>
public MQTTPipeLineInfo setValidator(DataValidator validator)
public MqttSubscriptionConfig setValidator(DataValidator validator)
{
this._validator = validator;
return this;

View File

@@ -0,0 +1,2 @@
fileFormatVersion: 2
guid: 4bd348b83dd87f34fb777bddaa693839

View File

@@ -1,12 +1,15 @@
using System;
using Cysharp.Threading.Tasks;
using NUnit.Framework;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using UnityEngine;
using UVC.Data.Core;
using UVC.network;
namespace UVC.Data
namespace UVC.Data.Mqtt
{
/// <summary>
/// 백그라운드 스레드에서 MQTT 통신, 데이터 처리 및 전파를 모두 담당하는 독립적인 워커 클래스입니다.
@@ -41,39 +44,56 @@ namespace UVC.Data
private readonly CancellationTokenSource cancellationTokenSource = new CancellationTokenSource();
private readonly Dictionary<string, List<MqttDataPacket>> topicBuffers = new Dictionary<string, List<MqttDataPacket>>();
private readonly Dictionary<string, Action<List<MqttDataPacket>>> listeners = new Dictionary<string, Action<List<MqttDataPacket>>>();
private readonly Dictionary<string, Action<string, List<MqttDataPacket>>> listeners = new Dictionary<string, Action<string, List<MqttDataPacket>>>();
private readonly object bufferAndListenerLock = new object();
/// <summary>
/// 데이터를 버퍼에 보관할 최대 시간(초)입니다. 이 시간이 지난 데이터는 폐기됩니다.
/// 이 값이 0보다 크면 버퍼링 모드로 동작합니다.
/// </summary>
private float bufferDurationSec;
private float bufferDurationSec = 0f;
/// <summary>
/// [버퍼링 모드 전용] 버퍼링된 데이터를 리스너에게 전파하는 주기(초)입니다.
/// </summary>
private float propagationIntervalSec;
private float propagationIntervalSec = 1f;
private string domain;
private int port;
private string domain = "localhost";
private int port = 1883;
/// <summary>
/// MqttWorker의 생성자입니다.
/// </summary>
public MqttWorker() { }
/// <summary>
/// 현재 구성의 도메인과 포트를 설정합니다.
/// </summary>
/// <remarks>이 메서드는 현재 구성에 사용되는 도메인 및 포트 값을 업데이트합니다.
/// 도메인이 유효한 DNS 이름이고 포트가 유효한 네트워크 포트 범위 내에 있는지 확인합니다.
///</remarks>
/// <param name="domain">접속할 MQTT 브로커의 도메인 주소입니다.</param>
/// <param name="port">접속할 MQTT 브로커의 포트 번호입니다.</param>
/// <param name="bufferDurationSec">데이터 버퍼링 시간(초). 0 이하로 설정 시 버퍼링 없이 직접 전파 모드로 동작합니다.</param>
/// <param name="propagationIntervalSec">버퍼링 모드에서 데이터를 전파할 주기(초)입니다.</param>
public MqttWorker(string domain = "localhost", int port = 1883, float bufferDurationSec = 0f, float propagationIntervalSec = 1f)
public void SetDomainPort(string domain, int port)
{
this.domain = domain;
this.port = port;
this.bufferDurationSec = bufferDurationSec;
this.propagationIntervalSec = propagationIntervalSec;
this.IncomingMessageQueue = new ConcurrentQueue<MqttDataPacket>();
}
/// <summary>
/// 버퍼 전파의 지속 시간과 간격을 설정합니다.
/// </summary>
/// <remarks>이 메서드는 버퍼 작업에 대한 타이밍 매개변수를 구성합니다. 예기치 않은 동작을 방지하려면 두 매개변수 모두
/// 양수여야 합니다.</remarks>
/// <param name="durationSec">데이터 버퍼링 시간(초). 0 이하로 설정 시 버퍼링 없이 직접 전파 모드로 동작합니다.</param>
/// <<param name="intervalSec">버퍼링 모드에서 데이터를 전파할 주기(초)입니다.</param>
public void SetDurationInterval(float durationSec, float intervalSec)
{
this.bufferDurationSec = durationSec;
this.propagationIntervalSec = intervalSec;
}
/// <summary>
/// 백그라운드 워커 스레드를 시작합니다.
/// </summary>
@@ -111,7 +131,7 @@ namespace UVC.Data
/// 중요: 여기서 등록된 리스너(콜백)는 백그라운드 스레드에서 호출됩니다.
/// Unity API(GameObject, Transform 등)에 직접 접근하면 안 됩니다.
/// </summary>
public void AddListener(string topic, Action<List<MqttDataPacket>> listener)
public void AddListener(string topic, Action<string, List<MqttDataPacket>> listener)
{
lock (bufferAndListenerLock)
{
@@ -134,8 +154,8 @@ namespace UVC.Data
/// <remarks>지정된 리스너가 토픽의 마지막 리스너인 경우, 토픽은 내부 컬렉션에서 제거됩니다.
///</remarks>
/// <param name="topic">리스너를 제거할 토픽입니다. null이거나 비어 있을 수 없습니다.</param>
/// <param name="listener">제거할 리스너로, <see cref="Action{List{MqttDataPacket}}"/>으로 표현됩니다. null일 수 없습니다.</param>
public void RemoveListener(string topic, Action<List<MqttDataPacket>> listener)
/// <param name="listener">제거할 리스너로, <see cref="Action{string, List{MqttDataPacket}}"/>으로 표현됩니다. null일 수 없습니다.</param>
public void RemoveListener(string topic, Action<string, List<MqttDataPacket>> listener)
{
lock (bufferAndListenerLock)
{
@@ -156,7 +176,8 @@ namespace UVC.Data
/// </summary>
private void Run()
{
Debug.Log("[Worker] 백그라운드 스레드 시작.");
bool isMainThread = PlayerLoopHelper.IsMainThread;
Debug.Log($"[Worker] 백그라운드 스레드 시작. isMainThread:{isMainThread}");
// MQTT 서비스 객체를 생성하고 설정합니다.
MQTTService mqtt = new MQTTService(domain, port);
@@ -236,17 +257,50 @@ namespace UVC.Data
/// <param name="message">수신된 메시지 내용</param>
private void OnMqttMessageReceived(string topic, string message)
{
bool isMainThread = PlayerLoopHelper.IsMainThread;
//Debug.Log($"OnMqttMessageReceived isMainThread: {isMainThread}, topic: {topic}");
if (bufferDurationSec > 0)
{
// 버퍼링 모드: 메시지를 큐에 넣어 워커 스레드로 전달합니다.
IncomingMessageQueue.Enqueue(new MqttDataPacket(topic, message));
try
{
var dataArray = DataArrayPool.Get().FromJsonString(message);
foreach (var dataObject in dataArray)
{
IncomingMessageQueue.Enqueue(MqttDataPacketPool.Get().FromDataObject(topic, dataObject));
}
dataArray.ReturnToPool();
}
catch (Exception ex)
{
Debug.LogError($"[Worker] 주제 {topic}에 대한 수신 JSON 배열을 구문 분석하지 못했습니다. 오류: {ex.Message}");
}
}
else
{
// 직접 전파 모드: 즉시 리스너를 호출합니다.
lock (bufferAndListenerLock)
{
listeners[topic]?.Invoke(new List<MqttDataPacket>() { new MqttDataPacket(topic, message) });
if (listeners[topic] != null)
{
try
{
var list = new List<MqttDataPacket>();
var dataArray = DataArrayPool.Get().FromJsonString(message);
foreach (var dataObject in dataArray)
{
list.Add(new MqttDataPacket(topic, dataObject));
}
// 직접 전파 모드에서도 타임스탬프 순으로 정렬
list.Sort((p1, p2) => p1.Timestamp.CompareTo(p2.Timestamp));
listeners[topic].Invoke(topic, list);
dataArray.ReturnToPool();
}
catch (Exception ex)
{
Debug.LogError($"[Worker] 주제 {topic}에 대한 수신 JSON 배열을 구문 분석하지 못했습니다. 오류: {ex.Message}");
}
}
}
}
}
@@ -283,17 +337,31 @@ namespace UVC.Data
if (!topicBuffers.ContainsKey(topic)) continue;
var buffer = topicBuffers[topic];
// 오래된 데이터 제거
buffer.RemoveAll(p => p.Timestamp < cutoffTime);
// 오래된 데이터 풀에 반환 및 버퍼에서 제거
var outdatedPackets = buffer.Where(p => p.Timestamp < cutoffTime).ToList();
if (outdatedPackets.Count > 0)
{
foreach (var packet in outdatedPackets)
{
packet.ReturnToPool();
}
buffer.RemoveAll(p => p.Timestamp < cutoffTime);
}
var newPackets = buffer.Where(p => !p.IsPropagated).ToList();
if (newPackets.Count > 0)
{
try
{
//타임스탬프 순으로 정렬
newPackets.Sort((p1, p2) => p1.Timestamp.CompareTo(p2.Timestamp));
// 중요: 이 콜백은 Worker 스레드에서 직접 호출됩니다.
listeners[topic]?.Invoke(newPackets);
listeners[topic]?.Invoke(topic, newPackets);
}
catch (Exception ex)
{

View File

@@ -0,0 +1,2 @@
fileFormatVersion: 2
guid: 43858c6f47100444a8088212bdda5368

View File

@@ -1,65 +0,0 @@
using System;
namespace UVC.Data
{
/// <summary>
/// 수신된 단일 MQTT 메시지에 대한 모든 정보를 담는 데이터 컨테이너 클래스입니다.
/// 이 객체는 MqttWorker가 메시지를 수신했을 때 생성되며, MqttDataManager를 통해
/// 최종적으로 데이터 소비자에게 전달됩니다.
/// </summary>
/// <example>
/// <code>
/// // MqttDataPacket 객체 생성 예시
/// var packet = new MqttDataPacket("sensor/temp", "25.5");
///
/// // 데이터 접근 예시
/// Debug.Log($"수신 시간: {packet.Timestamp}");
/// Debug.Log($"토픽: {packet.Topic}");
/// Debug.Log($"내용: {packet.Payload}");
///
/// // MqttDataManager가 이 패킷을 리스너에게 전달한 후,
/// // IsPropagated를 true로 설정하여 중복 전송을 방지합니다.
/// packet.IsPropagated = true;
/// Debug.Log($"처리 완료 여부: {packet.IsPropagated}");
/// </code>
/// </example>
public class MqttDataPacket
{
/// <summary>
/// 데이터가 수신된 시간 (UTC 기준)입니다.
/// 전 세계 어디서든 동일한 시간 기록을 보장하기 위해 협정 세계시(UTC)를 사용합니다.
/// 'init' 키워드는 객체가 처음 생성될 때만 값을 할당할 수 있도록 하여, 데이터의 불변성을 보장합니다.
/// </summary>
public DateTime Timestamp { get; private set; }
/// <summary>
/// 메시지가 발행된 MQTT 토픽입니다. (예: "home/livingroom/light")
/// </summary>
public string Topic { get; private set; }
/// <summary>
/// 메시지의 실제 내용(데이터)입니다. 보통 JSON 형식의 문자열이 담깁니다.
/// </summary>
public string Payload { get; private set; }
/// <summary>
/// 이 데이터가 리스너에게 전파(전달)되었는지 여부를 나타내는 플래그입니다.
/// MqttDataManager가 이 값을 사용하여 동일한 데이터를 중복으로 전달하는 것을 방지합니다.
/// </summary>
public bool IsPropagated { get; set; }
/// <summary>
/// 새로운 MqttDataPacket 인스턴스를 생성합니다.
/// </summary>
/// <param name="topic">메시지가 수신된 토픽</param>
/// <param name="payload">메시지의 내용</param>
public MqttDataPacket(string topic, string payload)
{
this.Timestamp = DateTime.UtcNow;
this.Topic = topic;
this.Payload = payload;
// 모든 패킷은 생성 시점에는 아직 리스너에게 전달되지 않았으므로 'false'로 초기화됩니다.
this.IsPropagated = false;
}
}
}

View File

@@ -1,2 +0,0 @@
fileFormatVersion: 2
guid: cea10eb46c464f14c9a7ac8a38b4d73e

View File

@@ -1,2 +0,0 @@
fileFormatVersion: 2
guid: 4031acfadc3ee0a4a852aa2b0681f696

View File

@@ -1,7 +1,7 @@
using TMPro;
using UnityEngine;
using UnityEngine.UI;
using UVC.Data;
using UVC.Data.Core;
namespace UVC.Factory.Alarm
{
/// <summary>

View File

@@ -2,6 +2,7 @@
using System.Collections.Generic;
using UnityEngine;
using UVC.Data;
using UVC.Data.Core;
using UVC.Factory.Component;
namespace UVC.Factory.Alarm

View File

@@ -7,6 +7,8 @@ using System.Linq;
using UnityEngine;
using UVC.Core;
using UVC.Data;
using UVC.Data.Core;
using UVC.Data.Mqtt;
using UVC.Extention;
using UVC.Factory.Component;
@@ -32,7 +34,7 @@ namespace UVC.Factory.Alarm
// 알람 데이터에 포함된 설비 ID를 이용해 실제 3D 객체를 찾기 위해 사용됩니다.
private FactoryObjectManager? dataManager;
private bool testMode = false; // 테스트 모드 여부를 나타내는 플래그입니다.
private bool testMode = true; // 테스트 모드 여부를 나타내는 플래그입니다.
// 테스트용으로 사용할 AGV 이름 리스트입니다.
private List<string> agvNames = new List<string>();
@@ -110,18 +112,18 @@ namespace UVC.Factory.Alarm
DataValidator validator = new DataValidator();
validator.AddValidator("MACHINENAME", value => value != null);
// 3. MQTT 파이프라인 정보(MQTTPipeLineInfo) 생성:
// 3. MQTT 파이프라인 정보(MqttSubscriptionConfig) 생성:
// - "ALARM" 토픽을 구독합니다.
// - 위에서 정의한 dataMask를 사용해 수신된 JSON 데이터를 DataObject로 변환합니다.
// - validator를 사용해 데이터의 유효성을 검사합니다.
// - 유효한 데이터가 수신되면 OnUpdateData 메서드를 호출하여 처리합니다.
var pipelineInfo = new MQTTPipeLineInfo("ALARM")
var pipelineInfo = new MqttSubscriptionConfig("ALARM")
.setDataMapper(new DataMapper(dataMask))
.setValidator(validator)
.setHandler(OnUpdateData);
// 4. 생성한 파이프라인을 전역 MQTT 파이프라인에 추가하여 데이터 수신을 시작합니다.
AppMain.Instance.MQTTPipeLine.Add(pipelineInfo);
DataRepository.Instance.MqttReceiver.Add(pipelineInfo);
}
@@ -223,8 +225,8 @@ namespace UVC.Factory.Alarm
protected override void OnDestroy()
{
base.OnDestroy();
// AppMain의 MQTTPipeLine에서 "ALARM" 토픽에 대한 핸들러를 제거합니다.
AppMain.Instance.MQTTPipeLine.Remove("ALARM");
// AppMain의 MqttDataReceiver에서 "ALARM" 토픽에 대한 핸들러를 제거합니다.
DataRepository.Instance.MqttReceiver.Remove("ALARM");
}
/// <summary>

View File

@@ -1,6 +1,7 @@
using System;
using UnityEngine;
using UVC.Data;
using UVC.Data.Core;
namespace UVC.Factory.Alarm
{

View File

@@ -1,6 +1,6 @@
using System.Collections.Generic;
using UnityEngine;
using UVC.Data;
using UVC.Data.Core;
namespace UVC.Factory.Component
{
@@ -145,8 +145,8 @@ namespace UVC.Factory.Component
}
float? newDegree = newData.GetFloat("DEGREE");
if(newDegree.HasValue)
{
if (newDegree.HasValue)
{
if (data.GetFloat("DEGREE").Value != newDegree.Value)
{
Quaternion newTargetRotation = Quaternion.Euler(0, newDegree.Value, 0);

View File

@@ -2,12 +2,12 @@
using Cysharp.Threading.Tasks;
using SampleProject;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using UnityEngine;
using UVC.Core;
using UVC.Data;
using UVC.Data.Core;
using UVC.Data.Mqtt;
using UVC.Pool;
namespace UVC.Factory.Component
@@ -142,13 +142,13 @@ namespace UVC.Factory.Component
//validator.AddValidator("JOB_ID", value => value != null);
validator.AddValidator("VHL_NAME", value => value != null && value!.ToString() == "HFF09CNA8016");
var pipelineInfo = new MQTTPipeLineInfo("AGV")
var pipelineInfo = new MqttSubscriptionConfig("AGV")
.setDataMapper(new DataMapper(dataMask))
//.setValidator(validator)
.setHandler(OnUpdateData);
// 생성한 파이프라인 정보를 전역 MQTT 파이프라인에 추가합니다.
AppMain.Instance.MQTTPipeLine.Add(pipelineInfo);
DataRepository.Instance.MqttReceiver.Add(pipelineInfo);
}
/// <summary>
@@ -228,7 +228,7 @@ namespace UVC.Factory.Component
}
}
if(created == false)
if (created == false)
{
created = true;
// 씬이 처음 초기화될 때 AGVManager가 생성되었음을 알립니다.
@@ -244,7 +244,7 @@ namespace UVC.Factory.Component
{
base.OnDestroy();
SceneMain.Instance.Initialized -= OnSceneInitializedAsync;
AppMain.Instance.MQTTPipeLine.Remove("AGV");
DataRepository.Instance.MqttReceiver.Remove("AGV");
agvPool?.ClearRecycledItems();
}
}

View File

@@ -2,7 +2,7 @@
using System.Collections.Generic;
using UnityEngine;
using UnityEngine.EventSystems;
using UVC.Data;
using UVC.Data.Core;
using UVC.Factory.Modal;
using UVC.Object3d;
@@ -229,12 +229,12 @@ namespace UVC.Factory.Component
/// <summary>
/// 외부로부터 받은 새로운 데이터로 객체의 상태를 업데이트합니다.
/// 이 메서드는 내부적으로 `ProcessData`를 호출하여 실제 데이터 처리 로직을 수행합니다.
/// MQTTPipeLineInfo.updatedDataOnly가 true인 경우, 데이터가 변경된 경우에만 호출됩니다.
/// MqttSubscriptionConfig.updatedDataOnly가 true인 경우, 데이터가 변경된 경우에만 호출됩니다.
/// </summary>
/// <param name="newData">업데이트할 새로운 데이터가 포함된 IDataObject 객체입니다.</param>
public void UpdateData(DataObject newData)
{
if(newData == null) return;
if (newData == null) return;
ProcessData(newData);
}
@@ -247,7 +247,7 @@ namespace UVC.Factory.Component
/// 데이터 값에 따라 객체의 색상, 애니메이션, 동작 등을 변경하는 코드를 작성해야 합니다.
/// </remarks>
/// <param name="newData">처리할 데이터 객체입니다. null일 수 없습니다.</param>
protected virtual void ProcessData(DataObject newData) {}
protected virtual void ProcessData(DataObject newData) { }
/// <summary>
/// 객체의 위치를 가져옵니다. 월드 좌표 또는 로컬 좌표로 반환할 수 있습니다.

View File

@@ -26,6 +26,7 @@ namespace UVC.network
private bool autoReconnect = true; // 자동 재연결 여부
private int reconnectDelay = 1000; // 재연결 시도 간격 (ms)
private bool onBackgroundThread = true; // 핸들러를 백그라운드 스레드에서 호출할지 여부
private ConcurrentDictionary<string, Action<string, string>> topicHandler;
@@ -42,6 +43,7 @@ namespace UVC.network
/// <param name="domain">MQTT 브로커의 호스트명 또는 IP 주소입니다.</param>
/// <param name="port">MQTT 브로커의 포트 번호입니다.</param>
/// <param name="autoReconnect">연결이 끊겼을 때 자동으로 재연결을 시도할지 여부입니다.</param>
/// <param name="onBackground">Handler를 백그라운드 스레드에서 호출 할지 여부</param>
/// <example>
/// <code>
/// // localhost의 기본 MQTT 포트(1883)에 연결하는 서비스 생성
@@ -51,7 +53,7 @@ namespace UVC.network
/// var mqttService = new MQTTService("mqtt.example.com", 8883, false);
/// </code>
/// </example>
public MQTTService(string domain, int port = 1883, bool autoReconnect = true)
public MQTTService(string domain, int port = 1883, bool autoReconnect = true, bool onBackground = true)
{
topicHandler = new ConcurrentDictionary<string, Action<string, string>>();
MQTTDomain = domain;
@@ -317,8 +319,31 @@ namespace UVC.network
/// <remarks>
/// 이 메서드는 메시지를 로깅하고 해당 토픽에 등록된 모든 핸들러를 호출합니다.
/// </remarks>
private void OnTopic(MQTTClient client, SubscriptionTopic topic, string topicName, ApplicationMessage message)
private void OnTopic(MQTTClient client, SubscriptionTopic topic, string topicName, ApplicationMessage message)
{
// 메인 스레드에서 실행 중인지 확인합니다.
bool isMainThread = PlayerLoopHelper.IsMainThread;
//Debug.Log($"MQTT OnTopic isMainThread={isMainThread}, onBackgroundThread:{onBackgroundThread}, {topic.Filter.OriginalFilter}");
if (isMainThread && onBackgroundThread)
{
// 백그라운드 스레드에서 실행
UniTask.RunOnThreadPool(() => OnTopicLogic(client, topic, topicName, message)).Forget();
}
else if (!isMainThread && !onBackgroundThread)
{
// 메인 스레드에서 실행
UniTask.Post(() => OnTopicLogic(client, topic, topicName, message));
}
else
{
// 메인 스레드에서 실행
OnTopicLogic(client, topic, topicName, message);
}
}
private void OnTopicLogic(MQTTClient client, SubscriptionTopic topic, string topicName, ApplicationMessage message)
{
//Debug.Log($"MQTT OnTopicLogic isMainThread={PlayerLoopHelper.IsMainThread}");
string payload = Encoding.UTF8.GetString(message.Payload.Data, message.Payload.Offset, message.Payload.Count);
//ULog.Debug($"MQTT OnTopic {topic.Filter.OriginalFilter} => {payload}");
ServerLog.LogMqtt(MQTTDomain, MQTTPort.ToString(), topic.Filter.OriginalFilter, payload, DateTime.Now.ToString("yyyy-MM-ddTHH:mm:ss.fffZ"));
@@ -326,23 +351,13 @@ namespace UVC.network
{
if (topicHandler.TryGetValue(topic.Filter.OriginalFilter, out var handler))
{
// 메인 스레드에서 실행 중인지 확인합니다.
bool isMainThread = UniTask.SwitchToMainThread().GetAwaiter().IsCompleted;
Debug.Log($"MQTT OnTopic {topic.Filter.OriginalFilter} => {payload} isMainThread={isMainThread}");
if (isMainThread)
{
// 메인 스레드이므로 핸들러를 직접 호출합니다.
handler.Invoke(topic.Filter.OriginalFilter, payload);
}
else
{
// 백그라운드 스레드이므로 UniTask.Post를 사용하여 메인 스레드로 작업을 보냅니다.
UniTask.Post(() => handler.Invoke(topic.Filter.OriginalFilter, payload));
}
handler.Invoke(topic.Filter.OriginalFilter, payload);
}
}
}
/// <summary>
/// 지정된 토픽으로 메시지를 발행(publish)합니다.
/// </summary>

View File

@@ -5,6 +5,7 @@ using System;
using System.Linq;
using UnityEngine;
using UVC.Data;
using UVC.Data.Core;
namespace UVC.Tests.Data
{

View File

@@ -3,7 +3,7 @@ using NUnit.Framework;
using System;
using System.Collections.Generic;
using UnityEngine;
using UVC.Data;
using UVC.Data.Core;
namespace UVC.Tests.Data
{

View File

@@ -6,6 +6,7 @@ using System;
using System.Linq;
using UnityEngine;
using UVC.Data;
using UVC.Data.Core;
namespace UVC.Tests.Data
{

View File

@@ -8,17 +8,19 @@ using System.Reflection;
using System.Threading;
using UnityEngine;
using UVC.Data;
using UVC.Data.Core;
using UVC.Data.Http;
namespace UVC.Tests.Data
{
/// <summary>
/// HttpPipeLine 클래스의 테스트를 위한 테스트 클래스입니다.
/// HttpDataFetcher 클래스의 테스트를 위한 테스트 클래스입니다.
/// </summary>
[TestFixture]
public class HttpPipeLineTests
public class HttpDataFetcherTests
{
// 테스트에 사용할 HttpPipeLine 인스턴스
private HttpPipeLine? pipeLine;
// 테스트에 사용할 HttpDataFetcher 인스턴스
private HttpDataFetcher? pipeLine;
/// <summary>
/// 각 테스트 실행 전에 호출되는 설정 메서드입니다.
@@ -26,7 +28,7 @@ namespace UVC.Tests.Data
[SetUp]
public void Setup()
{
pipeLine = new HttpPipeLine();
pipeLine = new HttpDataFetcher();
pipeLine.UseMockup = true; // MockHttpRequester 사용 설정
// 테스트를 위한 DataRepository 초기화
ClearDataRepository();
@@ -43,7 +45,7 @@ namespace UVC.Tests.Data
{
Setup();
Debug.Log("===== HttpPipeLine 테스트 시작 =====");
Debug.Log("===== HttpDataFetcher 테스트 시작 =====");
//RunTest(nameof(Add_NewInfo_AddedSuccessfully), Add_NewInfo_AddedSuccessfully);
//RunTest(nameof(Add_ExistingInfo_UpdatesExistingEntry), Add_ExistingInfo_UpdatesExistingEntry);
//await RunTestAsync(nameof(Remove_ExistingInfo_RemovedSuccessfullyAsync), Remove_ExistingInfo_RemovedSuccessfullyAsync);
@@ -77,7 +79,7 @@ namespace UVC.Tests.Data
await RunTestAsync(nameof(Test_Excute_WithArrayAndValidator_FiltersInvalidData), Test_Excute_WithArrayAndValidator_FiltersInvalidData);
Debug.Log("===== DataValidator 테스트 완료 =====");
Debug.Log("===== HttpPipeLine 테스트 완료 =====");
Debug.Log("===== HttpDataFetcher 테스트 완료 =====");
}
/// <summary>
@@ -114,13 +116,13 @@ namespace UVC.Tests.Data
}
/// <summary>
/// 새로운 HttpPipeLineInfo를 추가하는 테스트
/// 새로운 HttpRequestConfig를 추가하는 테스트
/// </summary>
[Test]
public void Add_NewInfo_AddedSuccessfully()
{
// Arrange
var info = new HttpPipeLineInfo("http://test.com");
var info = new HttpRequestConfig("http://test.com");
// Act
pipeLine.Add("test", info);
@@ -132,14 +134,14 @@ namespace UVC.Tests.Data
}
/// <summary>
/// 기존에 존재하는 키로 HttpPipeLineInfo를 추가할 때 업데이트 테스트
/// 기존에 존재하는 키로 HttpRequestConfig를 추가할 때 업데이트 테스트
/// </summary>
[Test]
public void Add_ExistingInfo_UpdatesExistingEntry()
{
// Arrange
var info1 = new HttpPipeLineInfo("http://test1.com");
var info2 = new HttpPipeLineInfo("http://test2.com");
var info1 = new HttpRequestConfig("http://test1.com");
var info2 = new HttpRequestConfig("http://test2.com");
pipeLine.Add("test", info1);
// Act
@@ -153,13 +155,13 @@ namespace UVC.Tests.Data
}
/// <summary>
/// 존재하는 HttpPipeLineInfo를 제거하는 테스트
/// 존재하는 HttpRequestConfig를 제거하는 테스트
/// </summary>
[Test]
public async UniTask Remove_ExistingInfo_RemovedSuccessfullyAsync()
{
// Arrange
var info = new HttpPipeLineInfo("http://test.com");
var info = new HttpRequestConfig("http://test.com");
pipeLine.Add("test", info);
// Act
@@ -177,7 +179,7 @@ namespace UVC.Tests.Data
public async UniTask Remove_NonExistingInfo_DoesNothing()
{
// Arrange
var info = new HttpPipeLineInfo("http://test.com");
var info = new HttpRequestConfig("http://test.com");
pipeLine.Add("test", info);
// Act - 존재하지 않는 키 제거 시도
@@ -190,14 +192,14 @@ namespace UVC.Tests.Data
}
/// <summary>
/// HttpPipeLine의 private infoList 필드 가져오기
/// HttpDataFetcher의 private infoList 필드 가져오기
/// </summary>
private Dictionary<string, HttpPipeLineInfo> GetInfoListField()
private Dictionary<string, HttpRequestConfig> GetInfoListField()
{
var fieldInfo = typeof(HttpPipeLine).GetField("infoList",
var fieldInfo = typeof(HttpDataFetcher).GetField("infoList",
BindingFlags.NonPublic | BindingFlags.Instance);
return (Dictionary<string, HttpPipeLineInfo>)fieldInfo.GetValue(pipeLine);
return (Dictionary<string, HttpRequestConfig>)fieldInfo.GetValue(pipeLine);
}
/// <summary>
@@ -246,8 +248,8 @@ namespace UVC.Tests.Data
var dataMapper = new DataMapper(dataMask);
// HttpPipeLineInfo 설정
var info = new HttpPipeLineInfo("http://test.com")
// HttpRequestConfig 설정
var info = new HttpRequestConfig("http://test.com")
.setDataMapper(dataMapper)
.setSuccessHandler((data) =>
{
@@ -304,8 +306,8 @@ namespace UVC.Tests.Data
var dataMapper = new DataMapper(dataMask);
// HttpPipeLineInfo 설정
var info = new HttpPipeLineInfo("http://test.com")
// HttpRequestConfig 설정
var info = new HttpRequestConfig("http://test.com")
.setDataMapper(dataMapper)
.setSuccessHandler((data) =>
{
@@ -381,8 +383,8 @@ namespace UVC.Tests.Data
var dataMapper = new DataMapper(dataMask);
// HttpPipeLineInfo 설정
var info = new HttpPipeLineInfo(agvUrl, "get")
// HttpRequestConfig 설정
var info = new HttpRequestConfig(agvUrl, "get")
.setDataMapper(dataMapper)
.setSuccessHandler((data) =>
{
@@ -435,8 +437,8 @@ namespace UVC.Tests.Data
var dataMapper = new DataMapper(dataMask);
// HttpPipeLineInfo 설정
var info = new HttpPipeLineInfo(alarmUrl, "get")
// HttpRequestConfig 설정
var info = new HttpRequestConfig(alarmUrl, "get")
.setDataMapper(dataMapper)
.setSuccessHandler((data) =>
{
@@ -515,11 +517,11 @@ namespace UVC.Tests.Data
}
};
// 각 데이터 타입별 HttpPipeLineInfo 설정 및 등록
// 각 데이터 타입별 HttpRequestConfig 설정 및 등록
foreach (var item in urls)
{
string key = item.Key;
var info = new HttpPipeLineInfo(item.Value, "get")
var info = new HttpRequestConfig(item.Value, "get")
.setDataMapper(new DataMapper(dataMasks[key]))
.setSuccessHandler((data) =>
{
@@ -569,8 +571,8 @@ namespace UVC.Tests.Data
var dataMapper = new DataMapper(dataMask);
// HttpPipeLineInfo 설정
var info = new HttpPipeLineInfo(testUrl, "get")
// HttpRequestConfig 설정
var info = new HttpRequestConfig(testUrl, "get")
.setDataMapper(dataMapper)
.setSuccessHandler((data) =>
{
@@ -647,8 +649,8 @@ namespace UVC.Tests.Data
var dataMapper = new DataMapper(dataMask);
// HttpPipeLineInfo 설정
var info = new HttpPipeLineInfo(baseInfoUrl, "get")
// HttpRequestConfig 설정
var info = new HttpRequestConfig(baseInfoUrl, "get")
.setDataMapper(dataMapper)
.setSuccessHandler((data) =>
{
@@ -733,8 +735,8 @@ namespace UVC.Tests.Data
var dataMapper = new DataMapper(dataMask);
// 반복 실행 설정을 포함한 HttpPipeLineInfo 생성
var info = new HttpPipeLineInfo(testUrl, "get")
// 반복 실행 설정을 포함한 HttpRequestConfig 생성
var info = new HttpRequestConfig(testUrl, "get")
.setDataMapper(dataMapper)
.setSuccessHandler(async (data) =>
{
@@ -807,8 +809,8 @@ namespace UVC.Tests.Data
var dataMapper = new DataMapper(dataMask);
// 무한 반복 설정을 포함한 HttpPipeLineInfo 생성
var info = new HttpPipeLineInfo(testUrl, "get")
// 무한 반복 설정을 포함한 HttpRequestConfig 생성
var info = new HttpRequestConfig(testUrl, "get")
.setDataMapper(dataMapper)
.setSuccessHandler((data) => { handlerCallCount++; })
.setRepeat(true, 0, repeatInterval, false); // 무한 반복 (repeatCount = 0)
@@ -876,12 +878,12 @@ namespace UVC.Tests.Data
var dataMapper = new DataMapper(dataMask);
// 두 개의 반복 요청 설정
var info1 = new HttpPipeLineInfo(testUrl1, "get")
var info1 = new HttpRequestConfig(testUrl1, "get")
.setDataMapper(dataMapper)
.setSuccessHandler((data) => { handlerCallCount1++; })
.setRepeat(true, 0, repeatInterval1, false);
var info2 = new HttpPipeLineInfo(testUrl2, "get")
var info2 = new HttpRequestConfig(testUrl2, "get")
.setDataMapper(dataMapper)
.setSuccessHandler((data) => { handlerCallCount2++; })
.setRepeat(true, 0, repeatInterval2, false);
@@ -957,8 +959,8 @@ namespace UVC.Tests.Data
var dataMapper = new DataMapper(dataMask);
// 반복 횟수가 지정된 HttpPipeLineInfo 생성
var info = new HttpPipeLineInfo(testUrl, "get")
// 반복 횟수가 지정된 HttpRequestConfig 생성
var info = new HttpRequestConfig(testUrl, "get")
.setDataMapper(dataMapper)
.setSuccessHandler((data) =>
{
@@ -1003,11 +1005,11 @@ namespace UVC.Tests.Data
}
/// <summary>
/// HttpPipeLine의 private repeatTokenSources 필드 가져오기
/// HttpDataFetcher의 private repeatTokenSources 필드 가져오기
/// </summary>
private Dictionary<string, CancellationTokenSource> GetRepeatTokenSourcesField()
{
var fieldInfo = typeof(HttpPipeLine).GetField("repeatTokenSources",
var fieldInfo = typeof(HttpDataFetcher).GetField("repeatTokenSources",
BindingFlags.NonPublic | BindingFlags.Instance);
return (Dictionary<string, CancellationTokenSource>)fieldInfo.GetValue(pipeLine);
@@ -1154,11 +1156,12 @@ namespace UVC.Tests.Data
// "status" 필드가 "active"인 경우에만 유효하도록 설정
var validator = new DataValidator();
validator.AddValidator("status", value => {
return value is string s && s == "active";
validator.AddValidator("status", value =>
{
return value is string s && s == "active";
});
var info = new HttpPipeLineInfo(testUrl)
var info = new HttpRequestConfig(testUrl)
.setDataMapper(dataMapper)
.setValidator(validator)
.setSuccessHandler(data =>
@@ -1208,7 +1211,7 @@ namespace UVC.Tests.Data
var validator = new DataValidator();
validator.AddValidator("status", value => value is string s && s == "active");
var info = new HttpPipeLineInfo(testUrl)
var info = new HttpRequestConfig(testUrl)
.setDataMapper(dataMapper)
.setValidator(validator)
.setSuccessHandler(data =>
@@ -1257,11 +1260,12 @@ namespace UVC.Tests.Data
// "value"가 15보다 큰 항목만 유효하도록 설정
var validator = new DataValidator();
validator.AddValidator("value", value => {
return value is int v && v > 15;
validator.AddValidator("value", value =>
{
return value is int v && v > 15;
});
var info = new HttpPipeLineInfo(testUrl)
var info = new HttpRequestConfig(testUrl)
.setDataMapper(dataMapper)
.setValidator(validator)
.setSuccessHandler(data =>

View File

@@ -0,0 +1,2 @@
fileFormatVersion: 2
guid: a46a845a86ba29c49b6ab4fe038b374a

View File

@@ -1,2 +0,0 @@
fileFormatVersion: 2
guid: e90362e4ba6e6bf4bb2c320d28f13403

View File

@@ -1,21 +1,20 @@
#nullable enable
using Cysharp.Threading.Tasks;
using Newtonsoft.Json.Linq;
using NUnit.Framework;
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using UnityEngine;
using UVC.Data;
using UVC.Data.Core;
using UVC.Data.Mqtt;
using UVC.Log;
namespace UVC.Tests.Data
{
[TestFixture]
public class MQTTPipeLineTests
public class MqttDataReceiverTests
{
private MQTTPipeLine pipeLine;
private MqttDataReceiver mqttReceiver;
private Dictionary<string, TestDataHandler> handlers;
private Dictionary<string, DataMask> dataMasks;
private readonly string[] topicNames = { "AGV", "CARRIER", "STOCKER_STACK", "ALL" };
@@ -25,8 +24,9 @@ namespace UVC.Tests.Data
public void Setup()
{
// 기본 테스트 환경 설정
pipeLine = new MQTTPipeLine("localhost", 1883);
pipeLine.UseMockup = true; // 테스트에서는 항상 MockMQTTService 사용
mqttReceiver = new MqttDataReceiver();
mqttReceiver.SetDomainPort("localhost", 1883);
mqttReceiver.UseMockup = true; // 테스트에서는 항상 MockMQTTService 사용
// 핸들러와 데이터 마스크 초기화
handlers = new Dictionary<string, TestDataHandler>();
@@ -43,7 +43,7 @@ namespace UVC.Tests.Data
public async UniTask TestAll()
{
Setup();
Debug.Log("===== MQTTPipeLine 테스트 시작 =====");
Debug.Log("===== MqttDataReceiver 테스트 시작 =====");
// 하나씩 테스트 해야 함
//await RunTestAsync(nameof(ExecutePipeLine_AllTopics_RegistersAndHandlesMessages), ExecutePipeLine_AllTopics_RegistersAndHandlesMessages);
//await RunTestAsync(nameof(RemoveTopic_ShouldStopReceivingMessages), RemoveTopic_ShouldStopReceivingMessages);
@@ -57,7 +57,7 @@ namespace UVC.Tests.Data
//await RunTestAsync(nameof(OnTopicMessage_WithValidData_ValidatorPassesAsync), OnTopicMessage_WithValidData_ValidatorPassesAsync);
//await RunTestAsync(nameof(OnTopicMessage_WithInvalidData_ValidatorFailsAsync), OnTopicMessage_WithInvalidData_ValidatorFailsAsync);
await RunTestAsync(nameof(OnTopicMessage_WithArrayAndValidator_FiltersInvalidDataAsync), OnTopicMessage_WithArrayAndValidator_FiltersInvalidDataAsync);
Debug.Log("===== MQTTPipeLine 테스트 완료 =====");
Debug.Log("===== MqttDataReceiver 테스트 완료 =====");
}
private void RunTest(string testName, Action testAction)
@@ -92,8 +92,8 @@ namespace UVC.Tests.Data
public void TearDown()
{
// 테스트 완료 후 리소스 정리
pipeLine.Stop();
pipeLine.Dispose();
mqttReceiver.Stop();
mqttReceiver.Dispose();
}
private DataMask CreateDataMaskForTopic(string topic)
@@ -282,16 +282,16 @@ namespace UVC.Tests.Data
// 필요한 UpdatedDataOnly 설정
bool updatedDataOnly = topic != "ALL";
var pipelineInfo = new MQTTPipeLineInfo(topic, updatedDataOnly)
var pipelineInfo = new MqttSubscriptionConfig(topic, updatedDataOnly)
.setDataMapper(new DataMapper(dataMasks[topic]))
.setHandler(handlers[topic].HandleData);
pipeLine.Add(pipelineInfo);
mqttReceiver.Add(pipelineInfo);
}
Debug.Log("파이프라인 설정 완료.");
// Act - 파이프라인 실행
pipeLine.Execute();
Debug.Log("파이프라인 Execute.");
mqttReceiver.Start();
Debug.Log("파이프라인 Start.");
// Assert - 일정 시간 기다린 후 각 핸들러가 호출되었는지 확인
await UniTask.Delay(1500);
@@ -313,12 +313,12 @@ namespace UVC.Tests.Data
{
// Arrange
// AGV 토픽만 등록
var agvInfo = new MQTTPipeLineInfo("AGV", true)
var agvInfo = new MqttSubscriptionConfig("AGV", true)
.setDataMapper(new DataMapper(dataMasks["AGV"]))
.setHandler(handlers["AGV"].HandleData);
pipeLine.Add(agvInfo);
pipeLine.Execute();
mqttReceiver.Add(agvInfo);
mqttReceiver.Start();
// 메시지가 수신되도록 잠시 대기
await UniTask.Delay(1000);
@@ -328,7 +328,7 @@ namespace UVC.Tests.Data
Assert.IsTrue(initialCallCount > 0, "초기 AGV 토픽의 핸들러가 호출되지 않았습니다.");
// Act
pipeLine.Remove("AGV"); // AGV 토픽 제거
mqttReceiver.Remove("AGV"); // AGV 토픽 제거
// 핸들러 초기화
handlers["AGV"].Reset();
@@ -344,11 +344,11 @@ namespace UVC.Tests.Data
public async UniTask UpdatedDataOnly_ShouldOnlyCallHandlerForUpdatedData()
{
// Arrange - 파이프라인 설정
// TestMQTTPipeLine을 사용하여 직접 메시지를 보낼 수 있게 함
var testPipeLine = new TestMQTTPipeLine();
// TestMqttDataReceiver을 사용하여 직접 메시지를 보낼 수 있게 함
var testPipeLine = new TestMqttDataReceiver();
// UpdatedDataOnly가 true인 AGV 토픽 추가
var agvInfo = new MQTTPipeLineInfo("AGV", true)
var agvInfo = new MqttSubscriptionConfig("AGV", true)
.setDataMapper(new DataMapper(dataMasks["AGV"]))
.setHandler(handlers["AGV"].HandleData);
@@ -375,12 +375,12 @@ namespace UVC.Tests.Data
public void OnTopicMessage_ValidJsonObject_CallsHandler()
{
// Arrange
var testPipeLine = new TestMQTTPipeLine();
var testPipeLine = new TestMqttDataReceiver();
foreach (var topic in topicNames)
{
bool updatedDataOnly = topic != "ALL";
var pipelineInfo = new MQTTPipeLineInfo(topic, updatedDataOnly)
var pipelineInfo = new MqttSubscriptionConfig(topic, updatedDataOnly)
.setDataMapper(new DataMapper(dataMasks[topic]))
.setHandler(handlers[topic].HandleData);
@@ -406,9 +406,9 @@ namespace UVC.Tests.Data
public void OnTopicMessage_JsonArray_CallsHandler()
{
// Arrange
var testPipeLine = new TestMQTTPipeLine();
var testPipeLine = new TestMqttDataReceiver();
var pipelineInfo = new MQTTPipeLineInfo("AGV", true)
var pipelineInfo = new MqttSubscriptionConfig("AGV", true)
.setDataMapper(new DataMapper(dataMasks["AGV"]))
.setHandler(handlers["AGV"].HandleData);
@@ -430,9 +430,9 @@ namespace UVC.Tests.Data
public void OnTopicMessage_EmptyMessage_DoesNotCallHandler()
{
// Arrange
var testPipeLine = new TestMQTTPipeLine();
var testPipeLine = new TestMqttDataReceiver();
var pipelineInfo = new MQTTPipeLineInfo("AGV", true)
var pipelineInfo = new MqttSubscriptionConfig("AGV", true)
.setDataMapper(new DataMapper(dataMasks["AGV"]))
.setHandler(handlers["AGV"].HandleData);
@@ -449,9 +449,9 @@ namespace UVC.Tests.Data
public void OnTopicMessage_InvalidJson_DoesNotCallHandler()
{
// Arrange
var testPipeLine = new TestMQTTPipeLine();
var testPipeLine = new TestMqttDataReceiver();
var pipelineInfo = new MQTTPipeLineInfo("AGV", true)
var pipelineInfo = new MqttSubscriptionConfig("AGV", true)
.setDataMapper(new DataMapper(dataMasks["AGV"]))
.setHandler(handlers["AGV"].HandleData);
@@ -476,12 +476,13 @@ namespace UVC.Tests.Data
dataMask.ObjectName = "AGV";
dataMask.ObjectIdKey = "VHL_NAME";
// 3. MQTTPipeLine 설정 (MockMQTTService 사용)
var pipeline = new MQTTPipeLine("localhost", 1883);
// 3. MqttDataReceiver 설정 (MockMQTTService 사용)
var pipeline = new MqttDataReceiver();
pipeline.SetDomainPort("localhost", 1883);
pipeline.UseMockup = true; // MockMQTTService 사용 설정
// 4. UpdatedDataOnly=true로 토픽 등록
var pipelineInfo = new MQTTPipeLineInfo("AGV", true)
var pipelineInfo = new MqttSubscriptionConfig("AGV", true)
.setDataMapper(new DataMapper(dataMask))
.setHandler(handler.HandleData);
@@ -489,7 +490,7 @@ namespace UVC.Tests.Data
// Act
// 파이프라인 실행 - 이것이 MockMQTTService를 통해 메시지를 보내기 시작
pipeline.Execute();
pipeline.Start();
// 첫 번째 데이터 세트가 수신될 때까지 대기
await UniTask.Delay(1500);
@@ -583,18 +584,19 @@ namespace UVC.Tests.Data
public async UniTask OnTopicMessage_WithValidData_ValidatorPassesAsync()
{
// Arrange
var testPipeLine = new TestMQTTPipeLine();
var testPipeLine = new TestMqttDataReceiver();
var handler = new TestDataHandler();
var dataMapper = new DataMapper(new DataMask { ["id"] = 0, ["status"] = "" });
// "status" 필드가 "active"인 경우에만 유효하도록 설정
var validator = new DataValidator();
validator.AddValidator("status", value => {
validator.AddValidator("status", value =>
{
Debug.Log($"Validator called with value: {value}, {value is string s2 && s2 == "active"}");
return value is string s && s == "active";
});
var pipelineInfo = new MQTTPipeLineInfo("test_topic")
var pipelineInfo = new MqttSubscriptionConfig("test_topic")
.setDataMapper(dataMapper)
.setValidator(validator)
.setHandler(handler.HandleData);
@@ -621,18 +623,19 @@ namespace UVC.Tests.Data
public async UniTask OnTopicMessage_WithInvalidData_ValidatorFailsAsync()
{
// Arrange
var testPipeLine = new TestMQTTPipeLine();
var testPipeLine = new TestMqttDataReceiver();
var handler = new TestDataHandler();
var dataMapper = new DataMapper(new DataMask { ["id"] = 0, ["status"] = "" });
// "status" 필드가 "active"인 경우에만 유효하도록 설정
var validator = new DataValidator();
validator.AddValidator("status", value => {
validator.AddValidator("status", value =>
{
Debug.Log($"Validator called with value2: {value}, {value is string s2 && s2 == "active"}");
return value is string s && s == "active";
});
var pipelineInfo = new MQTTPipeLineInfo("test_topic")
var pipelineInfo = new MqttSubscriptionConfig("test_topic")
.setDataMapper(dataMapper)
.setValidator(validator)
.setHandler(handler.HandleData);
@@ -657,7 +660,7 @@ namespace UVC.Tests.Data
public async UniTask OnTopicMessage_WithArrayAndValidator_FiltersInvalidDataAsync()
{
// Arrange
var testPipeLine = new TestMQTTPipeLine();
var testPipeLine = new TestMqttDataReceiver();
var handler = new TestDataHandler();
var dataMapper = new DataMapper(new DataMask { ["id"] = 0, ["value"] = 0 });
@@ -669,7 +672,7 @@ namespace UVC.Tests.Data
return value is int v && v > 15;
});
var pipelineInfo = new MQTTPipeLineInfo("test_topic")
var pipelineInfo = new MqttSubscriptionConfig("test_topic")
.setDataMapper(dataMapper)
.setValidator(validator)
.setHandler(handler.HandleData);
@@ -702,18 +705,19 @@ namespace UVC.Tests.Data
#endregion
}
// MQTTPipeLine의 OnTopicMessage 메서드를 테스트하기 위한 확장 클래스
public class TestMQTTPipeLine : MQTTPipeLine
// MqttDataReceiver의 OnTopicMessage 메서드를 테스트하기 위한 확장 클래스
public class TestMqttDataReceiver : MqttDataReceiver
{
public TestMQTTPipeLine() : base("localhost", 1883)
public TestMqttDataReceiver() : base()
{
UseMockup = true;
SetDomainPort("localhost", 1883);
}
public void TestOnTopicMessage(string topic, string message)
{
// private 메서드에 접근하기 위한 래퍼
typeof(MQTTPipeLine).GetMethod("OnTopicMessage",
typeof(MqttDataReceiver).GetMethod("OnTopicMessage",
System.Reflection.BindingFlags.NonPublic |
System.Reflection.BindingFlags.Instance)?.Invoke(this, new object[] { topic, message });
}

View File

@@ -7,8 +7,8 @@ namespace UVC.Tests
public static void RunAllTests()
{
//new DataMapperTests().TestAll();
//new HttpPipeLineTests().TestAll();
//new MQTTPipeLineTests().TestAll();
//new HttpDataFetcherTests().TestAll();
//new MqttDataReceiverTests().TestAll();
//new DataObjectTests().TestAll();
new DataArrayTests().TestAll();
}

View File

@@ -26,8 +26,8 @@ namespace UVC.UI.Commands
}
// 제네릭 ActionCommand<T>는 이미 파라미터를 생성자에서 받으므로,
// ICommand.Execute(object parameter)를 구현할 때 해당 파라미터를 사용하지 않거나,
// 혹은 Execute(object) 호출 시 전달된 파라미터로 내부 _parameter를 덮어쓰는 등의 정책을 정해야 합니다.
// ICommand.Start(object parameter)를 구현할 때 해당 파라미터를 사용하지 않거나,
// 혹은 Start(object) 호출 시 전달된 파라미터로 내부 _parameter를 덮어쓰는 등의 정책을 정해야 합니다.
// 또는, ICommand<T> 인터페이스를 고려할 수도 있습니다 (아래 2번 방법).
public class ActionCommand<T> : ICommand<T> // ICommand<T>를 구현
{
@@ -49,18 +49,18 @@ namespace UVC.UI.Commands
_useDefaultParameterForParameterlessExecute = useDefaultForParameterless;
}
// ICommand<T>의 Execute(T parameter) 구현
// ICommand<T>의 Start(T parameter) 구현
public void Execute(T parameter)
{
_action.Invoke(parameter);
}
// ICommand<T> 인터페이스에 의해 추가된 파라미터 없는 Execute()
// 기본 구현은 Execute(default(T))를 호출합니다.
// ICommand<T> 인터페이스에 의해 추가된 파라미터 없는 Start()
// 기본 구현은 Start(default(T))를 호출합니다.
// 이 메서드는 ICommand<T>의 기본 인터페이스 메서드에 의해 제공되므로,
// 여기서 명시적으로 재정의할 필요는 없습니다. (void ICommand<T>.Execute() => Execute(default(T));)
// 여기서 명시적으로 재정의할 필요는 없습니다. (void ICommand<T>.Start() => Start(default(T));)
// 만약 다른 동작을 원한다면 여기서 재정의할 수 있습니다.
// public new void Execute() // 'new'는 인터페이스의 기본 구현을 숨기기 위함이 아님.
// public new void Start() // 'new'는 인터페이스의 기본 구현을 숨기기 위함이 아님.
// {
// if (_useDefaultParameterForParameterlessExecute)
// {
@@ -69,12 +69,12 @@ namespace UVC.UI.Commands
// else
// {
// // 또는 예외를 발생시키거나, 로깅 후 아무것도 하지 않음
// Debug.LogWarning($"ActionCommand<{typeof(T).Name}>의 파라미터 없는 Execute()가 호출되었으나, 기본 파라미터 사용이 설정되지 않았습니다.");
// Debug.LogWarning($"ActionCommand<{typeof(T).Name}>의 파라미터 없는 Start()가 호출되었으나, 기본 파라미터 사용이 설정되지 않았습니다.");
// }
// }
// ICommand의 Execute(object parameter = null) 구현
// ICommand의 Start(object parameter = null) 구현
void ICommand.Execute(object parameter) // 명시적 인터페이스 구현
{
if (parameter is T typedParameter)
@@ -96,7 +96,7 @@ namespace UVC.UI.Commands
}
else
{
Debug.LogError($"ActionCommand<{typeof(T).Name}>.Execute(object): 잘못된 파라미터 타입입니다. 기대: {typeof(T).Name}, 실제: {parameter.GetType().Name}");
Debug.LogError($"ActionCommand<{typeof(T).Name}>.Start(object): 잘못된 파라미터 타입입니다. 기대: {typeof(T).Name}, 실제: {parameter.GetType().Name}");
// 예외를 발생시킬 수도 있습니다: throw new ArgumentException("잘못된 파라미터 타입입니다.", nameof(parameter));
}
}

View File

@@ -19,7 +19,7 @@ namespace UVC.UI.Commands
{
string targetLanguage = _languageCode;
// Execute 호출 시 전달된 파라미터가 있다면 그것을 우선 사용
// Start 호출 시 전달된 파라미터가 있다면 그것을 우선 사용
if (parameter is string langCodeFromParam && !string.IsNullOrEmpty(langCodeFromParam))
{
targetLanguage = langCodeFromParam;

View File

@@ -9,6 +9,6 @@
{
void Execute(T parameter);
void Execute() => Execute(default(T)); // 기본 Execute 구현 제공 가능
void Execute() => Execute(default(T)); // 기본 Start 구현 제공 가능
}
}

View File

@@ -1,7 +1,7 @@
using System.Collections.Generic;
using UnityEngine;
using UnityEngine.UI;
using UVC.Data;
using UVC.Data.Core;
using UVC.Factory.Component;
using UVC.Locale;
using UVC.Log;