using Newtonsoft.Json.Linq;
using System.Collections;
using System.Collections.Concurrent;
using UnityEngine;

/// <summary>
/// Simulation model that consumes the JSON messages queued for this node
/// (looked up through <c>MQTTDataBase</c> by <c>nodeID</c>) and reacts to the
/// event named in each message's <c>_event</c> field.
/// </summary>
public class SimulationModelSink : SimulationModel
{
    // Name of the JSON property that carries the event identifier.
    private const string EventKey = "_event";

    // JSON paths read from a remove-item message. Hoisted to static fields so
    // they are not re-allocated for every processed message.
    private static readonly string[] QueueNamePath = { "data", "queue_name" };
    private static readonly string[] RemainingLengthPath = { "data", "remaining_queue_length" };

    // Substring looked for in the "_event" value to recognise a shipped-item event.
    public string eventShipItem = "items_shipped_successfully";

    // Substring looked for in the "_event" value to recognise a remove-item event.
    public string eventRemoveItem = "product_defective";

    // Number of products this sink has removed from stores so far.
    public int totalShipped = 0;

    // Message queue for this node; stays null until MQTTDataBase can provide it.
    // NOTE(review): element type restored to JObject based on the TryDequeue call
    // below — confirm it matches the return type of MQTTDataBase.GetDataQueue.
    ConcurrentQueue<JObject> dataQueue;

    // Start is called once before the first execution of Update after the MonoBehaviour is created
    void Start()
    {
    }

    // Update is called once per frame
    void Update()
    {
    }

    /// <summary>
    /// Waits until <c>nodeID</c> is set and the node's data queue is available,
    /// then handles at most one queued message per frame, forever.
    /// </summary>
    /// <returns>An enumerator intended to be driven as a Unity coroutine.</returns>
    protected override IEnumerator RunSimulationCoroutine()
    {
        yield return new WaitUntil(() => !string.IsNullOrEmpty(nodeID));

        // Poll once per frame until the queue for this node exists.
        while (dataQueue == null)
        {
            dataQueue = MQTTDataBase.Instance.GetDataQueue(nodeID);
            if (dataQueue == null)
            {
                yield return null;
            }
        }

        while (true)
        {
            // TryDequeue already reports an empty queue, so no separate IsEmpty
            // check is needed. Null payloads are skipped instead of dereferenced.
            if (dataQueue.TryDequeue(out JObject currentData) && currentData != null)
            {
                HandleMessage(currentData);
            }

            yield return null;
        }
    }

    // Dispatches one dequeued message according to the content of its "_event" field.
    private void HandleMessage(JObject message)
    {
        if (!message.TryGetValue(EventKey, out JToken eventToken))
        {
            return;
        }

        string eventName = eventToken.ToString();
        if (eventName.Contains(eventShipItem))
        {
            // Shipped-item events are recognised but intentionally have no effect here.
        }
        else if (eventName.Contains(eventRemoveItem))
        {
            TrimStoreToReportedLength(message);
        }
    }

    // Destroys products of the store named in the message until the store holds
    // no more than the remaining queue length reported by the message.
    private void TrimStoreToReportedLength(JObject message)
    {
        string queueID = GetJsonValue(message, QueueNamePath)?.ToString();
        if (string.IsNullOrEmpty(queueID))
        {
            Debug.LogWarning($"{nameof(SimulationModelSink)}: remove event without a queue name; message ignored.");
            return;
        }

        SimulationModel model = DataManager.I.GetModel(queueID);
        if (!(model is SimulationModelStore storeModel))
        {
            // Covers both an unknown id (null) and a model that is not a store;
            // an unchecked cast here would throw and terminate the coroutine.
            Debug.LogWarning($"{nameof(SimulationModelSink)}: no store model found for queue '{queueID}'; message ignored.");
            return;
        }

        // A negative length would make the loop condition true even for an empty
        // store, so the target is clamped to zero.
        int targetCount = Mathf.Max(0, GetJsonIntValue(message, RemainingLengthPath));

        while (storeModel.storedProducts.Count > targetCount)
        {
            int countBefore = storeModel.storedProducts.Count;
            Destroy(storeModel.GetProduct());

            if (storeModel.storedProducts.Count >= countBefore)
            {
                // GetProduct did not shrink the store; stop rather than freeze the main thread.
                Debug.LogWarning($"{nameof(SimulationModelSink)}: store '{queueID}' did not shrink after GetProduct(); aborting trim.");
                break;
            }

            // NOTE(review): the counter is named totalShipped but is incremented
            // while handling the remove-item event — confirm this is intended.
            totalShipped++;
        }
    }
}