using System; using System.Collections.Concurrent; using System.Collections.Generic; using System.Diagnostics; using System.Linq; using System.Net; using System.Net.Http; using System.Net.NetworkInformation; using System.Threading; using System.Threading.Tasks; using Dapper; using MySqlConnector; using Newtonsoft.Json.Linq; using CncModels.Entity; using CncModels.Enum; using CncCollector.Config; using log4net; namespace CncCollector.Core { /// /// 单个采集地址的工作线程。 /// 循环执行:Ping检测 → HTTP采集 → JSON解析 → 数据入库 → 产量跟踪。 /// public class CollectWorker { private static readonly ILog _log = LogManager.GetLogger(typeof(CollectWorker)); private readonly CollectAddress _address; private readonly CollectorConfig _config; private readonly ProductionTracker _tracker; private readonly AnalysisEngine _analysisEngine; private readonly string _businessConnStr; private readonly string _logConnStr; private Thread _thread; private volatile bool _running; private int _consecutiveFailCount; /// 采集地址ID public int AddressId => _address.Id; /// 采集地址名称 public string AddressName => _address.Name; /// 采集地址URL public string AddressUrl => _address.Url; /// 是否运行中 public bool IsRunning => _running; /// 最后采集时间 public DateTime? LastCollectTime { get; private set; } /// 成功次数 public long SuccessCount { get; private set; } /// 失败次数 public long FailCount { get; private set; } /// /// 初始化采集工作线程 /// /// 采集地址配置 /// 全局配置 /// 产量跟踪器 /// 业务库连接字符串 /// 日志库连接字符串 public CollectWorker(CollectAddress address, CollectorConfig config, ProductionTracker tracker, AnalysisEngine analysisEngine, string businessConnStr, string logConnStr) { _address = address; _config = config; _tracker = tracker; _analysisEngine = analysisEngine; _businessConnStr = businessConnStr; _logConnStr = logConnStr; } /// /// 启动工作线程 /// public void Start() { if (_running) return; _running = true; _thread = new Thread(Run) { IsBackground = true, Name = $"Collector-{_address.Id}" }; _thread.Start(); _log.Info($"采集工作线程已启动: {_address.Name}(间隔={_address.CollectInterval}秒)"); } /// /// 停止工作线程 /// public void Stop() { _running = false; _log.Info($"采集工作线程已停止: {_address.Name}"); } /// /// 工作线程主循环 /// private void Run() { while (_running) { try { DoCollectCycle(); } catch (Exception ex) { _log.Error($"采集循环异常(地址={_address.Name})", ex); } // 等待下一次采集 int intervalMs = _address.CollectInterval * 1000; for (int i = 0; i < intervalMs / 100 && _running; i++) { Thread.Sleep(100); } } } /// /// 执行一次完整的采集周期:逐台Ping机床IP → HTTP采集 → 解析 → 入库 /// private void DoCollectCycle() { var requestTime = DateTime.Now; // 1. Ping 每台机床的IP地址,更新各自的在线状态 PingAllMachines(); // 2. HTTP 采集(含重试) string rawJson = null; bool success = false; string errorMsg = null; long durationMs = 0; int? httpStatusCode = null; int retryCount = _config.CollectRetryCount; for (int attempt = 0; attempt <= retryCount; attempt++) { if (attempt > 0) { _log.Warn($"采集重试第{attempt}次(地址={_address.Name})"); Thread.Sleep(_config.CollectRetryIntervalSeconds * 1000); } var sw = Stopwatch.StartNew(); try { using (var client = new HttpClient { Timeout = TimeSpan.FromSeconds(30) }) { var response = client.GetAsync(_address.Url).Result; sw.Stop(); durationMs = sw.ElapsedMilliseconds; httpStatusCode = (int)response.StatusCode; if (response.IsSuccessStatusCode) { rawJson = response.Content.ReadAsStringAsync().Result; success = true; break; } else { errorMsg = $"HTTP {(int)response.StatusCode}: {response.ReasonPhrase}"; } } } catch (Exception ex) { sw.Stop(); durationMs = sw.ElapsedMilliseconds; errorMsg = ex.Message; } } if (!success) { _consecutiveFailCount++; FailCount++; _log.Warn($"采集失败(地址={_address.Name}, 连续失败={_consecutiveFailCount}): {errorMsg}"); // 写入失败记录 CollectRecordWriter.WriteBatch(_businessConnStr, _logConnStr, null, rawJson, _address.Id, requestTime, durationMs, false, errorMsg, httpStatusCode); CollectRecordWriter.RecordFailure(_businessConnStr, _address.Id, errorMsg); // 连续失败超过阈值,触发告警 if (_consecutiveFailCount >= _config.CollectFailAlertThreshold) { CreateAlert(AlertType.CollectFail, null, _address.Id, $"采集地址「{_address.Name}」连续{_consecutiveFailCount}次采集失败", errorMsg); } return; } // 3. 解析 JSON _consecutiveFailCount = 0; SuccessCount++; LastCollectTime = DateTime.Now; try { ParseAndSave(rawJson, requestTime, durationMs, httpStatusCode ?? 200); } catch (Exception ex) { _log.Error($"JSON解析/入库失败(地址={_address.Name})", ex); } } /// /// Ping每台机床的IP地址,更新各自的在线状态(并行执行) /// private void PingAllMachines() { try { // 加载此地址下所有启用的机床(ID + IP地址) List<(int Id, string Ip)> machines; using (var conn = new MySqlConnection(_businessConnStr)) { machines = conn.Query<(int Id, string Ip)>( "SELECT id, ip_address FROM cnc_machine WHERE collect_address_id = @AddrId AND is_enabled = 1", new { AddrId = _address.Id }).AsList(); } if (machines.Count == 0) return; // 并行Ping所有机床(超时2秒/台) var results = new ConcurrentDictionary(); var tasks = machines.Select(m => Task.Run(() => { results[m.Id] = PingHost(m.Ip); })).ToArray(); Task.WaitAll(tasks, Math.Min(machines.Count * 3000, 30000)); // 按在线/离线分组批量更新 var onlineIds = results.Where(kv => kv.Value).Select(kv => kv.Key).ToList(); var offlineIds = results.Where(kv => !kv.Value).Select(kv => kv.Key).ToList(); using (var conn = new MySqlConnection(_businessConnStr)) { if (onlineIds.Count > 0) conn.Execute(@"UPDATE cnc_machine SET last_ping_time = NOW(), updated_at = NOW() WHERE id IN @Ids", new { Ids = onlineIds }); if (offlineIds.Count > 0) conn.Execute(@"UPDATE cnc_machine SET last_ping_time = NOW(), updated_at = NOW() WHERE id IN @Ids", new { Ids = offlineIds }); } _log.Info($"Ping完成(地址={_address.Name}):在线{onlineIds.Count}台,离线{offlineIds.Count}台"); } catch (Exception ex) { _log.Error($"Ping机床失败(地址={_address.Name})", ex); } } /// /// Ping指定主机地址(超时2秒) /// private bool PingHost(string host) { try { using (var ping = new Ping()) { var reply = ping.Send(host, 2000); return reply.Status == IPStatus.Success; } } catch (Exception ex) { _log.Debug($"Ping失败(主机={host}): {ex.Message}"); return false; } } /// /// 解析采集到的 JSON 数据并写入数据库 /// private void ParseAndSave(string rawJson, DateTime requestTime, long durationMs, int statusCode = 200) { var collectTime = DateTime.Now; // 1. 加载品牌配置和字段映射 Brand brand = null; List mappings = null; List machines = null; using (var conn = new MySqlConnection(_businessConnStr)) { brand = conn.QueryFirstOrDefault( "SELECT id as Id, brand_name as BrandName, device_field as DeviceField, tags_path as TagsPath, is_enabled as IsEnabled, created_at as CreatedAt, updated_at as UpdatedAt FROM cnc_brand WHERE id = @BrandId", new { BrandId = _address.BrandId }); if (brand != null) { mappings = conn.Query( "SELECT id as Id, brand_id as BrandId, standard_field as StandardField, field_name as FieldName, match_by as MatchBy, data_type as DataType, is_required as IsRequired, created_at as CreatedAt FROM cnc_brand_field_mapping WHERE brand_id = @BrandId", new { BrandId = brand.Id }).AsList(); } // 加载此地址下的机床列表 machines = conn.Query( "SELECT id as Id, device_code as DeviceCode, name as Name, workshop_id as WorkshopId, collect_address_id as CollectAddressId, ip_address as IpAddress, brand_id as BrandId, is_enabled as IsEnabled, last_ping_time as LastPingTime, last_collect_time as LastCollectTime, last_device_status as LastDeviceStatus, last_run_status as LastRunStatus, last_program_name as LastProgramName, last_part_count as LastPartCount, last_operate_mode as LastOperateMode, last_machining_status as LastMachiningStatus, created_at as CreatedAt, updated_at as UpdatedAt FROM cnc_machine WHERE collect_address_id = @AddrId AND is_enabled = 1", new { AddrId = _address.Id }).AsList(); } if (brand == null || mappings == null || mappings.Count == 0) { _log.Warn($"品牌配置或字段映射缺失(地址={_address.Name}, brandId={_address.BrandId})"); return; } // 2. 构建 device_code → machine 的查找字典 var machineDict = new Dictionary(StringComparer.OrdinalIgnoreCase); if (machines != null) { foreach (var m in machines) { if (!string.IsNullOrEmpty(m.DeviceCode)) machineDict[m.DeviceCode] = m; } } // 3. 解析 JSON 数组 JArray devices; try { devices = JArray.Parse(rawJson); } catch (Exception ex) { _log.Error($"JSON解析失败(地址={_address.Name})", ex); return; } var records = new List(); foreach (var deviceToken in devices) { var deviceObj = deviceToken as JObject; if (deviceObj == null) continue; // 提取 device 字段值 string deviceCode = DataParser.ExtractDeviceCode(deviceObj, brand.DeviceField ?? "device"); if (string.IsNullOrEmpty(deviceCode)) { _log.Warn("发现无device字段的设备数据,跳过"); continue; } // 匹配机床 Machine machine; if (!machineDict.TryGetValue(deviceCode, out machine)) { // 未知设备 → 记录告警 _log.Warn($"未知设备: device={deviceCode}(地址={_address.Name})"); CreateAlert(AlertType.UnknownDevice, null, _address.Id, $"发现未知设备: {deviceCode}", $"采集地址「{_address.Name}」返回了未在机床表中配置的设备编码: {deviceCode}"); continue; } // 解析字段 var parsed = DataParser.ParseDevice(deviceObj, brand, mappings); // 构建 CollectRecord var record = new CollectRecord { MachineId = machine.Id, CollectTime = collectTime, ProgramName = GetStringValue(parsed, "program_name"), PartCount = GetDecimalValue(parsed, "part_count"), DeviceStatus = GetStringValue(parsed, "device_status"), RunStatus = GetStringValue(parsed, "run_status"), OperateMode = GetStringValue(parsed, "operate_mode"), SpindleSpeedSet = GetDecimalValue(parsed, "spindle_speed_set"), FeedSpeedSet = GetDecimalValue(parsed, "feed_speed_set"), SpindleSpeedActual = GetDecimalValue(parsed, "spindle_speed_actual"), FeedSpeedActual = GetDecimalValue(parsed, "feed_speed_actual"), SpindleLoad = GetDecimalValue(parsed, "spindle_load"), SpindleOverride = GetDecimalValue(parsed, "spindle_override"), PowerOnTime = GetDecimalValue(parsed, "power_on_time"), RunTime = GetDecimalValue(parsed, "run_time"), CuttingTime = GetDecimalValue(parsed, "cutting_time"), CycleTime = GetDecimalValue(parsed, "cycle_time"), MachiningStatus = GetStringValue(parsed, "machining_status") }; records.Add(record); // 产量跟踪 _tracker.Track(machine.Id, record.ProgramName, record.PartCount, collectTime); } // 4. 批量写入 long rawLogId = CollectRecordWriter.WriteBatch(_businessConnStr, _logConnStr, records, rawJson, _address.Id, requestTime, durationMs, true, null, statusCode); // 采集分析:将分析任务委托给 AnalysisEngine if (rawLogId > 0 && records != null && records.Count > 0 && _analysisEngine != null) { _analysisEngine.AnalyzeAndRecord(rawLogId, _address.Id, _address.Name, records, machineDict, requestTime, durationMs); } _log.Info($"采集完成: {_address.Name} → {records.Count}台设备, {durationMs}ms"); } /// /// 从解析结果中获取字符串值 /// private string GetStringValue(Dictionary parsed, string field) { DataParser.ParsedField pf; if (parsed.TryGetValue(field, out pf)) return pf.StringValue; return null; } /// /// 从解析结果中获取数值 /// private decimal? GetDecimalValue(Dictionary parsed, string field) { DataParser.ParsedField pf; if (parsed.TryGetValue(field, out pf)) return pf.NumericValue; return null; } /// /// 创建告警记录 /// private void CreateAlert(string alertType, int? machineId, int? addressId, string title, string detail) { try { using (var conn = new MySqlConnection(_businessConnStr)) { conn.Execute(@"INSERT INTO cnc_alert (alert_type, machine_id, collect_address_id, title, detail, is_resolved, created_at) VALUES (@AlertType, @MachineId, @AddressId, @Title, @Detail, 0, NOW())", new { AlertType = alertType, MachineId = machineId, AddressId = addressId, Title = title, Detail = detail }); } } catch (Exception ex) { _log.Error("创建告警记录失败", ex); } } } }