You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

471 lines
18 KiB
C#

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Haoliang.Models.Device;
using Haoliang.Models.System;
using Haoliang.Models.DataCollection;
using Microsoft.AspNetCore.SignalR;
namespace Haoliang.Core.Services
{
/// <summary>
/// Contract for tracking real-time client connections, their device subscriptions,
/// and for pushing messages to connected clients.
/// </summary>
public interface IRealTimeService
{
/// <summary>Registers <paramref name="connectionId"/> as a connection that belongs to <paramref name="userId"/>.</summary>
Task ConnectClientAsync(string connectionId, string userId);
/// <summary>Forgets a connection and the subscription state recorded for it.</summary>
Task DisconnectClientAsync(string connectionId);
/// <summary>Adds the given devices to the subscriptions of a registered connection.</summary>
Task SubscribeToDevicesAsync(string connectionId, IEnumerable<int> deviceIds);
/// <summary>Removes the given devices from the subscriptions of a connection.</summary>
Task UnsubscribeFromDevicesAsync(string connectionId, IEnumerable<int> deviceIds);
/// <summary>Marks the user behind a registered connection as subscribed to every device.</summary>
Task SubscribeToAllDevicesAsync(string connectionId);
/// <summary>Clears the "all devices" subscription and the individual device subscriptions of a connection.</summary>
Task UnsubscribeFromAllDevicesAsync(string connectionId);
/// <summary>Pushes a device status snapshot to clients.</summary>
Task BroadcastDeviceStatusAsync(DeviceCurrentStatus status);
/// <summary>Pushes a production update to clients.</summary>
Task BroadcastProductionUpdateAsync(ProductionUpdate update);
/// <summary>Pushes an alarm to clients.</summary>
Task BroadcastAlarmAsync(Alarm alarm);
/// <summary>Pushes a system message to clients.</summary>
Task BroadcastSystemMessageAsync(SystemMessage message);
/// <summary>Invokes the client method <paramref name="method"/> with <paramref name="data"/> for one user.</summary>
Task SendToUserAsync(string userId, string method, object data);
/// <summary>Invokes the client method <paramref name="method"/> with <paramref name="data"/> for each listed user.</summary>
Task SendToUsersAsync(IEnumerable<string> userIds, string method, object data);
/// <summary>Invokes the client method <paramref name="method"/> with <paramref name="data"/> on all clients.</summary>
Task SendToAllAsync(string method, object data);
/// <summary>Returns the number of registered connections.</summary>
Task<int> GetConnectedClientsCountAsync();
/// <summary>Returns the ids of users that have at least one registered connection.</summary>
Task<IEnumerable<string>> GetConnectedUsersAsync();
/// <summary>Returns whether the user currently has at least one known connection.</summary>
Task<bool> IsUserConnectedAsync(string userId);
/// <summary>Returns the ids of the devices the user is subscribed to.</summary>
Task<IEnumerable<int>> GetUserSubscribedDevicesAsync(string userId);
/// <summary>Starts the periodic heartbeat broadcast.</summary>
Task StartHeartbeatAsync();
/// <summary>Stops the periodic heartbeat broadcast.</summary>
Task StopHeartbeatAsync();
}
/// <summary>
/// Typed client contract of <c>RealTimeHub</c>, which is declared as <c>Hub&lt;IWebSocketHub&gt;</c>;
/// the hub reaches its clients through these members.
/// NOTE(review): the per-member descriptions below are inferred from the member names only;
/// no implementation or client code is visible in this file - confirm against the client.
/// </summary>
public interface IWebSocketHub
{
// Presumably signals that a connection was established.
Task OnConnectedAsync(string connectionId);
// Presumably signals that a connection was closed.
Task OnDisconnectedAsync(string connectionId);
// Presumably relates to subscribing the connection to the listed devices.
Task OnSubscribeToDevicesAsync(string connectionId, IEnumerable<int> deviceIds);
// Presumably relates to unsubscribing the connection from the listed devices.
Task OnUnsubscribeFromDevicesAsync(string connectionId, IEnumerable<int> deviceIds);
// Presumably relates to subscribing the connection to alarms.
Task OnSubscribeToAlarmsAsync(string connectionId);
// Presumably relates to unsubscribing the connection from alarms.
Task OnUnsubscribeFromAlarmsAsync(string connectionId);
// Presumably a request for the current status of one device.
Task OnRequestDeviceStatusAsync(string connectionId, int deviceId);
// Presumably a request for the production data of one device.
Task OnRequestProductionDataAsync(string connectionId, int deviceId);
// Presumably a request for system-wide statistics.
Task OnRequestSystemStatsAsync(string connectionId);
}
/// <summary>
/// Authentication contract for real-time connections. <c>RealTimeHub</c> calls
/// <see cref="AuthenticateAsync"/> with the token taken from the connection request.
/// NOTE(review): no implementation is visible in this file; the member descriptions
/// are inferred from the signatures - confirm against the implementing class.
/// </summary>
public interface IWebSocketAuthMiddleware
{
/// <summary>Authenticates <paramref name="connectionId"/> using <paramref name="token"/>.</summary>
Task AuthenticateAsync(string connectionId, string token);
/// <summary>Returns the user id associated with a connection (presumably null when unknown - confirm).</summary>
Task<string> GetUserIdAsync(string connectionId);
/// <summary>Returns a connection id associated with a user (presumably null when unknown - confirm).</summary>
Task<string> GetConnectionIdAsync(string userId);
/// <summary>Returns whether the connection has been authenticated.</summary>
Task<bool> IsAuthenticatedAsync(string connectionId);
/// <summary>Returns whether the connection holds the named permission.</summary>
Task<bool> HasPermissionAsync(string connectionId, string permission);
}
/// <summary>
/// <see cref="IRealTimeService"/> implementation that keeps connection and subscription
/// state in memory, mirrors each user's connection list into <c>ICachingService</c>,
/// and pushes messages to clients through <c>IHubContext&lt;RealTimeHub&gt;</c>.
/// </summary>
/// <remarks>
/// NOTE(review): the per-user cache entries are written with a 30 minute lifetime and are
/// only rewritten on connect/disconnect. Unless <c>ICachingService</c> slides the expiry,
/// a connection that stays open longer may stop receiving per-user messages - confirm.
/// </remarks>
public class RealTimeManager : IRealTimeService
{
    /// <summary>Lifetime given to every per-user cache entry written by this class.</summary>
    private static readonly TimeSpan CacheEntryLifetime = TimeSpan.FromMinutes(30);

