using Best.MQTT;
using Best.MQTT.Packets.Builders;
using Cysharp.Threading.Tasks;
using System;
using System.Collections.Concurrent;
using System.Text;
using UnityEngine;
using UVC.Log;
namespace UVC.Network
{
///
/// MQTT 클라이언트를 관리하고 메시지 송수신을 처리하는 서비스 클래스입니다.
/// 이 클래스는 MQTT 브로커와의 연결 관리, 토픽 구독, 메시지 발행 등의 기능을 제공합니다.
///
///
/// 이 클래스는 스레드 안전한 방식으로 토픽 핸들러를 관리하며, 연결 끊김 시 자동 재연결 기능을 제공합니다.
/// 내부적으로 Best.MQTT 라이브러리를 사용하여 MQTT 프로토콜 통신을 구현합니다.
/// WebGL 플랫폼에서는 스레드풀이 지원되지 않으므로, 메시지 핸들러는 메인 스레드에서 실행됩니다.
///
public class MQTTService
{
private string MQTTDomain = "localhost";
private int MQTTPort = 1883;
private MQTTClient client;
private bool autoReconnect = true; // 자동 재연결 여부
private int reconnectDelay = 1000; // 재연결 시도 간격 (ms)
private bool onBackgroundThread = true; // 핸들러를 백그라운드 스레드에서 호출할지 여부
private ConcurrentDictionary> topicHandler;
///
/// MQTT 브로커와의 연결 상태를 확인합니다.
///
/// 클라이언트가 초기화되고 브로커에 연결된 경우 true를 반환합니다.
public bool IsConnected => client != null && client.State == ClientStates.Connected;
private bool useTLS = false;
private bool useWebSocket = false;
///
/// MQTTService 인스턴스를 생성합니다.
///
/// MQTT 브로커의 호스트명 또는 IP 주소입니다.
/// MQTT 브로커의 포트 번호입니다.
/// WebSocket을 통해 연결할지 여부입니다.
/// TLS 암호화를 사용할지 여부입니다.
/// 연결이 끊겼을 때 자동으로 재연결을 시도할지 여부입니다.
/// Handler를 백그라운드 스레드에서 호출 할지 여부
///
///
/// // localhost의 기본 MQTT 포트(1883)에 연결하는 서비스 생성
/// var mqttService = new MQTTService("localhost", 1883);
///
/// // 자동 재연결 비활성화
/// var mqttService = new MQTTService("mqtt.example.com", 8883, false);
///
///
public MQTTService(string domain, int port = 1883, bool useWebSocket = false, bool useTLS = false, bool autoReconnect = true, bool onBackground = true)
{
topicHandler = new ConcurrentDictionary>();
MQTTDomain = domain;
MQTTPort = port;
this.autoReconnect = autoReconnect;
this.useTLS = useTLS;
this.useWebSocket = useWebSocket;
}
///
/// MQTT 클라이언트를 생성하고 이벤트 핸들러를 등록합니다.
///
///
/// 이 메서드는 클라이언트가 이미 존재하는 경우 기존 연결을 종료한 후 새로운 클라이언트를 생성합니다.
///
private void CreateMQTTClient()
{
ULog.Debug($"MQTT Domain:{MQTTDomain} , MQTTPORT:{MQTTPort}");
var builder = new ConnectionOptionsBuilder();
if (useWebSocket) {
builder.WithWebSocket(MQTTDomain, MQTTPort)
.WithPath("/mqtt"); // 필요 시 "/ws" 등 브로커/프록시 설정과 일치하게 변경
}
else {
#if !UNITY_WEBGL || UNITY_EDITOR
builder.WithTCP(MQTTDomain, MQTTPort);
#endif
}
if(useTLS) builder.WithTLS();
var options = builder.Build();
if (client != null) Disconnect();
client = new MQTTClient(options);
client.OnConnected += OnConnectedMQTT;
client.OnStateChanged += OnStateChangedMQTT;
client.OnDisconnect += OnDisconnectedMQTT;
client.OnError += OnErrorMQTT;
}
///
/// 특정 MQTT 토픽에 대한 메시지 핸들러를 추가합니다.
///
/// 구독할 MQTT 토픽입니다.
/// 토픽에 메시지가 수신되면 호출될 핸들러입니다.
/// 첫 번째 매개변수는 토픽이고 두 번째 매개변수는 메시지 내용입니다.
///
/// 하나의 토픽에 여러 핸들러를 등록할 수 있으며, 메시지 수신 시 모든 핸들러가 호출됩니다.
///
///
///
/// // MQTT 서비스 인스턴스 생성
/// var mqtt = new MQTTService("localhost", 1883);
///
/// // 핸들러 등록 및 연결
/// mqtt.AddTopicHandler("sensor/temperature", (topic, message) => {
/// Debug.Log($"온도 데이터 수신: {message}");
/// });
/// mqtt.Connect();
///
///
public void AddTopicHandler(string topic, Action handler)
{
topicHandler.AddOrUpdate(
topic,
// 키가 없을 때 새로운 핸들러 추가
handler,
// 키가 이미 있을 때 기존 핸들러에 추가
(_, existingHandler) => existingHandler + handler
);
}
///
/// 특정 MQTT 토픽에서 메시지 핸들러를 제거합니다.
///
/// 핸들러를 제거할 MQTT 토픽입니다.
/// 제거할 메시지 핸들러입니다.
///
/// 지정된 토픽에 대한 모든 핸들러가 제거되면 해당 토픽에 대한 키도 제거됩니다.
///
///
///
/// // 핸들러 정의
/// Action temperatureHandler = (topic, message) => {
/// Debug.Log($"온도 데이터 수신: {message}");
/// };
///
/// // 핸들러 등록
/// mqtt.AddTopicHandler("sensor/temperature", temperatureHandler);
///
/// // 나중에 핸들러 제거
/// mqtt.RemoveTopicHandler("sensor/temperature", temperatureHandler);
///
///
public void RemoveTopicHandler(string topic, Action handler)
{
topicHandler.AddOrUpdate(
topic,
// 키가 없는 경우 - 여기서는 발생하면 안 됨
_ => null,
// 기존 핸들러에서 제거
(_, existingHandler) =>
{
var updatedHandler = existingHandler - handler;
return updatedHandler;
}
);
// 핸들러가 null이면 키 자체를 제거
if (topicHandler.TryGetValue(topic, out var currentHandler) && currentHandler == null)
{
topicHandler.TryRemove(topic, out _);
}
}
///
/// 모든 토픽 핸들러를 제거합니다.
///
///
/// 이 메서드는 모든 토픽 구독을 효과적으로 취소합니다. 다음에 연결할 때
/// 새로운 핸들러를 추가해야 합니다.
///
///
///
/// // 모든 핸들러 제거
/// mqtt.ClearTopicHandlers();
///
///
public void ClearTopicHandlers()
{
topicHandler.Clear();
}
///
/// MQTT 브로커에 연결합니다.
///
///
/// 이 메서드는 새 MQTT 클라이언트를 생성하고 브로커에 연결을 시작합니다.
/// 연결 성공 시 OnConnectedMQTT 이벤트 핸들러가 호출되며, 이때 등록된 모든 토픽을 구독합니다.
///
///
///
/// // MQTT 서비스 인스턴스 생성
/// var mqtt = new MQTTService("test.mosquitto.org", 1883);
///
/// // 토픽 핸들러 추가 후 연결
/// mqtt.AddTopicHandler("test/topic", (topic, message) => {
/// Debug.Log($"메시지 수신: {message}");
/// });
/// mqtt.Connect();
///
///
public void Connect()
{
CreateMQTTClient();
client.BeginConnect(ConnectPacketBuilderCallback);
}
///
/// MQTT 브로커와의 연결을 종료합니다.
///
///
/// 이 메서드는 모든 이벤트 핸들러를 제거하고 연결된 경우 MQTT 브로커로 정상 연결 종료 패킷을 전송합니다.
///
///
///
/// // 사용이 끝난 후 연결 종료
/// mqtt.Disconnect();
///
///
public void Disconnect()
{
if (client != null)
{
// 이벤트 핸들러 제거
client.OnConnected -= OnConnectedMQTT;
client.OnStateChanged -= OnStateChangedMQTT;
client.OnDisconnect -= OnDisconnectedMQTT;
client.OnError -= OnErrorMQTT;
if (IsConnected)
{
client.CreateDisconnectPacketBuilder()
.WithReasonCode(DisconnectReasonCodes.NormalDisconnection)
.WithReasonString("Bye")
.BeginDisconnect();
}
client = null;
}
}
///
/// 연결 패킷 빌더를 구성하는 콜백 메서드입니다.
///
/// MQTT 클라이언트 인스턴스
/// 연결 패킷 빌더
/// 구성된 연결 패킷 빌더
private ConnectPacketBuilder ConnectPacketBuilderCallback(MQTTClient client, ConnectPacketBuilder builder)
{
return builder.WithKeepAlive(60 * 60);//keep alive 1
}
///
/// MQTT 브로커에 연결되었을 때 호출되는 이벤트 핸들러입니다.
///
/// 연결된 MQTT 클라이언트
///
/// 이 메서드는 연결 성공 시 등록된 모든 토픽에 대한 구독을 시작합니다.
///
private void OnConnectedMQTT(MQTTClient client)
{
ULog.Debug($"MQTT OnConnected");
BulkSubscribePacketBuilder builder = client.CreateBulkSubscriptionBuilder();
foreach (var topic in topicHandler.Keys)
{
builder.WithTopic(new SubscribeTopicBuilder(topic).WithMessageCallback(OnTopic));
}
builder.BeginSubscribe();
}
///
/// MQTT 클라이언트의 상태가 변경되었을 때 호출되는 이벤트 핸들러입니다.
///
/// MQTT 클라이언트
/// 이전 상태
/// 새 상태
private void OnStateChangedMQTT(MQTTClient client, ClientStates oldState, ClientStates newState)
{
ULog.Debug($"MQTT OnStateChanged {oldState} => {newState}");
}
///
/// MQTT 브로커와의 연결이 종료되었을 때 호출되는 이벤트 핸들러입니다.
///
/// MQTT 클라이언트
/// 연결 종료 이유 코드
/// 연결 종료 이유 문자열
///
/// 자동 재연결이 활성화된 경우 지정된 지연 시간 후에 재연결을 시도합니다.
///
private void OnDisconnectedMQTT(MQTTClient client, DisconnectReasonCodes code, string reason)
{
//ULog.Info($"MQTT Disconnected - code: {code}, reason: '{reason}'");
ULog.Debug($"MQTTDisconnected pcTime={DateTime.Now}");
// 지연 후에만 한 번 연결
if (Application.isPlaying && autoReconnect)
{
UniTask.Delay(reconnectDelay).ContinueWith(() =>
{
Connect();
}).Forget();
}
}
///
/// MQTT 클라이언트에서 오류가 발생했을 때 호출되는 이벤트 핸들러입니다.
///
/// MQTT 클라이언트
/// 오류 발생 이유
///
/// 이 이벤트 후에는 OnDisconnectedMQTT 이벤트도 발생합니다.
///
private void OnErrorMQTT(MQTTClient client, string reason)
{
ULog.Error($"MQTT OnError reason: '{reason}'");
}
///
/// 구독한 토픽으로 메시지가 수신되었을 때 호출되는 콜백입니다.
///
/// MQTT 클라이언트
/// 구독 토픽 정보
/// 수신된 메시지의 토픽 이름
/// 수신된 메시지
///
/// 이 메서드는 메시지를 로깅하고 해당 토픽에 등록된 모든 핸들러를 호출합니다.
///
private void OnTopic(MQTTClient client, SubscriptionTopic topic, string topicName, ApplicationMessage message)
{
if (useWebSocket)
{
if (PlayerLoopHelper.IsMainThread)
OnTopicLogic(client, topic, topicName, message);
else
UniTask.Post(() => OnTopicLogic(client, topic, topicName, message));
}
else
{
if (onBackgroundThread)
{
UniTask.RunOnThreadPool(() => OnTopicLogic(client, topic, topicName, message)).Forget();
}
else
{
if (PlayerLoopHelper.IsMainThread)
OnTopicLogic(client, topic, topicName, message);
else
UniTask.Post(() => OnTopicLogic(client, topic, topicName, message));
}
}
}
private void OnTopicLogic(MQTTClient client, SubscriptionTopic topic, string topicName, ApplicationMessage message)
{
//Debug.Log($"MQTT OnTopicLogic isMainThread={PlayerLoopHelper.IsMainThread}");
string payload = Encoding.UTF8.GetString(message.Payload.Data, message.Payload.Offset, message.Payload.Count);
//ULog.Debug($"MQTT OnTopic {topic.Filter.OriginalFilter} => {payload}");
#if !UNITY_WEBGL || UNITY_EDITOR
ServerLog.LogMqtt(MQTTDomain, MQTTPort.ToString(), topic.Filter.OriginalFilter, payload, DateTime.Now.ToString("yyyy-MM-ddTHH:mm:ss.fffZ"));
#endif
if(payload.Trim().Length > 0)
{
if (topicHandler.TryGetValue(topic.Filter.OriginalFilter, out var handler))
{
handler.Invoke(topic.Filter.OriginalFilter, payload);
}
}
}
///
/// 지정된 토픽으로 메시지를 발행(publish)합니다.
///
/// 메시지를 발행할 토픽입니다.
/// 발행할 메시지 내용입니다.
///
/// 이 메서드는 클라이언트가 연결되어 있지 않으면 오류 로그를 기록하고 메시지를 발행하지 않습니다.
///
///
///
/// // MQTT 서비스 인스턴스 생성 및 연결
/// var mqtt = new MQTTService("localhost", 1883);
/// mqtt.Connect();
///
/// // 메시지 발행
/// mqtt.Publish("device/status", "online");
///
/// // JSON 형식 메시지 발행
/// string jsonMessage = JsonUtility.ToJson(new DeviceStatus { status = "online", battery = 95 });
/// mqtt.Publish("device/status", jsonMessage);
///
///
public void Publish(string topic, string message)
{
if (client == null || client.State != ClientStates.Connected)
{
ULog.Error("MQTT client is not connected. Cannot publish message.");
return;
}
client.CreateApplicationMessageBuilder(topic)
.WithPayload(message)
.BeginPublish();
}
///
/// 현재 인스턴스에서 사용되는 모든 리소스를 해제하고 진행 중인 모든 작업을 중지합니다.
///
/// 이 메서드는 모든 반복 작업을 중단하고, 내부 상태를 지우고, 관련 리소스를 삭제합니다.
/// 를 호출한 후에는 해당 인스턴스를 더 이상 사용할 수 없습니다.
public void Dispose()
{
Disconnect();
topicHandler.Clear();
client = null;
}
}
}