You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

547 lines
21 KiB
C#

using Microsoft.AspNetCore.SignalR;
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;
using Haoliang.Core.Services;
using Haoliang.Models.Models.Device;
using Haoliang.Models.Models.Production;
using Haoliang.Models.Models.System;
namespace Haoliang.Api.Hubs
{
public class RealTimeHub : Hub
{
private static readonly ConcurrentDictionary<string, ClientConnectionInfo> _connectedClients =
new ConcurrentDictionary<string, ClientConnectionInfo>();
private static readonly ConcurrentDictionary<int, DeviceStreamingInfo> _deviceStreaming =
new ConcurrentDictionary<int, DeviceStreamingInfo>();
private readonly IRealTimeService _realTimeService;
private readonly IDeviceCollectionService _deviceCollectionService;
private readonly IProductionService _productionService;
public RealTimeHub(
IRealTimeService realTimeService,
IDeviceCollectionService deviceCollectionService,
IProductionService productionService)
{
_realTimeService = realTimeService;
_deviceCollectionService = deviceCollectionService;
_productionService = productionService;
}
/// <summary>
/// Called when a new client connects to the hub
/// </summary>
public override async Task OnConnectedAsync()
{
var connectionId = Context.ConnectionId;
// Get client information from query parameters
var userId = Context.GetHttpContext().Request.Query["userId"];
var clientType = Context.GetHttpContext().Request.Query["clientType"] ?? "web";
var dashboardId = Context.GetHttpContext().Request.Query["dashboardId"];
var clientInfo = new ClientConnectionInfo
{
ConnectionId = connectionId,
UserId = userId.ToString(),
ClientType = clientType.ToString(),
ConnectedAt = DateTime.UtcNow,
LastActivity = DateTime.UtcNow,
DashboardId = string.IsNullOrEmpty(dashboardId.ToString()) ? null : dashboardId.ToString(),
UserAgent = Context.GetHttpContext().Request.Headers["User-Agent"].ToString(),
IpAddress = Context.GetHttpContext().Connection.RemoteIpAddress?.ToString()
};
_connectedClients.AddOrUpdate(connectionId, clientInfo, (key, existing) => clientInfo);
// Add to notifications group by default
await Groups.AddToGroupAsync(connectionId, "notifications");
// If dashboard ID provided, add to dashboard group
if (!string.IsNullOrEmpty(clientInfo.DashboardId))
{
await Groups.AddToGroupAsync(connectionId, $"dashboard_{clientInfo.DashboardId}");
}
// Notify other clients about new connection
await Clients.Others.SendAsync("ClientConnected", new
{
ClientId = connectionId,
UserId = clientInfo.UserId,
ClientType = clientType,
Timestamp = DateTime.UtcNow
});
// Send welcome message to connecting client
await Clients.Caller.SendAsync("Welcome", new
{
ClientId = connectionId,
Timestamp = DateTime.UtcNow,
ServerTime = DateTime.UtcNow
});
await base.OnConnectedAsync();
}
/// <summary>
/// Called when a client disconnects from the hub
/// </summary>
public override async Task OnDisconnectedAsync(Exception exception)
{
var connectionId = Context.ConnectionId;
if (_connectedClients.TryRemove(connectionId, out var clientInfo))
{
// Remove from all groups
await Groups.RemoveFromGroupAsync(connectionId, "notifications");
await Groups.RemoveFromGroupAsync(connectionId, $"dashboard_{clientInfo.DashboardId}");
// Remove from device groups
foreach (var deviceId in clientInfo.MonitoredDevices)
{
await Groups.RemoveFromGroupAsync(connectionId, $"device_{deviceId}");
}
// Stop device streaming if client was streaming
foreach (var deviceId in clientInfo.StreamingDevices)
{
await StopDeviceStreamingInternal(deviceId);
}
// Notify other clients about disconnection
await Clients.Others.SendAsync("ClientDisconnected", new
{
ClientId = connectionId,
UserId = clientInfo.UserId,
Reason = exception?.Message ?? "Unknown",
Timestamp = DateTime.UtcNow
});
}
await base.OnDisconnectedAsync(exception);
}
/// <summary>
/// Client requests to join device monitoring group
/// </summary>
public async Task JoinDeviceGroup(int deviceId)
{
var connectionId = Context.ConnectionId;
if (_connectedClients.TryGetValue(connectionId, out var clientInfo))
{
clientInfo.MonitoredDevices.Add(deviceId);
clientInfo.LastActivity = DateTime.UtcNow;
await Groups.AddToGroupAsync(connectionId, $"device_{deviceId}");
// Send current device status
var deviceStatus = await _deviceCollectionService.GetDeviceCurrentStatusAsync(deviceId);
await Clients.Caller.SendAsync("DeviceStatusUpdated", new
{
DeviceId = deviceId,
Status = deviceStatus.Status,
CurrentProgram = deviceStatus.CurrentProgram,
Runtime = deviceStatus.Runtime,
Timestamp = DateTime.UtcNow
});
// Notify other clients
await Clients.Group($"device_{deviceId}").SendAsync("DeviceMonitoringStarted", new
{
DeviceId = deviceId,
ClientCount = GetDeviceClientCount(deviceId),
Timestamp = DateTime.UtcNow
});
}
}
/// <summary>
/// Client requests to leave device monitoring group
/// </summary>
public async Task LeaveDeviceGroup(int deviceId)
{
var connectionId = Context.ConnectionId;
if (_connectedClients.TryGetValue(connectionId, out var clientInfo))
{
clientInfo.MonitoredDevices.Remove(deviceId);
await Groups.RemoveFromGroupAsync(connectionId, $"device_{deviceId}");
// Notify other clients
await Clients.Group($"device_{deviceId}").SendAsync("DeviceMonitoringStopped", new
{
DeviceId = deviceId,
ClientCount = GetDeviceClientCount(deviceId),
Timestamp = DateTime.UtcNow
});
}
}
/// <summary>
/// Client requests to join dashboard group
/// </summary>
public async Task JoinDashboardGroup(string dashboardId)
{
var connectionId = Context.ConnectionId;
if (_connectedClients.TryGetValue(connectionId, out var clientInfo))
{
clientInfo.DashboardId = dashboardId;
clientInfo.LastActivity = DateTime.UtcNow;
await Groups.AddToGroupAsync(connectionId, $"dashboard_{dashboardId}");
// Send current dashboard data
var dashboardUpdate = await GetDashboardUpdateAsync();
await Clients.Caller.SendAsync("DashboardUpdated", dashboardUpdate);
// Notify dashboard group about new client
await Clients.Group($"dashboard_{dashboardId}").SendAsync("DashboardClientJoined", new
{
ClientId = connectionId,
DashboardId = dashboardId,
Timestamp = DateTime.UtcNow
});
}
}
/// <summary>
/// Client requests to leave dashboard group
/// </summary>
public async Task LeaveDashboardGroup(string dashboardId)
{
var connectionId = Context.ConnectionId;
if (_connectedClients.TryGetValue(connectionId, out var clientInfo))
{
clientInfo.DashboardId = null;
await Groups.RemoveFromGroupAsync(connectionId, $"dashboard_{dashboardId}");
// Notify dashboard group about client leaving
await Clients.Group($"dashboard_{dashboardId}").SendAsync("DashboardClientLeft", new
{
ClientId = connectionId,
DashboardId = dashboardId,
Timestamp = DateTime.UtcNow
});
}
}
/// <summary>
/// Client requests to start device streaming
/// </summary>
public async Task StartDeviceStreaming(int deviceId, [FromQuery] int intervalMs = 1000)
{
var connectionId = Context.ConnectionId;
if (_connectedClients.TryGetValue(connectionId, out var clientInfo))
{
clientInfo.StreamingDevices.Add(deviceId);
clientInfo.LastActivity = DateTime.UtcNow;
var streamingInfo = new DeviceStreamingInfo
{
DeviceId = deviceId,
IntervalMs = intervalMs,
StartedAt = DateTime.UtcNow,
LastUpdate = DateTime.UtcNow,
ClientsStreaming = new HashSet<string> { connectionId }
};
_deviceStreaming.AddOrUpdate(deviceId, streamingInfo, (key, existing) =>
{
existing.ClientsStreaming.Add(connectionId);
return existing;
});
// Start streaming task if not already running
if (!_deviceStreaming.ContainsKey(deviceId) ||
_deviceStreaming[deviceId].ClientsStreaming.Count == 1)
{
await StartDeviceDataStream(deviceId, intervalMs);
}
await Clients.Caller.SendAsync("DeviceStreamingStarted", new
{
DeviceId = deviceId,
IntervalMs = intervalMs,
Timestamp = DateTime.UtcNow
});
}
}
/// <summary>
/// Client requests to stop device streaming
/// </summary>
public async Task StopDeviceStreaming(int deviceId)
{
var connectionId = Context.ConnectionId;
if (_connectedClients.TryGetValue(connectionId, out var clientInfo))
{
clientInfo.StreamingDevices.Remove(deviceId);
await StopDeviceStreamingInternal(deviceId);
await Clients.Caller.SendAsync("DeviceStreamingStopped", new
{
DeviceId = deviceId,
Timestamp = DateTime.UtcNow
});
}
}
/// <summary>
/// Client requests to join alerts group
/// </summary>
public async Task JoinAlertsGroup()
{
var connectionId = Context.ConnectionId;
await Groups.AddToGroupAsync(connectionId, "alerts");
await Clients.Caller.SendAsync("JoinedAlertsGroup", new
{
Timestamp = DateTime.UtcNow
});
}
/// <summary>
/// Client requests to leave alerts group
/// </summary>
public async Task LeaveAlertsGroup()
{
var connectionId = Context.ConnectionId;
await Groups.RemoveFromGroupAsync(connectionId, "alerts");
await Clients.Caller.SendAsync("LeftAlertsGroup", new
{
Timestamp = DateTime.UtcNow
});
}
/// <summary>
/// Client sends ping to keep connection alive
/// </summary>
public async Task Ping()
{
var connectionId = Context.ConnectionId;
if (_connectedClients.TryGetValue(connectionId, out var clientInfo))
{
clientInfo.LastActivity = DateTime.UtcNow;
}
await Clients.Caller.SendAsync("Pong", new
{
Timestamp = DateTime.UtcNow
});
}
/// <summary>
/// Client requests system information
/// </summary>
public async Task GetSystemInfo()
{
var systemInfo = new
{
Timestamp = DateTime.UtcNow,
ServerTime = DateTime.UtcNow,
Uptime = DateTime.UtcNow,
Version = "1.0.0", // This would come from app settings
ConnectedClients = _connectedClients.Count,
StreamingDevices = _deviceStreaming.Count
};
await Clients.Caller.SendAsync("SystemInfo", systemInfo);
}
/// <summary>
/// Client requests client list
/// </summary>
public async Task GetClientList()
{
var clients = _connectedClients.Values.Select(c => new
{
c.ConnectionId,
c.UserId,
c.ClientType,
c.ConnectedAt,
c.LastActivity,
c.DashboardId,
MonitoredDevices = c.MonitoredDevices.ToList(),
StreamingDevices = c.StreamingDevices.ToList()
}).ToList();
await Clients.Caller.SendAsync("ClientList", clients);
}
#region Private Methods
private async Task StartDeviceDataStream(int deviceId, int intervalMs)
{
try
{
while (_deviceStreaming.TryGetValue(deviceId, out var streamingInfo) && streamingInfo.ClientsStreaming.Any())
{
try
{
// Get current device status
var deviceStatus = await _deviceCollectionService.GetDeviceCurrentStatusAsync(deviceId);
// Get current production data
var production = await _productionService.GetDeviceProductionForDateAsync(deviceId, DateTime.Today);
// Create streaming message
var streamingMessage = new DeviceStreamingMessage
{
DeviceId = deviceId,
DeviceName = deviceStatus.DeviceName,
Status = deviceStatus.Status,
CurrentProgram = deviceStatus.CurrentProgram,
Runtime = deviceStatus.Runtime,
Quantity = production,
Timestamp = DateTime.UtcNow,
IntervalMs = intervalMs
};
// Send to device group
await Clients.Group($"device_{deviceId}").SendAsync("DeviceStreamingData", streamingMessage);
// Update last streaming time
streamingInfo.LastUpdate = DateTime.UtcNow;
}
catch (Exception ex)
{
// Log error but continue streaming
await Clients.Caller.SendAsync("StreamingError", new
{
DeviceId = deviceId,
ErrorMessage = ex.Message,
Timestamp = DateTime.UtcNow
});
}
await Task.Delay(intervalMs);
}
}
catch (Exception ex)
{
// Log fatal error
Console.WriteLine($"Device streaming task for device {deviceId} failed: {ex.Message}");
}
}
private async Task StopDeviceStreamingInternal(int deviceId)
{
if (_deviceStreaming.TryGetValue(deviceId, out var streamingInfo))
{
var connectionId = Context.ConnectionId;
streamingInfo.ClientsStreaming.Remove(connectionId);
if (!streamingInfo.ClientsStreaming.Any())
{
// No more clients streaming, remove from dictionary
_deviceStreaming.TryRemove(deviceId, out _);
}
else
{
// Update the streaming info
_deviceStreaming.AddOrUpdate(deviceId, streamingInfo, (key, existing) => streamingInfo);
}
}
}
private async Task<DashboardUpdate> GetDashboardUpdateAsync()
{
// This would typically call the production service to get current dashboard data
// For now, returning a simplified version
var date = DateTime.Today;
return new DashboardUpdate
{
Timestamp = DateTime.UtcNow,
TotalDevices = 10, // Placeholder
ActiveDevices = 8, // Placeholder
OfflineDevices = 2, // Placeholder
TotalProductionToday = 1250, // Placeholder
TotalProductionThisWeek = 8750, // Placeholder
TotalProductionThisMonth = 35000, // Placeholder
OverallEfficiency = 85.5m, // Placeholder
QualityRate = 98.2m, // Placeholder
DeviceSummaries = new List<DeviceSummary>() // Placeholder
};
}
private int GetDeviceClientCount(int deviceId)
{
return _connectedClients.Values.Count(c => c.MonitoredDevices.Contains(deviceId));
}
#endregion
}
#region Supporting Classes
public class ClientConnectionInfo
{
public string ConnectionId { get; set; }
public string UserId { get; set; }
public string ClientType { get; set; }
public DateTime ConnectedAt { get; set; }
public DateTime LastActivity { get; set; }
public string DashboardId { get; set; }
public string UserAgent { get; set; }
public string IpAddress { get; set; }
public HashSet<int> MonitoredDevices { get; set; } = new HashSet<int>();
public HashSet<int> StreamingDevices { get; set; } = new HashSet<int>();
}
public class DeviceStreamingInfo
{
public int DeviceId { get; set; }
public int IntervalMs { get; set; }
public DateTime StartedAt { get; set; }
public DateTime LastUpdate { get; set; }
public HashSet<string> ClientsStreaming { get; set; } = new HashSet<string>();
}
// These are the same models as in the RealTimeService but duplicated here for SignalR-specific usage
public class DeviceStreamingMessage
{
public int DeviceId { get; set; }
public string DeviceName { get; set; }
public DeviceStatus Status { get; set; }
public string CurrentProgram { get; set; }
public TimeSpan Runtime { get; set; }
public decimal Quantity { get; set; }
public DateTime Timestamp { get; set; }
public int IntervalMs { get; set; }
}
public class DashboardUpdate
{
public DateTime Timestamp { get; set; }
public int TotalDevices { get; set; }
public int ActiveDevices { get; set; }
public int OfflineDevices { get; set; }
public decimal TotalProductionToday { get; set; }
public decimal TotalProductionThisWeek { get; set; }
public decimal TotalProductionThisMonth { get; set; }
public decimal OverallEfficiency { get; set; }
public decimal QualityRate { get; set; }
public List<DeviceSummary> DeviceSummaries { get; set; }
}
public class TestResult
{
public string TestId { get; set; }
public DateTime Timestamp { get; set; }
public int ConnectedClients { get; set; }
public List<int> ActiveStreamingDevices { get; set; }
public string Status { get; set; }
}
#endregion
}