    private readonly IHubContext<RealTimeHub> _hubContext;

    // Injected but not used by any member of this class.
    private readonly IWebSocketAuthMiddleware _authMiddleware;

    private readonly ICachingService _cachingService;

    // Connection id -> user id, for every registered connection.
    private readonly ConcurrentDictionary<string, string> _connectionUsers = new();

    // Connection id -> device ids that connection subscribed to.
    // HashSet<int> is not thread-safe, so each set is locked while it is read or mutated.
    private readonly ConcurrentDictionary<string, HashSet<int>> _userDeviceSubscriptions = new();

    // Connection ids subscribed to alarms (entries are removed by connection id on disconnect).
    // NOTE(review): nothing in this class adds entries, so the set is always empty here.
    private readonly ConcurrentDictionary<string, bool> _userAlarmSubscriptions = new();

    // Heartbeat timer and its running flag.
    private System.Threading.Timer _heartbeatTimer;
    private bool _isHeartbeatRunning = false;

    /// <summary>Creates the manager with the hub context, auth middleware and cache it depends on.</summary>
    public RealTimeManager(
        IHubContext<RealTimeHub> hubContext,
        IWebSocketAuthMiddleware authMiddleware,
        ICachingService cachingService)
    {
        _hubContext = hubContext;
        _authMiddleware = authMiddleware;
        _cachingService = cachingService;
    }

