408 lines
17 KiB
C#
408 lines
17 KiB
C#
using Best.MQTT;
|
|
using Best.MQTT.Packets.Builders;
|
|
using Cysharp.Threading.Tasks;
|
|
using System;
|
|
using System.Collections.Concurrent;
|
|
using System.Text;
|
|
using UnityEngine;
|
|
using UVC.Log;
|
|
|
|
namespace UVC.network
|
|
{
|
|
/// <summary>
|
|
/// MQTT 클라이언트를 관리하고 메시지 송수신을 처리하는 서비스 클래스입니다.
|
|
/// 이 클래스는 MQTT 브로커와의 연결 관리, 토픽 구독, 메시지 발행 등의 기능을 제공합니다.
|
|
/// </summary>
|
|
/// <remarks>
|
|
/// 이 클래스는 스레드 안전한 방식으로 토픽 핸들러를 관리하며, 연결 끊김 시 자동 재연결 기능을 제공합니다.
|
|
/// 내부적으로 Best.MQTT 라이브러리를 사용하여 MQTT 프로토콜 통신을 구현합니다.
|
|
/// </remarks>
|
|
public class MQTTService
|
|
{
|
|
private string MQTTDomain = "localhost";
|
|
private int MQTTPort = 1883;
|
|
|
|
private MQTTClient client;
|
|
|
|
private bool autoReconnect = true; // 자동 재연결 여부
|
|
private int reconnectDelay = 1000; // 재연결 시도 간격 (ms)
|
|
private bool onBackgroundThread = true; // 핸들러를 백그라운드 스레드에서 호출할지 여부
|
|
|
|
private ConcurrentDictionary<string, Action<string, string>> topicHandler;
|
|
|
|
/// <summary>
|
|
/// MQTT 브로커와의 연결 상태를 확인합니다.
|
|
/// </summary>
|
|
/// <value>클라이언트가 초기화되고 브로커에 연결된 경우 <c>true</c>를 반환합니다.</value>
|
|
public bool IsConnected => client != null && client.State == ClientStates.Connected;
|
|
|
|
|
|
/// <summary>
|
|
/// MQTTService 인스턴스를 생성합니다.
|
|
/// </summary>
|
|
/// <param name="domain">MQTT 브로커의 호스트명 또는 IP 주소입니다.</param>
|
|
/// <param name="port">MQTT 브로커의 포트 번호입니다.</param>
|
|
/// <param name="autoReconnect">연결이 끊겼을 때 자동으로 재연결을 시도할지 여부입니다.</param>
|
|
/// <param name="onBackground">Handler를 백그라운드 스레드에서 호출 할지 여부</param>
|
|
/// <example>
|
|
/// <code>
|
|
/// // localhost의 기본 MQTT 포트(1883)에 연결하는 서비스 생성
|
|
/// var mqttService = new MQTTService("localhost", 1883);
|
|
///
|
|
/// // 자동 재연결 비활성화
|
|
/// var mqttService = new MQTTService("mqtt.example.com", 8883, false);
|
|
/// </code>
|
|
/// </example>
|
|
public MQTTService(string domain, int port = 1883, bool autoReconnect = true, bool onBackground = true)
|
|
{
|
|
topicHandler = new ConcurrentDictionary<string, Action<string, string>>();
|
|
MQTTDomain = domain;
|
|
MQTTPort = port;
|
|
this.autoReconnect = autoReconnect;
|
|
}
|
|
|
|
/// <summary>
|
|
/// MQTT 클라이언트를 생성하고 이벤트 핸들러를 등록합니다.
|
|
/// </summary>
|
|
/// <remarks>
|
|
/// 이 메서드는 클라이언트가 이미 존재하는 경우 기존 연결을 종료한 후 새로운 클라이언트를 생성합니다.
|
|
/// </remarks>
|
|
private void CreateMQTTClient()
|
|
{
|
|
ULog.Debug($"MQTT Domain:{MQTTDomain} , MQTTPORT:{MQTTPort}");
|
|
var options = new ConnectionOptionsBuilder()
|
|
.WithTCP(MQTTDomain, MQTTPort)
|
|
.Build();
|
|
|
|
if (client != null) Disconnect();
|
|
|
|
client = new MQTTClient(options);
|
|
client.OnConnected += OnConnectedMQTT;
|
|
client.OnStateChanged += OnStateChangedMQTT;
|
|
client.OnDisconnect += OnDisconnectedMQTT;
|
|
client.OnError += OnErrorMQTT;
|
|
}
|
|
|
|
/// <summary>
|
|
/// 특정 MQTT 토픽에 대한 메시지 핸들러를 추가합니다.
|
|
/// </summary>
|
|
/// <param name="topic">구독할 MQTT 토픽입니다.</param>
|
|
/// <param name="handler">토픽에 메시지가 수신되면 호출될 핸들러입니다.
|
|
/// 첫 번째 매개변수는 토픽이고 두 번째 매개변수는 메시지 내용입니다.</param>
|
|
/// <remarks>
|
|
/// 하나의 토픽에 여러 핸들러를 등록할 수 있으며, 메시지 수신 시 모든 핸들러가 호출됩니다.
|
|
/// </remarks>
|
|
/// <example>
|
|
/// <code>
|
|
/// // MQTT 서비스 인스턴스 생성
|
|
/// var mqtt = new MQTTService("localhost", 1883);
|
|
///
|
|
/// // 핸들러 등록 및 연결
|
|
/// mqtt.AddTopicHandler("sensor/temperature", (topic, message) => {
|
|
/// Debug.Log($"온도 데이터 수신: {message}");
|
|
/// });
|
|
/// mqtt.Connect();
|
|
/// </code>
|
|
/// </example>
|
|
public void AddTopicHandler(string topic, Action<string, string> handler)
|
|
{
|
|
topicHandler.AddOrUpdate(
|
|
topic,
|
|
// 키가 없을 때 새로운 핸들러 추가
|
|
handler,
|
|
// 키가 이미 있을 때 기존 핸들러에 추가
|
|
(_, existingHandler) => existingHandler + handler
|
|
);
|
|
}
|
|
|
|
/// <summary>
|
|
/// 특정 MQTT 토픽에서 메시지 핸들러를 제거합니다.
|
|
/// </summary>
|
|
/// <param name="topic">핸들러를 제거할 MQTT 토픽입니다.</param>
|
|
/// <param name="handler">제거할 메시지 핸들러입니다.</param>
|
|
/// <remarks>
|
|
/// 지정된 토픽에 대한 모든 핸들러가 제거되면 해당 토픽에 대한 키도 제거됩니다.
|
|
/// </remarks>
|
|
/// <example>
|
|
/// <code>
|
|
/// // 핸들러 정의
|
|
/// Action<string, string> temperatureHandler = (topic, message) => {
|
|
/// Debug.Log($"온도 데이터 수신: {message}");
|
|
/// };
|
|
///
|
|
/// // 핸들러 등록
|
|
/// mqtt.AddTopicHandler("sensor/temperature", temperatureHandler);
|
|
///
|
|
/// // 나중에 핸들러 제거
|
|
/// mqtt.RemoveTopicHandler("sensor/temperature", temperatureHandler);
|
|
/// </code>
|
|
/// </example>
|
|
public void RemoveTopicHandler(string topic, Action<string, string> handler)
|
|
{
|
|
topicHandler.AddOrUpdate(
|
|
topic,
|
|
// 키가 없는 경우 - 여기서는 발생하면 안 됨
|
|
_ => null,
|
|
// 기존 핸들러에서 제거
|
|
(_, existingHandler) =>
|
|
{
|
|
var updatedHandler = existingHandler - handler;
|
|
return updatedHandler;
|
|
}
|
|
);
|
|
|
|
// 핸들러가 null이면 키 자체를 제거
|
|
if (topicHandler.TryGetValue(topic, out var currentHandler) && currentHandler == null)
|
|
{
|
|
topicHandler.TryRemove(topic, out _);
|
|
}
|
|
}
|
|
|
|
/// <summary>
|
|
/// 모든 토픽 핸들러를 제거합니다.
|
|
/// </summary>
|
|
/// <remarks>
|
|
/// 이 메서드는 모든 토픽 구독을 효과적으로 취소합니다. 다음에 연결할 때
|
|
/// 새로운 핸들러를 추가해야 합니다.
|
|
/// </remarks>
|
|
/// <example>
|
|
/// <code>
|
|
/// // 모든 핸들러 제거
|
|
/// mqtt.ClearTopicHandlers();
|
|
/// </code>
|
|
/// </example>
|
|
public void ClearTopicHandlers()
|
|
{
|
|
topicHandler.Clear();
|
|
}
|
|
|
|
/// <summary>
|
|
/// MQTT 브로커에 연결합니다.
|
|
/// </summary>
|
|
/// <remarks>
|
|
/// 이 메서드는 새 MQTT 클라이언트를 생성하고 브로커에 연결을 시작합니다.
|
|
/// 연결 성공 시 OnConnectedMQTT 이벤트 핸들러가 호출되며, 이때 등록된 모든 토픽을 구독합니다.
|
|
/// </remarks>
|
|
/// <example>
|
|
/// <code>
|
|
/// // MQTT 서비스 인스턴스 생성
|
|
/// var mqtt = new MQTTService("test.mosquitto.org", 1883);
|
|
///
|
|
/// // 토픽 핸들러 추가 후 연결
|
|
/// mqtt.AddTopicHandler("test/topic", (topic, message) => {
|
|
/// Debug.Log($"메시지 수신: {message}");
|
|
/// });
|
|
/// mqtt.Connect();
|
|
/// </code>
|
|
/// </example>
|
|
public void Connect()
|
|
{
|
|
CreateMQTTClient();
|
|
client.BeginConnect(ConnectPacketBuilderCallback);
|
|
}
|
|
|
|
/// <summary>
|
|
/// MQTT 브로커와의 연결을 종료합니다.
|
|
/// </summary>
|
|
/// <remarks>
|
|
/// 이 메서드는 모든 이벤트 핸들러를 제거하고 연결된 경우 MQTT 브로커로 정상 연결 종료 패킷을 전송합니다.
|
|
/// </remarks>
|
|
/// <example>
|
|
/// <code>
|
|
/// // 사용이 끝난 후 연결 종료
|
|
/// mqtt.Disconnect();
|
|
/// </code>
|
|
/// </example>
|
|
public void Disconnect()
|
|
{
|
|
if (client != null)
|
|
{
|
|
// 이벤트 핸들러 제거
|
|
client.OnConnected -= OnConnectedMQTT;
|
|
client.OnStateChanged -= OnStateChangedMQTT;
|
|
client.OnDisconnect -= OnDisconnectedMQTT;
|
|
client.OnError -= OnErrorMQTT;
|
|
if (IsConnected)
|
|
{
|
|
client.CreateDisconnectPacketBuilder()
|
|
.WithReasonCode(DisconnectReasonCodes.NormalDisconnection)
|
|
.WithReasonString("Bye")
|
|
.BeginDisconnect();
|
|
}
|
|
client = null;
|
|
}
|
|
}
|
|
|
|
/// <summary>
|
|
/// 연결 패킷 빌더를 구성하는 콜백 메서드입니다.
|
|
/// </summary>
|
|
/// <param name="client">MQTT 클라이언트 인스턴스</param>
|
|
/// <param name="builder">연결 패킷 빌더</param>
|
|
/// <returns>구성된 연결 패킷 빌더</returns>
|
|
private ConnectPacketBuilder ConnectPacketBuilderCallback(MQTTClient client, ConnectPacketBuilder builder)
|
|
{
|
|
return builder.WithKeepAlive(60 * 60);//keep alive 1
|
|
}
|
|
|
|
/// <summary>
|
|
/// MQTT 브로커에 연결되었을 때 호출되는 이벤트 핸들러입니다.
|
|
/// </summary>
|
|
/// <param name="client">연결된 MQTT 클라이언트</param>
|
|
/// <remarks>
|
|
/// 이 메서드는 연결 성공 시 등록된 모든 토픽에 대한 구독을 시작합니다.
|
|
/// </remarks>
|
|
private void OnConnectedMQTT(MQTTClient client)
|
|
{
|
|
ULog.Debug($"MQTT OnConnected");
|
|
BulkSubscribePacketBuilder builder = client.CreateBulkSubscriptionBuilder();
|
|
foreach (var topic in topicHandler.Keys)
|
|
{
|
|
builder.WithTopic(new SubscribeTopicBuilder(topic).WithMessageCallback(OnTopic));
|
|
}
|
|
builder.BeginSubscribe();
|
|
}
|
|
|
|
/// <summary>
|
|
/// MQTT 클라이언트의 상태가 변경되었을 때 호출되는 이벤트 핸들러입니다.
|
|
/// </summary>
|
|
/// <param name="client">MQTT 클라이언트</param>
|
|
/// <param name="oldState">이전 상태</param>
|
|
/// <param name="newState">새 상태</param>
|
|
private void OnStateChangedMQTT(MQTTClient client, ClientStates oldState, ClientStates newState)
|
|
{
|
|
ULog.Debug($"MQTT OnStateChanged {oldState} => {newState}");
|
|
}
|
|
|
|
/// <summary>
|
|
/// MQTT 브로커와의 연결이 종료되었을 때 호출되는 이벤트 핸들러입니다.
|
|
/// </summary>
|
|
/// <param name="client">MQTT 클라이언트</param>
|
|
/// <param name="code">연결 종료 이유 코드</param>
|
|
/// <param name="reason">연결 종료 이유 문자열</param>
|
|
/// <remarks>
|
|
/// 자동 재연결이 활성화된 경우 지정된 지연 시간 후에 재연결을 시도합니다.
|
|
/// </remarks>
|
|
private void OnDisconnectedMQTT(MQTTClient client, DisconnectReasonCodes code, string reason)
|
|
{
|
|
//ULog.Info($"MQTT Disconnected - code: {code}, reason: '{reason}'");
|
|
ULog.Debug($"MQTTDisconnected pcTime={DateTime.Now}");
|
|
// 지연 후에만 한 번 연결
|
|
if (Application.isPlaying && autoReconnect)
|
|
{
|
|
UniTask.Delay(reconnectDelay).ContinueWith(() =>
|
|
{
|
|
Connect();
|
|
}).Forget();
|
|
}
|
|
}
|
|
|
|
/// <summary>
|
|
/// MQTT 클라이언트에서 오류가 발생했을 때 호출되는 이벤트 핸들러입니다.
|
|
/// </summary>
|
|
/// <param name="client">MQTT 클라이언트</param>
|
|
/// <param name="reason">오류 발생 이유</param>
|
|
/// <remarks>
|
|
/// 이 이벤트 후에는 OnDisconnectedMQTT 이벤트도 발생합니다.
|
|
/// </remarks>
|
|
private void OnErrorMQTT(MQTTClient client, string reason)
|
|
{
|
|
ULog.Error($"MQTT OnError reason: '{reason}'");
|
|
}
|
|
|
|
/// <summary>
|
|
/// 구독한 토픽으로 메시지가 수신되었을 때 호출되는 콜백입니다.
|
|
/// </summary>
|
|
/// <param name="client">MQTT 클라이언트</param>
|
|
/// <param name="topic">구독 토픽 정보</param>
|
|
/// <param name="topicName">수신된 메시지의 토픽 이름</param>
|
|
/// <param name="message">수신된 메시지</param>
|
|
/// <remarks>
|
|
/// 이 메서드는 메시지를 로깅하고 해당 토픽에 등록된 모든 핸들러를 호출합니다.
|
|
/// </remarks>
|
|
private void OnTopic(MQTTClient client, SubscriptionTopic topic, string topicName, ApplicationMessage message)
|
|
{
|
|
// 메인 스레드에서 실행 중인지 확인합니다.
|
|
bool isMainThread = PlayerLoopHelper.IsMainThread;
|
|
//Debug.Log($"MQTT OnTopic isMainThread={isMainThread}, onBackgroundThread:{onBackgroundThread}, {topic.Filter.OriginalFilter}");
|
|
if (isMainThread && onBackgroundThread)
|
|
{
|
|
// 백그라운드 스레드에서 실행
|
|
UniTask.RunOnThreadPool(() => OnTopicLogic(client, topic, topicName, message)).Forget();
|
|
}
|
|
else if (!isMainThread && !onBackgroundThread)
|
|
{
|
|
// 메인 스레드에서 실행
|
|
UniTask.Post(() => OnTopicLogic(client, topic, topicName, message));
|
|
}
|
|
else
|
|
{
|
|
// 메인 스레드에서 실행
|
|
OnTopicLogic(client, topic, topicName, message);
|
|
}
|
|
}
|
|
|
|
private void OnTopicLogic(MQTTClient client, SubscriptionTopic topic, string topicName, ApplicationMessage message)
|
|
{
|
|
//Debug.Log($"MQTT OnTopicLogic isMainThread={PlayerLoopHelper.IsMainThread}");
|
|
string payload = Encoding.UTF8.GetString(message.Payload.Data, message.Payload.Offset, message.Payload.Count);
|
|
//ULog.Debug($"MQTT OnTopic {topic.Filter.OriginalFilter} => {payload}");
|
|
ServerLog.LogMqtt(MQTTDomain, MQTTPort.ToString(), topic.Filter.OriginalFilter, payload, DateTime.Now.ToString("yyyy-MM-ddTHH:mm:ss.fffZ"));
|
|
if(payload.Trim().Length > 0)
|
|
{
|
|
if (topicHandler.TryGetValue(topic.Filter.OriginalFilter, out var handler))
|
|
{
|
|
handler.Invoke(topic.Filter.OriginalFilter, payload);
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
|
|
/// <summary>
|
|
/// 지정된 토픽으로 메시지를 발행(publish)합니다.
|
|
/// </summary>
|
|
/// <param name="topic">메시지를 발행할 토픽입니다.</param>
|
|
/// <param name="message">발행할 메시지 내용입니다.</param>
|
|
/// <remarks>
|
|
/// 이 메서드는 클라이언트가 연결되어 있지 않으면 오류 로그를 기록하고 메시지를 발행하지 않습니다.
|
|
/// </remarks>
|
|
/// <example>
|
|
/// <code>
|
|
/// // MQTT 서비스 인스턴스 생성 및 연결
|
|
/// var mqtt = new MQTTService("localhost", 1883);
|
|
/// mqtt.Connect();
|
|
///
|
|
/// // 메시지 발행
|
|
/// mqtt.Publish("device/status", "online");
|
|
///
|
|
/// // JSON 형식 메시지 발행
|
|
/// string jsonMessage = JsonUtility.ToJson(new DeviceStatus { status = "online", battery = 95 });
|
|
/// mqtt.Publish("device/status", jsonMessage);
|
|
/// </code>
|
|
/// </example>
|
|
public void Publish(string topic, string message)
|
|
{
|
|
if (client == null || client.State != ClientStates.Connected)
|
|
{
|
|
ULog.Error("MQTT client is not connected. Cannot publish message.");
|
|
return;
|
|
}
|
|
client.CreateApplicationMessageBuilder(topic)
|
|
.WithPayload(message)
|
|
.BeginPublish();
|
|
}
|
|
|
|
/// <summary>
|
|
/// 현재 인스턴스에서 사용되는 모든 리소스를 해제하고 진행 중인 모든 작업을 중지합니다.
|
|
/// </summary>
|
|
/// <remarks>이 메서드는 모든 반복 작업을 중단하고, 내부 상태를 지우고, 관련 리소스를 삭제합니다.
|
|
/// <see cref="Dispose"/>를 호출한 후에는 해당 인스턴스를 더 이상 사용할 수 없습니다.</remarks>
|
|
public void Dispose()
|
|
{
|
|
Disconnect();
|
|
topicHandler.Clear();
|
|
client = null;
|
|
}
|
|
}
|
|
}
|