using Best.MQTT; using Best.MQTT.Packets.Builders; using Cysharp.Threading.Tasks; using System; using System.Collections.Concurrent; using System.Text; using UnityEngine; using UVC.Log; namespace UVC.network { /// /// MQTT 클라이언트를 관리하고 메시지 송수신을 처리하는 서비스 클래스입니다. /// 이 클래스는 MQTT 브로커와의 연결 관리, 토픽 구독, 메시지 발행 등의 기능을 제공합니다. /// /// /// 이 클래스는 스레드 안전한 방식으로 토픽 핸들러를 관리하며, 연결 끊김 시 자동 재연결 기능을 제공합니다. /// 내부적으로 Best.MQTT 라이브러리를 사용하여 MQTT 프로토콜 통신을 구현합니다. /// public class MQTTService { private string MQTTDomain = "localhost"; private int MQTTPort = 1883; private MQTTClient client; private bool autoReconnect = true; // 자동 재연결 여부 private int reconnectDelay = 1000; // 재연결 시도 간격 (ms) private ConcurrentDictionary> topicHandler; /// /// MQTT 브로커와의 연결 상태를 확인합니다. /// /// 클라이언트가 초기화되고 브로커에 연결된 경우 true를 반환합니다. public bool IsConnected => client != null && client.State == ClientStates.Connected; private Action onMessageReceived; /// /// MQTTService 인스턴스를 생성합니다. /// /// MQTT 브로커의 호스트명 또는 IP 주소입니다. /// MQTT 브로커의 포트 번호입니다. /// 연결이 끊겼을 때 자동으로 재연결을 시도할지 여부입니다. /// /// /// // localhost의 기본 MQTT 포트(1883)에 연결하는 서비스 생성 /// var mqttService = new MQTTService("localhost", 1883); /// /// // 자동 재연결 비활성화 /// var mqttService = new MQTTService("mqtt.example.com", 8883, false); /// /// public MQTTService(string domain, int port = 1883, bool autoReconnect = true) { topicHandler = new ConcurrentDictionary>(); MQTTDomain = domain; MQTTPort = port; this.autoReconnect = autoReconnect; } /// /// MQTT 클라이언트를 생성하고 이벤트 핸들러를 등록합니다. /// /// /// 이 메서드는 클라이언트가 이미 존재하는 경우 기존 연결을 종료한 후 새로운 클라이언트를 생성합니다. /// private void CreateMQTTClient() { Debug.Log($"MQTT Domain:{MQTTDomain} , MQTTPORT:{MQTTPort}"); var options = new ConnectionOptionsBuilder() .WithTCP(MQTTDomain, MQTTPort) .Build(); if (client != null) Disconnect(); client = new MQTTClient(options); client.OnConnected += OnConnectedMQTT; client.OnStateChanged += OnStateChangedMQTT; client.OnDisconnect += OnDisconnectedMQTT; client.OnError += OnErrorMQTT; } /// /// 특정 MQTT 토픽에 대한 메시지 핸들러를 추가합니다. /// /// 구독할 MQTT 토픽입니다. /// 토픽에 메시지가 수신되면 호출될 핸들러입니다. /// 첫 번째 매개변수는 토픽이고 두 번째 매개변수는 메시지 내용입니다. /// /// 하나의 토픽에 여러 핸들러를 등록할 수 있으며, 메시지 수신 시 모든 핸들러가 호출됩니다. /// /// /// /// // MQTT 서비스 인스턴스 생성 /// var mqtt = new MQTTService("localhost", 1883); /// /// // 핸들러 등록 및 연결 /// mqtt.AddTopicHandler("sensor/temperature", (topic, message) => { /// Debug.Log($"온도 데이터 수신: {message}"); /// }); /// mqtt.Connect(); /// /// public void AddTopicHandler(string topic, Action handler) { topicHandler.AddOrUpdate( topic, // 키가 없을 때 새로운 핸들러 추가 handler, // 키가 이미 있을 때 기존 핸들러에 추가 (_, existingHandler) => existingHandler + handler ); } /// /// 특정 MQTT 토픽에서 메시지 핸들러를 제거합니다. /// /// 핸들러를 제거할 MQTT 토픽입니다. /// 제거할 메시지 핸들러입니다. /// /// 지정된 토픽에 대한 모든 핸들러가 제거되면 해당 토픽에 대한 키도 제거됩니다. /// /// /// /// // 핸들러 정의 /// Action<string, string> temperatureHandler = (topic, message) => { /// Debug.Log($"온도 데이터 수신: {message}"); /// }; /// /// // 핸들러 등록 /// mqtt.AddTopicHandler("sensor/temperature", temperatureHandler); /// /// // 나중에 핸들러 제거 /// mqtt.RemoveTopicHandler("sensor/temperature", temperatureHandler); /// /// public void RemoveTopicHandler(string topic, Action handler) { topicHandler.AddOrUpdate( topic, // 키가 없는 경우 - 여기서는 발생하면 안 됨 _ => null, // 기존 핸들러에서 제거 (_, existingHandler) => { var updatedHandler = existingHandler - handler; return updatedHandler; } ); // 핸들러가 null이면 키 자체를 제거 if (topicHandler.TryGetValue(topic, out var currentHandler) && currentHandler == null) { topicHandler.TryRemove(topic, out _); } } /// /// 모든 토픽 핸들러를 제거합니다. /// /// /// 이 메서드는 모든 토픽 구독을 효과적으로 취소합니다. 다음에 연결할 때 /// 새로운 핸들러를 추가해야 합니다. /// /// /// /// // 모든 핸들러 제거 /// mqtt.ClearTopicHandlers(); /// /// public void ClearTopicHandlers() { topicHandler.Clear(); } /// /// MQTT 브로커에 연결합니다. /// /// /// 이 메서드는 새 MQTT 클라이언트를 생성하고 브로커에 연결을 시작합니다. /// 연결 성공 시 OnConnectedMQTT 이벤트 핸들러가 호출되며, 이때 등록된 모든 토픽을 구독합니다. /// /// /// /// // MQTT 서비스 인스턴스 생성 /// var mqtt = new MQTTService("test.mosquitto.org", 1883); /// /// // 토픽 핸들러 추가 후 연결 /// mqtt.AddTopicHandler("test/topic", (topic, message) => { /// Debug.Log($"메시지 수신: {message}"); /// }); /// mqtt.Connect(); /// /// public void Connect() { CreateMQTTClient(); client.BeginConnect(ConnectPacketBuilderCallback); } /// /// MQTT 브로커와의 연결을 종료합니다. /// /// /// 이 메서드는 모든 이벤트 핸들러를 제거하고 연결된 경우 MQTT 브로커로 정상 연결 종료 패킷을 전송합니다. /// /// /// /// // 사용이 끝난 후 연결 종료 /// mqtt.Disconnect(); /// /// public void Disconnect() { if (client != null) { // 이벤트 핸들러 제거 client.OnConnected -= OnConnectedMQTT; client.OnStateChanged -= OnStateChangedMQTT; client.OnDisconnect -= OnDisconnectedMQTT; client.OnError -= OnErrorMQTT; if (IsConnected) { client.CreateDisconnectPacketBuilder() .WithReasonCode(DisconnectReasonCodes.NormalDisconnection) .WithReasonString("Bye") .BeginDisconnect(); } client = null; } } /// /// 연결 패킷 빌더를 구성하는 콜백 메서드입니다. /// /// MQTT 클라이언트 인스턴스 /// 연결 패킷 빌더 /// 구성된 연결 패킷 빌더 private ConnectPacketBuilder ConnectPacketBuilderCallback(MQTTClient client, ConnectPacketBuilder builder) { return builder.WithKeepAlive(60 * 60);//keep alive 1 } /// /// MQTT 브로커에 연결되었을 때 호출되는 이벤트 핸들러입니다. /// /// 연결된 MQTT 클라이언트 /// /// 이 메서드는 연결 성공 시 등록된 모든 토픽에 대한 구독을 시작합니다. /// private void OnConnectedMQTT(MQTTClient client) { Debug.Log($"MQTT OnConnected"); BulkSubscribePacketBuilder builder = client.CreateBulkSubscriptionBuilder(); foreach (var topic in topicHandler.Keys) { builder.WithTopic(new SubscribeTopicBuilder(topic).WithMessageCallback(OnTopic)); } builder.BeginSubscribe(); } /// /// MQTT 클라이언트의 상태가 변경되었을 때 호출되는 이벤트 핸들러입니다. /// /// MQTT 클라이언트 /// 이전 상태 /// 새 상태 private void OnStateChangedMQTT(MQTTClient client, ClientStates oldState, ClientStates newState) { Debug.Log($"MQTT OnStateChanged {oldState} => {newState}"); } /// /// MQTT 브로커와의 연결이 종료되었을 때 호출되는 이벤트 핸들러입니다. /// /// MQTT 클라이언트 /// 연결 종료 이유 코드 /// 연결 종료 이유 문자열 /// /// 자동 재연결이 활성화된 경우 지정된 지연 시간 후에 재연결을 시도합니다. /// private void OnDisconnectedMQTT(MQTTClient client, DisconnectReasonCodes code, string reason) { //ULog.Info($"MQTT Disconnected - code: {code}, reason: '{reason}'"); Debug.Log($"MQTTDisconnected pcTime={DateTime.Now}"); // 지연 후에만 한 번 연결 if (Application.isPlaying && autoReconnect) { UniTask.Delay(reconnectDelay).ContinueWith(() => { Connect(); }).Forget(); } } /// /// MQTT 클라이언트에서 오류가 발생했을 때 호출되는 이벤트 핸들러입니다. /// /// MQTT 클라이언트 /// 오류 발생 이유 /// /// 이 이벤트 후에는 OnDisconnectedMQTT 이벤트도 발생합니다. /// private void OnErrorMQTT(MQTTClient client, string reason) { Debug.LogError($"MQTT OnError reason: '{reason}'"); } /// /// 구독한 토픽으로 메시지가 수신되었을 때 호출되는 콜백입니다. /// /// MQTT 클라이언트 /// 구독 토픽 정보 /// 수신된 메시지의 토픽 이름 /// 수신된 메시지 /// /// 이 메서드는 메시지를 로깅하고 해당 토픽에 등록된 모든 핸들러를 호출합니다. /// private void OnTopic(MQTTClient client, SubscriptionTopic topic, string topicName, ApplicationMessage message) { string payload = Encoding.UTF8.GetString(message.Payload.Data, message.Payload.Offset, message.Payload.Count); Debug.Log($"MQTT OnTopic {topic.Filter.OriginalFilter} => {payload}"); ServerLog.LogMqtt(MQTTDomain, MQTTPort.ToString(), topic.Filter.OriginalFilter, payload, DateTime.Now.ToString("yyyy-MM-ddTHH:mm:ss.fffZ")); if (onMessageReceived != null) { onMessageReceived.Invoke(topic.Filter.OriginalFilter, payload); } } /// /// 지정된 토픽으로 메시지를 발행(publish)합니다. /// /// 메시지를 발행할 토픽입니다. /// 발행할 메시지 내용입니다. /// /// 이 메서드는 클라이언트가 연결되어 있지 않으면 오류 로그를 기록하고 메시지를 발행하지 않습니다. /// /// /// /// // MQTT 서비스 인스턴스 생성 및 연결 /// var mqtt = new MQTTService("localhost", 1883); /// mqtt.Connect(); /// /// // 메시지 발행 /// mqtt.Publish("device/status", "online"); /// /// // JSON 형식 메시지 발행 /// string jsonMessage = JsonUtility.ToJson(new DeviceStatus { status = "online", battery = 95 }); /// mqtt.Publish("device/status", jsonMessage); /// /// public void Publish(string topic, string message) { if (client == null || client.State != ClientStates.Connected) { Debug.LogError("MQTT client is not connected. Cannot publish message."); return; } client.CreateApplicationMessageBuilder(topic) .WithPayload(message) .BeginPublish(); } } }