    /// <summary>
    /// Registers a connection for a user and adds it to the user's cached connection list.
    /// </summary>
    /// <param name="connectionId">SignalR connection id.</param>
    /// <param name="userId">Owner of the connection.</param>
    public async Task ConnectClientAsync(string connectionId, string userId)
    {
        _connectionUsers[connectionId] = userId;

        // Merge into the existing list instead of replacing it; otherwise a second
        // connection (another tab or device) would evict the user's earlier ones.
        var cacheKey = UserConnectionsKey(userId);
        var connections = await _cachingService.GetAsync<List<string>>(cacheKey) ?? new List<string>();
        if (!connections.Contains(connectionId))
        {
            connections.Add(connectionId);
        }

        await _cachingService.SetAsync(cacheKey, connections, CacheEntryLifetime);
        LogDebug($"Client {connectionId} connected for user {userId}");
    }

    /// <summary>
    /// Removes a connection together with its cached entry, device subscriptions and alarm
    /// subscription. Does nothing when the connection is not registered.
    /// </summary>
    public async Task DisconnectClientAsync(string connectionId)
    {
        if (!_connectionUsers.TryRemove(connectionId, out var userId))
        {
            return;
        }

        // Update the user's cached connection list.
        var cacheKey = UserConnectionsKey(userId);
        var userConnections = await _cachingService.GetAsync<List<string>>(cacheKey);
        if (userConnections != null)
        {
            userConnections.Remove(connectionId);
            if (userConnections.Count > 0)
            {
                await _cachingService.SetAsync(cacheKey, userConnections, CacheEntryLifetime);
            }
            else
            {
                await _cachingService.RemoveAsync(cacheKey);
            }
        }

        // Drop device subscriptions. No confirmation is sent: the connection is gone.
        if (_userDeviceSubscriptions.TryRemove(connectionId, out var subscriptions))
        {
            int[] deviceIds;
            lock (subscriptions)
            {
                deviceIds = subscriptions.ToArray();
            }

            foreach (var deviceId in deviceIds)
            {
                await _hubContext.Groups.RemoveFromGroupAsync(connectionId, DeviceGroupName(deviceId));
            }
        }

        // Drop the alarm subscription.
        _userAlarmSubscriptions.TryRemove(connectionId, out _);
        LogDebug($"Client {connectionId} disconnected for user {userId}");
    }

    /// <summary>
    /// Subscribes a registered connection to the given devices and confirms the newly added
    /// ones to the user with a "DeviceSubscribed" message.
    /// </summary>
    /// <exception cref="InvalidOperationException">The connection is not registered.</exception>
    public async Task SubscribeToDevicesAsync(string connectionId, IEnumerable<int> deviceIds)
    {
        if (!_connectionUsers.TryGetValue(connectionId, out var userId))
        {
            throw new InvalidOperationException("Connection not found");
        }

        // Get or create the subscription set atomically.
        var subscriptions = _userDeviceSubscriptions.GetOrAdd(connectionId, _ => new HashSet<int>());

        List<int> newSubscriptions;
        lock (subscriptions)
        {
            newSubscriptions = deviceIds.Except(subscriptions).ToList();
            subscriptions.UnionWith(newSubscriptions);
        }

        if (newSubscriptions.Count == 0)
        {
            return;
        }

        // Join the per-device groups that BroadcastDeviceStatusAsync sends to.
        foreach (var deviceId in newSubscriptions)
        {
            await _hubContext.Groups.AddToGroupAsync(connectionId, DeviceGroupName(deviceId));
        }

        await SendToUserAsync(userId, "DeviceSubscribed", new { DeviceIds = newSubscriptions });
        LogDebug($"User {userId} subscribed to devices: {string.Join(", ", newSubscriptions)}");
    }

