Files
XRLib/Assets/Scripts/UVC/Network/MQTTService.cs

383 lines
16 KiB
C#
Raw Normal View History

2025-06-04 23:10:11 +09:00
using Best.MQTT;
using Best.MQTT.Packets.Builders;
using Cysharp.Threading.Tasks;
using System;
using System.Collections.Concurrent;
using System.Text;
using UnityEngine;
using UVC.Log;
namespace UVC.network
{
/// <summary>
/// MQTT 클라이언트를 관리하고 메시지 송수신을 처리하는 서비스 클래스입니다.
/// 이 클래스는 MQTT 브로커와의 연결 관리, 토픽 구독, 메시지 발행 등의 기능을 제공합니다.
/// </summary>
/// <remarks>
/// 이 클래스는 스레드 안전한 방식으로 토픽 핸들러를 관리하며, 연결 끊김 시 자동 재연결 기능을 제공합니다.
/// 내부적으로 Best.MQTT 라이브러리를 사용하여 MQTT 프로토콜 통신을 구현합니다.
/// </remarks>
public class MQTTService
{
private string MQTTDomain = "localhost";
private int MQTTPort = 1883;
private MQTTClient client;
private bool autoReconnect = true; // 자동 재연결 여부
private int reconnectDelay = 1000; // 재연결 시도 간격 (ms)
private ConcurrentDictionary<string, Action<string, string>> topicHandler;
/// <summary>
/// MQTT 브로커와의 연결 상태를 확인합니다.
/// </summary>
/// <value>클라이언트가 초기화되고 브로커에 연결된 경우 <c>true</c>를 반환합니다.</value>
public bool IsConnected => client != null && client.State == ClientStates.Connected;
/// <summary>
/// MQTTService 인스턴스를 생성합니다.
/// </summary>
/// <param name="domain">MQTT 브로커의 호스트명 또는 IP 주소입니다.</param>
/// <param name="port">MQTT 브로커의 포트 번호입니다.</param>
/// <param name="autoReconnect">연결이 끊겼을 때 자동으로 재연결을 시도할지 여부입니다.</param>
/// <example>
/// <code>
/// // localhost의 기본 MQTT 포트(1883)에 연결하는 서비스 생성
/// var mqttService = new MQTTService("localhost", 1883);
///
/// // 자동 재연결 비활성화
/// var mqttService = new MQTTService("mqtt.example.com", 8883, false);
/// </code>
/// </example>
public MQTTService(string domain, int port = 1883, bool autoReconnect = true)
{
topicHandler = new ConcurrentDictionary<string, Action<string, string>>();
MQTTDomain = domain;
MQTTPort = port;
this.autoReconnect = autoReconnect;
}
/// <summary>
/// MQTT 클라이언트를 생성하고 이벤트 핸들러를 등록합니다.
/// </summary>
/// <remarks>
/// 이 메서드는 클라이언트가 이미 존재하는 경우 기존 연결을 종료한 후 새로운 클라이언트를 생성합니다.
/// </remarks>
private void CreateMQTTClient()
{
2025-06-10 20:16:35 +09:00
ULog.Debug($"MQTT Domain:{MQTTDomain} , MQTTPORT:{MQTTPort}");
2025-06-04 23:10:11 +09:00
var options = new ConnectionOptionsBuilder()
.WithTCP(MQTTDomain, MQTTPort)
.Build();
if (client != null) Disconnect();
client = new MQTTClient(options);
client.OnConnected += OnConnectedMQTT;
client.OnStateChanged += OnStateChangedMQTT;
client.OnDisconnect += OnDisconnectedMQTT;
client.OnError += OnErrorMQTT;
}
/// <summary>
/// 특정 MQTT 토픽에 대한 메시지 핸들러를 추가합니다.
/// </summary>
/// <param name="topic">구독할 MQTT 토픽입니다.</param>
/// <param name="handler">토픽에 메시지가 수신되면 호출될 핸들러입니다.
/// 첫 번째 매개변수는 토픽이고 두 번째 매개변수는 메시지 내용입니다.</param>
/// <remarks>
/// 하나의 토픽에 여러 핸들러를 등록할 수 있으며, 메시지 수신 시 모든 핸들러가 호출됩니다.
/// </remarks>
/// <example>
/// <code>
/// // MQTT 서비스 인스턴스 생성
/// var mqtt = new MQTTService("localhost", 1883);
///
/// // 핸들러 등록 및 연결
/// mqtt.AddTopicHandler("sensor/temperature", (topic, message) => {
/// Debug.Log($"온도 데이터 수신: {message}");
/// });
/// mqtt.Connect();
/// </code>
/// </example>
public void AddTopicHandler(string topic, Action<string, string> handler)
{
topicHandler.AddOrUpdate(
topic,
// 키가 없을 때 새로운 핸들러 추가
handler,
// 키가 이미 있을 때 기존 핸들러에 추가
(_, existingHandler) => existingHandler + handler
);
}
/// <summary>
/// 특정 MQTT 토픽에서 메시지 핸들러를 제거합니다.
/// </summary>
/// <param name="topic">핸들러를 제거할 MQTT 토픽입니다.</param>
/// <param name="handler">제거할 메시지 핸들러입니다.</param>
/// <remarks>
/// 지정된 토픽에 대한 모든 핸들러가 제거되면 해당 토픽에 대한 키도 제거됩니다.
/// </remarks>
/// <example>
/// <code>
/// // 핸들러 정의
2025-06-10 01:09:36 +09:00
/// Action<string, string> temperatureHandler = (topic, message) => {
2025-06-04 23:10:11 +09:00
/// Debug.Log($"온도 데이터 수신: {message}");
/// };
///
/// // 핸들러 등록
/// mqtt.AddTopicHandler("sensor/temperature", temperatureHandler);
///
/// // 나중에 핸들러 제거
/// mqtt.RemoveTopicHandler("sensor/temperature", temperatureHandler);
/// </code>
/// </example>
public void RemoveTopicHandler(string topic, Action<string, string> handler)
{
topicHandler.AddOrUpdate(
topic,
// 키가 없는 경우 - 여기서는 발생하면 안 됨
_ => null,
// 기존 핸들러에서 제거
(_, existingHandler) =>
{
var updatedHandler = existingHandler - handler;
return updatedHandler;
}
);
// 핸들러가 null이면 키 자체를 제거
if (topicHandler.TryGetValue(topic, out var currentHandler) && currentHandler == null)
{
topicHandler.TryRemove(topic, out _);
}
}
/// <summary>
/// 모든 토픽 핸들러를 제거합니다.
/// </summary>
/// <remarks>
/// 이 메서드는 모든 토픽 구독을 효과적으로 취소합니다. 다음에 연결할 때
/// 새로운 핸들러를 추가해야 합니다.
/// </remarks>
/// <example>
/// <code>
/// // 모든 핸들러 제거
/// mqtt.ClearTopicHandlers();
/// </code>
/// </example>
public void ClearTopicHandlers()
{
topicHandler.Clear();
}
/// <summary>
/// MQTT 브로커에 연결합니다.
/// </summary>
/// <remarks>
/// 이 메서드는 새 MQTT 클라이언트를 생성하고 브로커에 연결을 시작합니다.
/// 연결 성공 시 OnConnectedMQTT 이벤트 핸들러가 호출되며, 이때 등록된 모든 토픽을 구독합니다.
/// </remarks>
/// <example>
/// <code>
/// // MQTT 서비스 인스턴스 생성
/// var mqtt = new MQTTService("test.mosquitto.org", 1883);
///
/// // 토픽 핸들러 추가 후 연결
/// mqtt.AddTopicHandler("test/topic", (topic, message) => {
/// Debug.Log($"메시지 수신: {message}");
/// });
/// mqtt.Connect();
/// </code>
/// </example>
public void Connect()
{
CreateMQTTClient();
client.BeginConnect(ConnectPacketBuilderCallback);
}
/// <summary>
/// MQTT 브로커와의 연결을 종료합니다.
/// </summary>
/// <remarks>
/// 이 메서드는 모든 이벤트 핸들러를 제거하고 연결된 경우 MQTT 브로커로 정상 연결 종료 패킷을 전송합니다.
/// </remarks>
/// <example>
/// <code>
/// // 사용이 끝난 후 연결 종료
/// mqtt.Disconnect();
/// </code>
/// </example>
public void Disconnect()
{
if (client != null)
{
// 이벤트 핸들러 제거
client.OnConnected -= OnConnectedMQTT;
client.OnStateChanged -= OnStateChangedMQTT;
client.OnDisconnect -= OnDisconnectedMQTT;
client.OnError -= OnErrorMQTT;
if (IsConnected)
{
client.CreateDisconnectPacketBuilder()
.WithReasonCode(DisconnectReasonCodes.NormalDisconnection)
.WithReasonString("Bye")
.BeginDisconnect();
}
client = null;
}
}
/// <summary>
/// 연결 패킷 빌더를 구성하는 콜백 메서드입니다.
/// </summary>
/// <param name="client">MQTT 클라이언트 인스턴스</param>
/// <param name="builder">연결 패킷 빌더</param>
/// <returns>구성된 연결 패킷 빌더</returns>
private ConnectPacketBuilder ConnectPacketBuilderCallback(MQTTClient client, ConnectPacketBuilder builder)
{
return builder.WithKeepAlive(60 * 60);//keep alive 1
}
/// <summary>
/// MQTT 브로커에 연결되었을 때 호출되는 이벤트 핸들러입니다.
/// </summary>
/// <param name="client">연결된 MQTT 클라이언트</param>
/// <remarks>
/// 이 메서드는 연결 성공 시 등록된 모든 토픽에 대한 구독을 시작합니다.
/// </remarks>
private void OnConnectedMQTT(MQTTClient client)
{
2025-06-10 20:16:35 +09:00
ULog.Debug($"MQTT OnConnected");
2025-06-04 23:10:11 +09:00
BulkSubscribePacketBuilder builder = client.CreateBulkSubscriptionBuilder();
foreach (var topic in topicHandler.Keys)
{
builder.WithTopic(new SubscribeTopicBuilder(topic).WithMessageCallback(OnTopic));
}
builder.BeginSubscribe();
}
/// <summary>
/// MQTT 클라이언트의 상태가 변경되었을 때 호출되는 이벤트 핸들러입니다.
/// </summary>
/// <param name="client">MQTT 클라이언트</param>
/// <param name="oldState">이전 상태</param>
/// <param name="newState">새 상태</param>
private void OnStateChangedMQTT(MQTTClient client, ClientStates oldState, ClientStates newState)
{
2025-06-10 20:16:35 +09:00
ULog.Debug($"MQTT OnStateChanged {oldState} => {newState}");
2025-06-04 23:10:11 +09:00
}
/// <summary>
/// MQTT 브로커와의 연결이 종료되었을 때 호출되는 이벤트 핸들러입니다.
/// </summary>
/// <param name="client">MQTT 클라이언트</param>
/// <param name="code">연결 종료 이유 코드</param>
/// <param name="reason">연결 종료 이유 문자열</param>
/// <remarks>
/// 자동 재연결이 활성화된 경우 지정된 지연 시간 후에 재연결을 시도합니다.
/// </remarks>
private void OnDisconnectedMQTT(MQTTClient client, DisconnectReasonCodes code, string reason)
{
//ULog.Info($"MQTT Disconnected - code: {code}, reason: '{reason}'");
2025-06-10 20:16:35 +09:00
ULog.Debug($"MQTTDisconnected pcTime={DateTime.Now}");
2025-06-04 23:10:11 +09:00
// 지연 후에만 한 번 연결
if (Application.isPlaying && autoReconnect)
{
UniTask.Delay(reconnectDelay).ContinueWith(() =>
{
Connect();
}).Forget();
}
}
/// <summary>
/// MQTT 클라이언트에서 오류가 발생했을 때 호출되는 이벤트 핸들러입니다.
/// </summary>
/// <param name="client">MQTT 클라이언트</param>
/// <param name="reason">오류 발생 이유</param>
/// <remarks>
/// 이 이벤트 후에는 OnDisconnectedMQTT 이벤트도 발생합니다.
/// </remarks>
private void OnErrorMQTT(MQTTClient client, string reason)
{
2025-06-10 20:16:35 +09:00
ULog.Error($"MQTT OnError reason: '{reason}'", new Exception(reason));
2025-06-04 23:10:11 +09:00
}
/// <summary>
/// 구독한 토픽으로 메시지가 수신되었을 때 호출되는 콜백입니다.
/// </summary>
/// <param name="client">MQTT 클라이언트</param>
/// <param name="topic">구독 토픽 정보</param>
/// <param name="topicName">수신된 메시지의 토픽 이름</param>
/// <param name="message">수신된 메시지</param>
/// <remarks>
/// 이 메서드는 메시지를 로깅하고 해당 토픽에 등록된 모든 핸들러를 호출합니다.
/// </remarks>
private void OnTopic(MQTTClient client, SubscriptionTopic topic, string topicName, ApplicationMessage message)
{
string payload = Encoding.UTF8.GetString(message.Payload.Data, message.Payload.Offset, message.Payload.Count);
2025-06-10 20:16:35 +09:00
ULog.Debug($"MQTT OnTopic {topic.Filter.OriginalFilter} => {payload}");
2025-06-04 23:10:11 +09:00
ServerLog.LogMqtt(MQTTDomain, MQTTPort.ToString(), topic.Filter.OriginalFilter, payload, DateTime.Now.ToString("yyyy-MM-ddTHH:mm:ss.fffZ"));
2025-06-11 00:36:58 +09:00
if(payload.Trim().Length > 0)
2025-06-04 23:10:11 +09:00
{
2025-06-11 00:36:58 +09:00
if (topicHandler.TryGetValue(topic.Filter.OriginalFilter, out var handler))
{
// 등록된 핸들러가 있으면 호출
handler.Invoke(topic.Filter.OriginalFilter, payload);
}
2025-06-04 23:10:11 +09:00
}
}
/// <summary>
/// 지정된 토픽으로 메시지를 발행(publish)합니다.
/// </summary>
/// <param name="topic">메시지를 발행할 토픽입니다.</param>
/// <param name="message">발행할 메시지 내용입니다.</param>
/// <remarks>
/// 이 메서드는 클라이언트가 연결되어 있지 않으면 오류 로그를 기록하고 메시지를 발행하지 않습니다.
/// </remarks>
/// <example>
/// <code>
/// // MQTT 서비스 인스턴스 생성 및 연결
/// var mqtt = new MQTTService("localhost", 1883);
/// mqtt.Connect();
///
/// // 메시지 발행
/// mqtt.Publish("device/status", "online");
///
/// // JSON 형식 메시지 발행
/// string jsonMessage = JsonUtility.ToJson(new DeviceStatus { status = "online", battery = 95 });
/// mqtt.Publish("device/status", jsonMessage);
/// </code>
/// </example>
public void Publish(string topic, string message)
{
if (client == null || client.State != ClientStates.Connected)
{
2025-06-10 20:16:35 +09:00
ULog.Error("MQTT client is not connected. Cannot publish message.", new Exception("MQTT client is not connected. Cannot publish message."));
2025-06-04 23:10:11 +09:00
return;
}
client.CreateApplicationMessageBuilder(topic)
.WithPayload(message)
.BeginPublish();
}
/// <summary>
/// 현재 인스턴스에서 사용되는 모든 리소스를 해제하고 진행 중인 모든 작업을 중지합니다.
/// </summary>
/// <remarks>이 메서드는 모든 반복 작업을 중단하고, 내부 상태를 지우고, 관련 리소스를 삭제합니다.
/// <see cref="Dispose"/>를 호출한 후에는 해당 인스턴스를 더 이상 사용할 수 없습니다.</remarks>
public void Dispose()
{
Disconnect();
topicHandler.Clear();
client = null;
}
2025-06-04 23:10:11 +09:00
}
}