2025-06-06 02:17:54 +09:00
|
|
|
|
#nullable enable
|
|
|
|
|
|
|
|
|
|
|
|
using Newtonsoft.Json.Linq;
|
|
|
|
|
|
using SampleProject.Config;
|
|
|
|
|
|
using System.Collections.Generic;
|
|
|
|
|
|
using UVC.network;
|
2025-06-05 20:09:28 +09:00
|
|
|
|
|
|
|
|
|
|
namespace UVC.Data
|
|
|
|
|
|
{
|
2025-06-09 19:29:59 +09:00
|
|
|
|
/// <summary>
|
|
|
|
|
|
/// MQTT 통신을 통해 데이터를 수신하고 처리하는 파이프라인을 관리하는 클래스입니다.
|
|
|
|
|
|
/// </summary>
|
|
|
|
|
|
/// <remarks>
|
|
|
|
|
|
/// 이 클래스는 MQTT 브로커에 연결하여 등록된 토픽으로 들어오는 메시지를 수신하고,
|
|
|
|
|
|
/// 해당 메시지를 지정된 데이터 매퍼를 통해 변환한 후 핸들러에게 전달합니다.
|
|
|
|
|
|
/// 여러 MQTT 토픽을 동시에 관리하고 각각에 대한 처리 방식을 개별적으로 설정할 수 있습니다.
|
|
|
|
|
|
/// </remarks>
|
2025-06-05 20:09:28 +09:00
|
|
|
|
public class MQTTPipeLine
|
|
|
|
|
|
{
|
2025-06-09 19:29:59 +09:00
|
|
|
|
/// <summary>
|
|
|
|
|
|
/// MQTT 브로커의 도메인 주소
|
|
|
|
|
|
/// </summary>
|
|
|
|
|
|
private string domain;
|
2025-06-06 02:17:54 +09:00
|
|
|
|
|
2025-06-09 19:29:59 +09:00
|
|
|
|
/// <summary>
|
|
|
|
|
|
/// MQTT 브로커의 포트 번호
|
|
|
|
|
|
/// </summary>
|
|
|
|
|
|
public int port;
|
2025-06-06 02:17:54 +09:00
|
|
|
|
|
2025-06-09 19:29:59 +09:00
|
|
|
|
/// <summary>
|
|
|
|
|
|
/// 토픽별 파이프라인 정보를 저장하는 딕셔너리
|
|
|
|
|
|
/// </summary>
|
|
|
|
|
|
private Dictionary<string, MQTTPipeLineInfo> infoList;
|
2025-06-06 02:17:54 +09:00
|
|
|
|
|
2025-06-09 19:29:59 +09:00
|
|
|
|
/// <summary>
|
|
|
|
|
|
/// MQTT 통신을 처리하는 서비스 객체
|
|
|
|
|
|
/// </summary>
|
|
|
|
|
|
private MQTTService mqtt;
|
2025-06-06 02:17:54 +09:00
|
|
|
|
|
2025-06-09 19:29:59 +09:00
|
|
|
|
/// <summary>
|
|
|
|
|
|
/// MQTTPipeLine 인스턴스를 생성합니다.
|
|
|
|
|
|
/// </summary>
|
|
|
|
|
|
/// <param name="domain">MQTT 브로커의 도메인 주소, 기본값은 "localhost"입니다.</param>
|
|
|
|
|
|
/// <param name="port">MQTT 브로커의 포트 번호, 기본값은 1883입니다.</param>
|
|
|
|
|
|
public MQTTPipeLine(string domain = "localhost", int port = 1883)
|
|
|
|
|
|
{
|
|
|
|
|
|
this.domain = string.IsNullOrEmpty(domain) ? Constants.MQTT_DOMAIN : domain;
|
|
|
|
|
|
this.port = port;
|
|
|
|
|
|
mqtt = new MQTTService(Constants.MQTT_DOMAIN, Constants.MQTT_PORT);
|
|
|
|
|
|
infoList = new Dictionary<string, MQTTPipeLineInfo>();
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
/// <summary>
|
|
|
|
|
|
/// 토픽에 대한 파이프라인 정보를 추가합니다.
|
|
|
|
|
|
/// </summary>
|
|
|
|
|
|
/// <param name="info">추가할 MQTTPipeLineInfo 객체</param>
|
|
|
|
|
|
/// <remarks>
|
|
|
|
|
|
/// 동일한 토픽에 대한 정보가 이미 존재하는 경우 덮어씁니다.
|
|
|
|
|
|
/// </remarks>
|
2025-06-06 02:17:54 +09:00
|
|
|
|
public void Add(MQTTPipeLineInfo info)
|
|
|
|
|
|
{
|
2025-06-09 19:29:59 +09:00
|
|
|
|
if (!infoList.ContainsKey(info.Topic))
|
2025-06-06 02:17:54 +09:00
|
|
|
|
{
|
2025-06-09 19:29:59 +09:00
|
|
|
|
infoList.Add(info.Topic, info);
|
2025-06-06 02:17:54 +09:00
|
|
|
|
}
|
|
|
|
|
|
else
|
|
|
|
|
|
{
|
2025-06-09 19:29:59 +09:00
|
|
|
|
infoList[info.Topic] = info;
|
2025-06-06 02:17:54 +09:00
|
|
|
|
}
|
|
|
|
|
|
}
|
2025-06-09 19:29:59 +09:00
|
|
|
|
|
|
|
|
|
|
/// <summary>
|
|
|
|
|
|
/// 지정된 토픽에 대한 파이프라인 정보를 제거합니다.
|
|
|
|
|
|
/// </summary>
|
|
|
|
|
|
/// <param name="topic">제거할 토픽 이름</param>
|
2025-06-06 02:17:54 +09:00
|
|
|
|
public void Remove(string topic)
|
|
|
|
|
|
{
|
|
|
|
|
|
if (infoList.ContainsKey(topic))
|
|
|
|
|
|
{
|
|
|
|
|
|
infoList.Remove(topic);
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
|
2025-06-09 19:29:59 +09:00
|
|
|
|
/// <summary>
|
|
|
|
|
|
/// 파이프라인을 실행하여 MQTT 브로커에 연결하고 등록된 모든 토픽을 구독합니다.
|
|
|
|
|
|
/// </summary>
|
2025-06-06 02:17:54 +09:00
|
|
|
|
public void Execute()
|
|
|
|
|
|
{
|
|
|
|
|
|
foreach (var topic in infoList.Keys)
|
|
|
|
|
|
{
|
|
|
|
|
|
mqtt.AddTopicHandler(topic, OnTopicMessage);
|
|
|
|
|
|
}
|
|
|
|
|
|
mqtt.Connect();
|
|
|
|
|
|
}
|
|
|
|
|
|
|
2025-06-09 19:29:59 +09:00
|
|
|
|
/// <summary>
|
|
|
|
|
|
/// MQTT 토픽으로 메시지가 수신되었을 때 호출되는 콜백 메서드입니다.
|
|
|
|
|
|
/// </summary>
|
|
|
|
|
|
/// <param name="topic">수신된 메시지의 토픽</param>
|
|
|
|
|
|
/// <param name="message">수신된 메시지 내용</param>
|
|
|
|
|
|
/// <remarks>
|
|
|
|
|
|
/// 이 메서드는 수신된 메시지의 형식(JSON 객체 또는 배열)에 따라 적절한 파싱을 수행하고,
|
|
|
|
|
|
/// 등록된 데이터 매퍼를 통해 메시지를 변환한 후, 해당 토픽에 등록된 핸들러에게 전달합니다.
|
|
|
|
|
|
/// 'UpdatedDataOnly' 설정에 따라 데이터가 변경된 경우에만 핸들러를 호출할 수도 있습니다.
|
|
|
|
|
|
/// </remarks>
|
2025-06-06 02:17:54 +09:00
|
|
|
|
private void OnTopicMessage(string topic, string message)
|
|
|
|
|
|
{
|
|
|
|
|
|
if (infoList.ContainsKey(topic))
|
|
|
|
|
|
{
|
|
|
|
|
|
MQTTPipeLineInfo info = infoList[topic];
|
|
|
|
|
|
IDataObject? dataObject = null;
|
|
|
|
|
|
if (!string.IsNullOrEmpty(message))
|
|
|
|
|
|
{
|
|
|
|
|
|
message = message.Trim();
|
|
|
|
|
|
if (message.StartsWith("{"))
|
|
|
|
|
|
{
|
|
|
|
|
|
JObject source = JObject.Parse(message);
|
2025-06-09 19:29:59 +09:00
|
|
|
|
if (info.DataMapper != null) dataObject = info.DataMapper.Mapping(source);
|
2025-06-06 02:17:54 +09:00
|
|
|
|
}
|
|
|
|
|
|
else if (message.StartsWith("["))
|
|
|
|
|
|
{
|
|
|
|
|
|
JArray source = JArray.Parse(message);
|
2025-06-09 19:29:59 +09:00
|
|
|
|
if (info.DataMapper != null) dataObject = info.DataMapper.Mapping(source);
|
2025-06-06 02:17:54 +09:00
|
|
|
|
}
|
|
|
|
|
|
}
|
2025-06-09 19:29:59 +09:00
|
|
|
|
if (dataObject != null) dataObject = DataRepository.Instance.AddData(topic, dataObject, info.UpdatedDataOnly);
|
|
|
|
|
|
// 갱신 된 데이터가 있는 경우 핸들러 호출
|
|
|
|
|
|
if (info.UpdatedDataOnly)
|
|
|
|
|
|
{
|
|
|
|
|
|
if (dataObject != null && dataObject.UpdatedCount > 0) info.Handler?.Invoke(dataObject);
|
|
|
|
|
|
}
|
|
|
|
|
|
else
|
|
|
|
|
|
{
|
|
|
|
|
|
info.Handler?.Invoke(dataObject);
|
|
|
|
|
|
}
|
2025-06-06 02:17:54 +09:00
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
|
2025-06-09 19:29:59 +09:00
|
|
|
|
/// <summary>
|
|
|
|
|
|
/// 파이프라인을 중지하고 모든 토픽 구독을 해제한 후 MQTT 브로커와의 연결을 종료합니다.
|
|
|
|
|
|
/// </summary>
|
|
|
|
|
|
public void Stop()
|
|
|
|
|
|
{
|
|
|
|
|
|
foreach (var topic in infoList.Keys)
|
|
|
|
|
|
{
|
|
|
|
|
|
mqtt.RemoveTopicHandler(topic, OnTopicMessage);
|
|
|
|
|
|
}
|
|
|
|
|
|
mqtt.Disconnect();
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
/// <summary>
|
|
|
|
|
|
/// 현재 인스턴스에서 사용되는 모든 리소스를 해제하고 진행 중인 모든 작업을 중지합니다.
|
|
|
|
|
|
/// </summary>
|
|
|
|
|
|
/// <remarks>이 메서드는 모든 반복 작업을 중단하고, 내부 상태를 지우고, 관련 리소스를 삭제합니다.
|
|
|
|
|
|
/// <see cref="Dispose"/>를 호출한 후에는 해당 인스턴스를 더 이상 사용할 수 없습니다.</remarks>
|
|
|
|
|
|
public void Dispose()
|
|
|
|
|
|
{
|
|
|
|
|
|
mqtt.Disconnect();
|
|
|
|
|
|
infoList.Clear();
|
|
|
|
|
|
}
|
|
|
|
|
|
|
2025-06-05 20:09:28 +09:00
|
|
|
|
}
|
|
|
|
|
|
}
|