    /// <summary>
    /// Removes the given devices from a connection's subscriptions and confirms the removed
    /// ones to the user with a "DeviceUnsubscribed" message. Does nothing when the connection
    /// has no subscriptions.
    /// </summary>
    public async Task UnsubscribeFromDevicesAsync(string connectionId, IEnumerable<int> deviceIds)
    {
        if (!_userDeviceSubscriptions.TryGetValue(connectionId, out var subscriptions))
        {
            return;
        }

        List<int> removedSubscriptions;
        lock (subscriptions)
        {
            removedSubscriptions = deviceIds.Intersect(subscriptions).ToList();
            subscriptions.ExceptWith(removedSubscriptions);
        }

        if (removedSubscriptions.Count == 0)
        {
            return;
        }

        foreach (var deviceId in removedSubscriptions)
        {
            await _hubContext.Groups.RemoveFromGroupAsync(connectionId, DeviceGroupName(deviceId));
        }

        // The connection may already have been unregistered; skip the confirmation
        // then instead of failing on a missing dictionary key.
        if (_connectionUsers.TryGetValue(connectionId, out var userId))
        {
            await SendToUserAsync(userId, "DeviceUnsubscribed", new { DeviceIds = removedSubscriptions });
            LogDebug($"User {userId} unsubscribed from devices: {string.Join(", ", removedSubscriptions)}");
        }
    }

    /// <summary>
    /// Flags the user behind a registered connection as subscribed to every device and
    /// confirms with a "SubscribedToAllDevices" message.
    /// </summary>
    /// <exception cref="InvalidOperationException">The connection is not registered.</exception>
    public async Task SubscribeToAllDevicesAsync(string connectionId)
    {
        if (!_connectionUsers.TryGetValue(connectionId, out var userId))
        {
            throw new InvalidOperationException("Connection not found");
        }

        await _cachingService.SetAsync(AllDevicesKey(userId), true, CacheEntryLifetime);
        await SendToUserAsync(userId, "SubscribedToAllDevices", null);
        LogDebug($"User {userId} subscribed to all devices");
    }

    /// <summary>
    /// Clears the user's "all devices" flag and every individual device subscription of the
    /// connection, then confirms with an "UnsubscribedFromAllDevices" message.
    /// </summary>
    /// <exception cref="InvalidOperationException">The connection is not registered.</exception>
    public async Task UnsubscribeFromAllDevicesAsync(string connectionId)
    {
        if (!_connectionUsers.TryGetValue(connectionId, out var userId))
        {
            throw new InvalidOperationException("Connection not found");
        }

        await _cachingService.RemoveAsync(AllDevicesKey(userId));

        // Clear individual device subscriptions. A snapshot is passed because the
        // callee mutates the live set.
        if (_userDeviceSubscriptions.TryGetValue(connectionId, out var subscriptions))
        {
            int[] snapshot;
            lock (subscriptions)
            {
                snapshot = subscriptions.ToArray();
            }

            await UnsubscribeFromDevicesAsync(connectionId, snapshot);
        }

        await SendToUserAsync(userId, "UnsubscribedFromAllDevices", null);
        LogDebug($"User {userId} unsubscribed from all devices");
    }

    /// <summary>
    /// Sends a "DeviceStatusUpdate" message to the connections subscribed to the device and
    /// to the connections of users subscribed to all devices.
    /// </summary>
    public async Task BroadcastDeviceStatusAsync(DeviceCurrentStatus status)
    {
        var message = new
        {
            DeviceId = status.DeviceId,
            DeviceCode = status.DeviceCode,
            Status = status.Status,
            NCProgram = status.NCProgram,
            CumulativeCount = status.CumulativeCount,
            RecordTime = status.RecordTime,
            IsActive = status.IsActive
        };

        var allDeviceConnections = await GetAllDevicesSubscriberConnectionsAsync();

        // Per-device subscribers, minus connections served by the "all devices" send
        // below so that nobody receives the same update twice.
        await _hubContext.Clients
            .GroupExcept($"Device_{status.DeviceId}", allDeviceConnections)
            .SendAsync("DeviceStatusUpdate", message);

        // Users subscribed to all devices.
        if (allDeviceConnections.Count > 0)
        {
            await _hubContext.Clients.Clients(allDeviceConnections).SendAsync("DeviceStatusUpdate", message);
        }
    }

