using System; using System.Collections.Concurrent; using System.Collections.Generic; using System.Linq; using System.Threading; using System.Threading.Tasks; using Microsoft.AspNetCore.SignalR; using Haoliang.Core.Services; using Haoliang.Models.Models.Device; using Haoliang.Models.Models.Production; using Haoliang.Models.Models.System; using Haoliang.Models.Common; namespace Haoliang.Core.Services { public interface IRealTimeService { /// /// Connect a client to WebSocket hub /// Task ConnectClientAsync(string connectionId, string userId, string clientType); /// /// Disconnect a client /// Task DisconnectClientAsync(string connectionId); /// /// Join a device monitoring group /// Task JoinDeviceGroupAsync(string connectionId, int deviceId); /// /// Leave a device monitoring group /// Task LeaveDeviceGroupAsync(string connectionId, int deviceId); /// /// Join a dashboard group /// Task JoinDashboardGroupAsync(string connectionId, string dashboardId); /// /// Leave a dashboard group /// Task LeaveDashboardGroupAsync(string connectionId, string dashboardId); /// /// Broadcast device status update /// Task BroadcastDeviceStatusAsync(DeviceStatusUpdate statusUpdate); /// /// Broadcast production update /// Task BroadcastProductionUpdateAsync(ProductionUpdate productionUpdate); /// /// Broadcast alert update /// Task BroadcastAlertAsync(AlertUpdate alertUpdate); /// /// Send system notification /// Task SendSystemNotificationAsync(SystemNotification notification); /// /// Send real-time dashboard data /// Task SendDashboardUpdateAsync(DashboardUpdate dashboardUpdate); /// /// Send command to specific client /// Task SendCommandToClientAsync(string connectionId, RealTimeCommand command); /// /// Broadcast command to all clients /// Task BroadcastCommandAsync(RealTimeCommand command); /// /// Get connected clients count /// Task GetConnectedClientsCountAsync(); /// /// Get connected clients by type /// Task> GetConnectedClientsByTypeAsync(string clientType); /// /// Get device monitoring status /// Task GetDeviceMonitoringStatusAsync(int deviceId); /// /// Start data streaming for device /// Task StartDeviceStreamingAsync(int deviceId, int intervalMs = 1000); /// /// Stop data streaming for device /// Task StopDeviceStreamingAsync(int deviceId); /// /// Get active streaming devices /// Task> GetActiveStreamingDevicesAsync(); } public class RealTimeService : IRealTimeService { private readonly IHubContext _hubContext; private readonly IDeviceCollectionService _deviceCollectionService; private readonly IProductionService _productionService; private readonly IAlarmService _alarmService; private readonly ICacheService _cacheService; private readonly ConcurrentDictionary _connectedClients = new ConcurrentDictionary(); private readonly ConcurrentDictionary _deviceStreaming = new ConcurrentDictionary(); private readonly Timer _deviceStatusTimer; private readonly Timer _productionTimer; public RealTimeService( IHubContext hubContext, IDeviceCollectionService deviceCollectionService, IProductionService productionService, IAlarmService alarmService, ICacheService cacheService) { _hubContext = hubContext; _deviceCollectionService = deviceCollectionService; _productionService = productionService; _alarmService = alarmService; _cacheService = cacheService; // Start timers for periodic updates _deviceStatusTimer = new Timer(UpdateDeviceStatuses, null, TimeSpan.Zero, TimeSpan.FromSeconds(30)); _productionTimer = new Timer(UpdateProductionData, null, TimeSpan.Zero, TimeSpan.FromSeconds(60)); } public async Task ConnectClientAsync(string connectionId, string userId, string clientType) { var clientInfo = new ClientInfo { ConnectionId = connectionId, UserId = userId, ClientType = clientType, ConnectedAt = DateTime.UtcNow, LastActivity = DateTime.UtcNow, Groups = new HashSet(), DeviceGroups = new HashSet() }; _connectedClients.AddOrUpdate(connectionId, clientInfo, (key, existing) => clientInfo); await _hubContext.Clients.Client(connectionId).SendAsync("ClientConnected", new { ClientId = connectionId, UserId = userId, ClientType = clientType, Timestamp = DateTime.UtcNow }); } public async Task DisconnectClientAsync(string connectionId) { if (_connectedClients.TryRemove(connectionId, out var clientInfo)) { // Remove from all groups foreach (var group in clientInfo.Groups) { await _hubContext.Groups.RemoveFromGroupAsync(connectionId, group); } foreach (var deviceId in clientInfo.DeviceGroups) { await _hubContext.Groups.RemoveFromGroupAsync(connectionId, $"device_{deviceId}"); } // Notify other clients await _hubContext.Clients.AllExcept(connectionId).SendAsync("ClientDisconnected", new { ClientId = connectionId, UserId = clientInfo.UserId, Timestamp = DateTime.UtcNow }); } } public async Task JoinDeviceGroupAsync(string connectionId, int deviceId) { if (_connectedClients.TryGetValue(connectionId, out var clientInfo)) { clientInfo.DeviceGroups.Add(deviceId); clientInfo.LastActivity = DateTime.UtcNow; await _hubContext.Groups.AddToGroupAsync(connectionId, $"device_{deviceId}"); // Send current device status var deviceStatus = await _deviceCollectionService.GetDeviceCurrentStatusAsync(deviceId); await _hubContext.Clients.Client(connectionId).SendAsync("DeviceStatusUpdated", new { DeviceId = deviceId, Status = deviceStatus.Status, Timestamp = DateTime.UtcNow }); } } public async Task LeaveDeviceGroupAsync(string connectionId, int deviceId) { if (_connectedClients.TryGetValue(connectionId, out var clientInfo)) { clientInfo.DeviceGroups.Remove(deviceId); await _hubContext.Groups.RemoveFromGroupAsync(connectionId, $"device_{deviceId}"); } } public async Task JoinDashboardGroupAsync(string connectionId, string dashboardId) { if (_connectedClients.TryGetValue(connectionId, out var clientInfo)) { clientInfo.Groups.Add($"dashboard_{dashboardId}"); clientInfo.LastActivity = DateTime.UtcNow; await _hubContext.Groups.AddToGroupAsync(connectionId, $"dashboard_{dashboardId}"); // Send current dashboard data var dashboardUpdate = await GetDashboardUpdateAsync(); await _hubContext.Clients.Client(connectionId).SendAsync("DashboardUpdated", dashboardUpdate); } } public async Task LeaveDashboardGroupAsync(string connectionId, string dashboardId) { if (_connectedClients.TryGetValue(connectionId, out var clientInfo)) { clientInfo.Groups.Remove($"dashboard_{dashboardId}"); await _hubContext.Groups.RemoveFromGroupAsync(connectionId, $"dashboard_{dashboardId}"); } } public async Task BroadcastDeviceStatusAsync(DeviceStatusUpdate statusUpdate) { await _hubContext.Clients.Group($"device_{statusUpdate.DeviceId}").SendAsync("DeviceStatusUpdated", statusUpdate); // Also broadcast to dashboard groups await _hubContext.Clients.Group("dashboard").SendAsync("DeviceStatusUpdated", statusUpdate); } public async Task BroadcastProductionUpdateAsync(ProductionUpdate productionUpdate) { await _hubContext.Clients.Group($"device_{productionUpdate.DeviceId}").SendAsync("ProductionUpdated", productionUpdate); // Also broadcast to dashboard groups await _hubContext.Clients.Group("dashboard").SendAsync("ProductionUpdated", productionUpdate); } public async Task BroadcastAlertAsync(AlertUpdate alertUpdate) { await _hubContext.Clients.Group("dashboard").SendAsync("AlertUpdated", alertUpdate); await _hubContext.Clients.Group("alerts").SendAsync("AlertUpdated", alertUpdate); // Send to specific device groups if alert is device-specific if (alertUpdate.DeviceId.HasValue) { await _hubContext.Clients.Group($"device_{alertUpdate.DeviceId.Value}").SendAsync("AlertUpdated", alertUpdate); } } public async Task SendSystemNotificationAsync(SystemNotification notification) { await _hubContext.Clients.Group("notifications").SendAsync("SystemNotification", notification); } public async Task SendDashboardUpdateAsync(DashboardUpdate dashboardUpdate) { await _hubContext.Clients.Group("dashboard").SendAsync("DashboardUpdated", dashboardUpdate); } public async Task SendCommandToClientAsync(string connectionId, RealTimeCommand command) { await _hubContext.Clients.Client(connectionId).SendAsync("Command", command); } public async Task BroadcastCommandAsync(RealTimeCommand command) { await _hubContext.Clients.All.SendAsync("Command", command); } public async Task GetConnectedClientsCountAsync() { // Clean up inactive clients var cutoffTime = DateTime.UtcNow.AddMinutes(-5); var inactiveClients = _connectedClients.Values.Where(c => c.LastActivity < cutoffTime).ToList(); foreach (var client in inactiveClients) { await DisconnectClientAsync(client.ConnectionId); } return _connectedClients.Count; } public async Task> GetConnectedClientsByTypeAsync(string clientType) { return _connectedClients.Values .Where(c => c.ClientType.Equals(clientType, StringComparison.OrdinalIgnoreCase)) .ToList(); } public async Task GetDeviceMonitoringStatusAsync(int deviceId) { var streamingInfo = _deviceStreaming.GetValueOrDefault(deviceId); var monitoringClients = _connectedClients.Values .Count(c => c.DeviceGroups.Contains(deviceId)); return new DeviceMonitoringStatus { DeviceId = deviceId, IsStreaming = streamingInfo != null, StreamingIntervalMs = streamingInfo?.IntervalMs ?? 0, MonitoringClients = monitoringClients, StreamingStartedAt = streamingInfo?.StartedAt, LastStreamingUpdate = streamingInfo?.LastUpdate }; } public async Task StartDeviceStreamingAsync(int deviceId, int intervalMs = 1000) { if (!_deviceStreaming.ContainsKey(deviceId)) { var streamingInfo = new DeviceStreamingInfo { DeviceId = deviceId, IntervalMs = intervalMs, StartedAt = DateTime.UtcNow, LastUpdate = DateTime.UtcNow, IsRunning = true }; _deviceStreaming.AddOrUpdate(deviceId, streamingInfo, (key, existing) => streamingInfo); // Start streaming task Task.Run(() => StreamDeviceData(deviceId, intervalMs)); } } public async Task StopDeviceStreamingAsync(int deviceId) { if (_deviceStreaming.TryRemove(deviceId, out var streamingInfo)) { streamingInfo.IsRunning = false; } } public async Task> GetActiveStreamingDevicesAsync() { return _deviceStreaming.Values .Where(s => s.IsRunning) .Select(s => s.DeviceId) .ToList(); } #region Private Methods private void UpdateDeviceStatuses(object state) { Task.Run(async () => { try { var activeDevices = await _deviceCollectionService.GetAllActiveDevicesAsync(); foreach (var device in activeDevices) { var status = await _deviceCollectionService.GetDeviceCurrentStatusAsync(device.Id); var statusUpdate = new DeviceStatusUpdate { DeviceId = device.Id, DeviceName = device.Name, Status = status.Status, CurrentProgram = status.CurrentProgram, Runtime = status.Runtime, Timestamp = DateTime.UtcNow }; await BroadcastDeviceStatusAsync(statusUpdate); } } catch (Exception ex) { // Log error Console.WriteLine($"Error updating device statuses: {ex.Message}"); } }); } private void UpdateProductionData(object state) { Task.Run(async () => { try { var date = DateTime.Today; var devices = await _deviceCollectionService.GetAllActiveDevicesAsync(); foreach (var device in devices) { var production = await _productionService.GetDeviceProductionForDateAsync(device.Id, date); var productionUpdate = new ProductionUpdate { DeviceId = device.Id, DeviceName = device.Name, Quantity = production, Timestamp = DateTime.UtcNow }; await BroadcastProductionUpdateAsync(productionUpdate); } } catch (Exception ex) { // Log error Console.WriteLine($"Error updating production data: {ex.Message}"); } }); } private async Task StreamDeviceData(int deviceId, int intervalMs) { var streamingInfo = _deviceStreaming.GetValueOrDefault(deviceId); if (streamingInfo == null || !streamingInfo.IsRunning) return; try { while (streamingInfo.IsRunning) { try { // Get current device status var status = await _deviceCollectionService.GetDeviceCurrentStatusAsync(deviceId); // Get current production data var production = await _productionService.GetDeviceProductionForDateAsync(deviceId, DateTime.Today); // Create streaming message var streamingMessage = new DeviceStreamingMessage { DeviceId = deviceId, DeviceName = status.DeviceName, Status = status.Status, CurrentProgram = status.CurrentProgram, Runtime = status.Runtime, Quantity = production, Timestamp = DateTime.UtcNow, IntervalMs = intervalMs }; // Send to device group await _hubContext.Clients.Group($"device_{deviceId}").SendAsync("DeviceStreamingData", streamingMessage); // Update last streaming time streamingInfo.LastUpdate = DateTime.UtcNow; } catch (Exception ex) { // Log error but continue streaming Console.WriteLine($"Error streaming device {deviceId} data: {ex.Message}"); } await Task.Delay(intervalMs); } } catch (Exception ex) { Console.WriteLine($"Device streaming task for device {deviceId} failed: {ex.Message}"); } } private async Task GetDashboardUpdateAsync() { // Get dashboard summary from cache or service var dashboardSummary = await _cacheService.GetOrSetDashboardSummaryAsync(DateTime.Today, () => _productionService.GetDashboardSummaryAsync(new DashboardFilter { Date = DateTime.Today })); return new DashboardUpdate { Timestamp = DateTime.UtcNow, TotalDevices = dashboardSummary.TotalDevices, ActiveDevices = dashboardSummary.ActiveDevices, OfflineDevices = dashboardSummary.OfflineDevices, TotalProductionToday = dashboardSummary.TotalProductionToday, TotalProductionThisWeek = dashboardSummary.TotalProductionThisWeek, TotalProductionThisMonth = dashboardSummary.TotalProductionThisMonth, OverallEfficiency = dashboardSummary.OverallEfficiency, QualityRate = dashboardSummary.QualityRate, DeviceSummaries = dashboardSummary.DeviceSummaries }; } #endregion } #region Supporting Models public class RealTimeHub : Hub { private readonly IRealTimeService _realTimeService; public RealTimeHub(IRealTimeService realTimeService) { _realTimeService = realTimeService; } public override async Task OnConnectedAsync() { await base.OnConnectedAsync(); } public override async Task OnDisconnectedAsync(Exception exception) { await base.OnDisconnectedAsync(exception); } public async Task JoinDeviceGroup(int deviceId) { await _realTimeService.JoinDeviceGroupAsync(Context.ConnectionId, deviceId); await Clients.Caller.SendAsync("JoinedDeviceGroup", new { DeviceId = deviceId }); } public async Task LeaveDeviceGroup(int deviceId) { await _realTimeService.LeaveDeviceGroupAsync(Context.ConnectionId, deviceId); await Clients.Caller.SendAsync("LeftDeviceGroup", new { DeviceId = deviceId }); } public async Task JoinDashboardGroup(string dashboardId) { await _realTimeService.JoinDashboardGroupAsync(Context.ConnectionId, dashboardId); await Clients.Caller.SendAsync("JoinedDashboardGroup", new { DashboardId = dashboardId }); } public async Task LeaveDashboardGroup(string dashboardId) { await _realTimeService.LeaveDashboardGroupAsync(Context.ConnectionId, dashboardId); await Clients.Caller.SendAsync("LeftDashboardGroup", new { DashboardId = dashboardId }); } public async Task RequestDeviceStreaming(int deviceId, int intervalMs = 1000) { await _realTimeService.StartDeviceStreamingAsync(deviceId, intervalMs); await Clients.Caller.SendAsync("DeviceStreamingStarted", new { DeviceId = deviceId, IntervalMs = intervalMs }); } public async Task StopDeviceStreaming(int deviceId) { await _realTimeService.StopDeviceStreamingAsync(deviceId); await Clients.Caller.SendAsync("DeviceStreamingStopped", new { DeviceId = deviceId }); } public async Task Ping() { await Clients.Caller.SendAsync("Pong", new { Timestamp = DateTime.UtcNow }); } } public class ClientInfo { public string ConnectionId { get; set; } public string UserId { get; set; } public string ClientType { get; set; } public DateTime ConnectedAt { get; set; } public DateTime LastActivity { get; set; } public HashSet Groups { get; set; } public HashSet DeviceGroups { get; set; } } public class DeviceStreamingInfo { public int DeviceId { get; set; } public int IntervalMs { get; set; } public DateTime StartedAt { get; set; } public DateTime LastUpdate { get; set; } public bool IsRunning { get; set; } } public class DeviceStatusUpdate { public int DeviceId { get; set; } public string DeviceName { get; set; } public DeviceStatus Status { get; set; } public string CurrentProgram { get; set; } public TimeSpan Runtime { get; set; } public DateTime Timestamp { get; set; } } public class ProductionUpdate { public int DeviceId { get; set; } public string DeviceName { get; set; } public decimal Quantity { get; set; } public DateTime Timestamp { get; set; } } public class AlertUpdate { public int? DeviceId { get; set; } public string DeviceName { get; set; } public string AlertType { get; set; } public string Message { get; set; } public DateTime Timestamp { get; set; } public bool IsResolved { get; set; } } public class SystemNotification { public string NotificationType { get; set; } public string Title { get; set; } public string Message { get; set; } public DateTime Timestamp { get; set; } public Dictionary Data { get; set; } } public class DashboardUpdate { public DateTime Timestamp { get; set; } public int TotalDevices { get; set; } public int ActiveDevices { get; set; } public int OfflineDevices { get; set; } public decimal TotalProductionToday { get; set; } public decimal TotalProductionThisWeek { get; set; } public decimal TotalProductionThisMonth { get; set; } public decimal OverallEfficiency { get; set; } public decimal QualityRate { get; set; } public List DeviceSummaries { get; set; } } public class RealTimeCommand { public string Command { get; set; } public object Parameters { get; set; } public DateTime Timestamp { get; set; } public string CommandType { get; set; } } public class DeviceStreamingMessage { public int DeviceId { get; set; } public string DeviceName { get; set; } public DeviceStatus Status { get; set; } public string CurrentProgram { get; set; } public TimeSpan Runtime { get; set; } public decimal Quantity { get; set; } public DateTime Timestamp { get; set; } public int IntervalMs { get; set; } } public class DeviceMonitoringStatus { public int DeviceId { get; set; } public bool IsStreaming { get; set; } public int StreamingIntervalMs { get; set; } public int MonitoringClients { get; set; } public DateTime? StreamingStartedAt { get; set; } public DateTime? LastStreamingUpdate { get; set; } } #endregion }