    /// <summary>Sends a "ProductionUpdate" message to every client.</summary>
    public async Task BroadcastProductionUpdateAsync(ProductionUpdate update)
    {
        var message = new
        {
            DeviceId = update.DeviceId,
            DeviceCode = update.DeviceCode,
            NCProgram = update.NCProgram,
            Quantity = update.Quantity,
            Timestamp = update.Timestamp,
            TotalCount = update.TotalCount
        };
        await SendToAllAsync("ProductionUpdate", message);
    }

    /// <summary>
    /// Sends "AlarmCreated" to alarm subscribers and "AlarmUpdate" to every client.
    /// </summary>
    public async Task BroadcastAlarmAsync(Alarm alarm)
    {
        var message = new
        {
            AlarmId = alarm.AlarmId,
            DeviceId = alarm.DeviceId,
            DeviceCode = alarm.DeviceCode,
            AlarmType = alarm.AlarmType,
            Severity = alarm.Severity,
            Title = alarm.Title,
            Description = alarm.Description,
            AlarmStatus = alarm.AlarmStatus,
            CreateTime = alarm.CreateTime
        };

        // Connections subscribed to alarms.
        await SendToAlarmSubscribersAsync("AlarmCreated", message);
        // Every client.
        await SendToAllAsync("AlarmUpdate", message);
    }

    /// <summary>Sends a "SystemMessage" message to every client.</summary>
    public async Task BroadcastSystemMessageAsync(SystemMessage message)
    {
        await SendToAllAsync("SystemMessage", message);
    }

    /// <summary>
    /// Invokes <paramref name="method"/> on each cached connection of the user.
    /// Does nothing when no connection list is cached for the user.
    /// </summary>
    public async Task SendToUserAsync(string userId, string method, object data)
    {
        var connections = await _cachingService.GetAsync<List<string>>(UserConnectionsKey(userId));
        if (connections == null)
        {
            return;
        }

        foreach (var connectionId in connections)
        {
            await _hubContext.Clients.Client(connectionId).SendAsync(method, data);
        }
    }

    /// <summary>Invokes <paramref name="method"/> for each listed user, one after another.</summary>
    public async Task SendToUsersAsync(IEnumerable<string> userIds, string method, object data)
    {
        foreach (var userId in userIds)
        {
            await SendToUserAsync(userId, method, data);
        }
    }

    /// <summary>Invokes <paramref name="method"/> on every connected client.</summary>
    public async Task SendToAllAsync(string method, object data)
    {
        await _hubContext.Clients.All.SendAsync(method, data);
    }

    /// <summary>Returns the number of registered connections.</summary>
    public Task<int> GetConnectedClientsCountAsync()
    {
        return Task.FromResult(_connectionUsers.Count);
    }

    /// <summary>Returns the distinct user ids that own at least one registered connection.</summary>
    public Task<IEnumerable<string>> GetConnectedUsersAsync()
    {
        IEnumerable<string> users = _connectionUsers.Values.Distinct().ToList();
        return Task.FromResult(users);
    }

    /// <summary>Returns whether the cache holds at least one connection for the user.</summary>
    public async Task<bool> IsUserConnectedAsync(string userId)
    {
        var connections = await _cachingService.GetAsync<List<string>>(UserConnectionsKey(userId));
        return connections != null && connections.Count > 0;
    }

    /// <summary>
    /// Returns the distinct device ids subscribed through any of the user's cached
    /// connections, plus every device id when the user is subscribed to all devices.
    /// </summary>
    public async Task<IEnumerable<int>> GetUserSubscribedDevicesAsync(string userId)
    {
        var devices = new HashSet<int>();

        // Directly subscribed devices. The connection list is read once rather than
        // once per subscription entry.
        var userConnections = await _cachingService.GetAsync<List<string>>(UserConnectionsKey(userId));
        if (userConnections != null)
        {
            foreach (var connectionId in userConnections)
            {
                if (_userDeviceSubscriptions.TryGetValue(connectionId, out var subscriptions))
                {
                    lock (subscriptions)
                    {
                        devices.UnionWith(subscriptions);
                    }
                }
            }
        }

        // "All devices" subscription.
        // NOTE(review): assumes GetAsync<bool> yields false for a missing key - confirm.
        var allDevicesSubscribed = await _cachingService.GetAsync<bool>(AllDevicesKey(userId));
        if (allDevicesSubscribed)
        {
            devices.UnionWith(await GetAllDeviceIdsAsync());
        }

        return devices;
    }

    /// <summary>
    /// Starts a timer that broadcasts a heartbeat immediately and then every 30 seconds.
    /// Does nothing when the heartbeat is already running.
    /// </summary>
    public Task StartHeartbeatAsync()
    {
        if (_isHeartbeatRunning)
        {
            return Task.CompletedTask;
        }

        // The callback starts a task that handles its own failures; an async lambda here
        // would be async void, and an exception in it would go unhandled.
        _heartbeatTimer = new System.Threading.Timer(
            state => { _ = SendHeartbeatSafelyAsync(); },
            null,
            TimeSpan.Zero,
            TimeSpan.FromSeconds(30));
        _isHeartbeatRunning = true;
        return Task.CompletedTask;
    }

    /// <summary>Stops and disposes the heartbeat timer if it is running.</summary>
    public Task StopHeartbeatAsync()
    {
        if (_isHeartbeatRunning)
        {
            _heartbeatTimer?.Dispose();
            _heartbeatTimer = null;
            _isHeartbeatRunning = false;
        }

        return Task.CompletedTask;
    }

    // Cache key of the connection list of a user.
    private static string UserConnectionsKey(string userId) => $"user_connections_{userId}";

    // Cache key of the "subscribed to all devices" flag of a user.
    private static string AllDevicesKey(string userId) => $"user_all_devices_{userId}";

    // Name of the SignalR group that receives status updates of one device.
    private static string DeviceGroupName(int deviceId) => $"Device_{deviceId}";

    // Collects the cached connection ids of every connected user whose
    // "all devices" flag is set.
    private async Task<List<string>> GetAllDevicesSubscriberConnectionsAsync()
    {
        var connectionIds = new List<string>();
        foreach (var userId in _connectionUsers.Values.Distinct())
        {
            if (!await _cachingService.GetAsync<bool>(AllDevicesKey(userId)))
            {
                continue;
            }

            var userConnections = await _cachingService.GetAsync<List<string>>(UserConnectionsKey(userId));
            if (userConnections != null)
            {
                connectionIds.AddRange(userConnections);
            }
        }

        return connectionIds;
    }

    // Sends to the connections recorded in _userAlarmSubscriptions. The keys are
    // connection ids, so they are addressed as connections rather than as users.
    private async Task SendToAlarmSubscribersAsync(string method, object message)
    {
        var connectionIds = _userAlarmSubscriptions.Keys.ToList();
        if (connectionIds.Count == 0)
        {
            return;
        }

        await _hubContext.Clients.Clients(connectionIds).SendAsync(method, message);
    }

    // Runs one heartbeat and logs a failure instead of letting it escape the timer callback.
    private async Task SendHeartbeatSafelyAsync()
    {
        try
        {
            await SendHeartbeatAsync();
        }
        catch (Exception ex)
        {
            LogDebug($"Heartbeat failed: {ex.Message}");
        }
    }

    // Broadcasts a "Heartbeat" message carrying the server's local time, the number of
    // registered connections and the active device count.
    private async Task SendHeartbeatAsync()
    {
        var heartbeat = new
        {
            Timestamp = DateTime.Now,
            ConnectedUsers = await GetConnectedClientsCountAsync(),
            ActiveDevices = await GetActiveDeviceCountAsync()
        };
        await SendToAllAsync("Heartbeat", heartbeat);
    }

    // Placeholder: the active device count should come from a device service.
    // Returns 0 until such a service is injected.
    private Task<int> GetActiveDeviceCountAsync()
    {
        return Task.FromResult(0);
    }

    // Placeholder: the device ids should come from a device service.
    // Returns an empty sequence until such a service is injected.
    private Task<IEnumerable<int>> GetAllDeviceIdsAsync()
    {
        return Task.FromResult(Enumerable.Empty<int>());
    }

    // Writes a diagnostic line to the console; a logging service should be injected instead.
    private void LogDebug(string message)
    {
        Console.WriteLine($"[RealTimeManager] {message}");
    }
}
/// <summary>
/// SignalR hub for real-time clients. Authenticates connections, registers them with
/// <see cref="IRealTimeService"/>, and forwards subscription requests to it. Clients are
/// called back through the typed <see cref="IWebSocketHub"/> contract.
/// </summary>
public class RealTimeHub : Hub<IWebSocketHub>
{
    private readonly IRealTimeService _realTimeService;
    private readonly IWebSocketAuthMiddleware _authMiddleware;

    /// <summary>Creates the hub with the real-time service and auth middleware it depends on.</summary>
    public RealTimeHub(
        IRealTimeService realTimeService,
        IWebSocketAuthMiddleware authMiddleware)
    {
        _realTimeService = realTimeService;
        _authMiddleware = authMiddleware;
    }

    /// <summary>
    /// Authenticates a new connection with the <c>token</c> query parameter and, when that
    /// succeeds, registers the connection with the real-time service. Connections without a
    /// token, or that fail authentication, are left unregistered.
    /// </summary>
    public override async Task OnConnectedAsync()
    {
        var connectionId = Context.ConnectionId;

        // Only the query string is consulted; request headers are not read.
        var token = Context.GetHttpContext()?.Request.Query["token"].ToString();
        if (!string.IsNullOrEmpty(token))
        {
            await _authMiddleware.AuthenticateAsync(connectionId, token);

            // Register the connection; without this every later subscribe call fails
            // with "Connection not found" because the service never learns about it.
            if (await _authMiddleware.IsAuthenticatedAsync(connectionId))
            {
                var userId = await _authMiddleware.GetUserIdAsync(connectionId);
                if (!string.IsNullOrEmpty(userId))
                {
                    await _realTimeService.ConnectClientAsync(connectionId, userId);
                }
            }
        }

        await base.OnConnectedAsync();
    }

    /// <summary>Unregisters the connection from the real-time service.</summary>
    public override async Task OnDisconnectedAsync(Exception exception)
    {
        await _realTimeService.DisconnectClientAsync(Context.ConnectionId);
        await base.OnDisconnectedAsync(exception);
    }

    /// <summary>Subscribes the caller to the given devices and notifies the caller.</summary>
    public async Task SubscribeToDevicesAsync(IEnumerable<int> deviceIds)
    {
        await _realTimeService.SubscribeToDevicesAsync(Context.ConnectionId, deviceIds);
        // IWebSocketHub declares no "...Complete" members; use the matching declared one.
        await Clients.Caller.OnSubscribeToDevicesAsync(Context.ConnectionId, deviceIds);
    }

    /// <summary>Unsubscribes the caller from the given devices and notifies the caller.</summary>
    public async Task UnsubscribeFromDevicesAsync(IEnumerable<int> deviceIds)
    {
        await _realTimeService.UnsubscribeFromDevicesAsync(Context.ConnectionId, deviceIds);
        await Clients.Caller.OnUnsubscribeFromDevicesAsync(Context.ConnectionId, deviceIds);
    }

    /// <summary>
    /// Notifies the caller that an alarm subscription was requested.
    /// TODO: no subscription is recorded yet; IRealTimeService exposes no alarm subscription member.
    /// </summary>
    public async Task SubscribeToAlarmsAsync()
    {
        await Clients.Caller.OnSubscribeToAlarmsAsync(Context.ConnectionId);
    }

    /// <summary>
    /// Notifies the caller that an alarm unsubscription was requested.
    /// TODO: no subscription is removed yet; IRealTimeService exposes no alarm subscription member.
    /// </summary>
    public async Task UnsubscribeFromAlarmsAsync()
    {
        await Clients.Caller.OnUnsubscribeFromAlarmsAsync(Context.ConnectionId);
    }
}
}