using System;
using System.Collections.Generic;
using System.Linq;
using System.Net.Http;
using System.Net.NetworkInformation;
using System.Threading;
using System.Threading.Tasks;
using CncModels.Entity;
using log4net;
using MySql.Data.MySqlClient;
using Newtonsoft.Json;

namespace CncCollector.Core
{
    /// <summary>
    /// Worker for a single collection address. Runs a dedicated background thread that,
    /// once per cycle, pings the address host, fetches the address URL over HTTP GET,
    /// parses the payload, hands records to <c>CollectRecordWriter</c> and reports a
    /// production count to the <c>ProductionTracker</c>.
    /// </summary>
    public class CollectWorker
    {
        /// <summary>Cycle length used when the address has no positive <c>CollectInterval</c>.</summary>
        private const int DefaultIntervalSeconds = 30;

        /// <summary>Timeout for the reachability ping, in milliseconds.</summary>
        private const int PingTimeoutMs = 1000;

        // One client for all workers: creating and disposing an HttpClient on every cycle
        // leaves sockets in TIME_WAIT and can exhaust ports on a long-running collector.
        private static readonly HttpClient SharedHttp = new HttpClient { Timeout = TimeSpan.FromSeconds(30) };

        private readonly CollectAddress _address;
        private readonly string _connectionString;
        private readonly ProductionTracker _tracker;

        // NOTE(review): stored but not used anywhere in this class — confirm whether the
        // HTTP request is supposed to carry it.
        private readonly string _apiKey;

        private readonly ILog _log = LogManager.GetLogger(typeof(CollectWorker));

        // Used to wake the worker out of its inter-cycle wait as soon as Stop() is called.
        private readonly object _gate = new object();

        private Thread _thread;
        private volatile bool _stop;

        /// <summary>Creates a worker bound to one collection address.</summary>
        /// <param name="address">Address to collect from.</param>
        /// <param name="connectionString">Connection string passed through to <c>CollectRecordWriter.WriteBatch</c>.</param>
        /// <param name="tracker">Receives the production count after each successful parse.</param>
        /// <param name="apiKey">Stored as given; may be null.</param>
        /// <exception cref="ArgumentNullException">
        /// <paramref name="address"/>, <paramref name="connectionString"/> or <paramref name="tracker"/> is null.
        /// </exception>
        public CollectWorker(CollectAddress address, string connectionString, ProductionTracker tracker, string apiKey)
        {
            _address = address ?? throw new ArgumentNullException(nameof(address));
            _connectionString = connectionString ?? throw new ArgumentNullException(nameof(connectionString));
            _tracker = tracker ?? throw new ArgumentNullException(nameof(tracker));
            _apiKey = apiKey;
        }

        /// <summary>
        /// Starts the background collection thread. Does nothing if the thread is already running.
        /// </summary>
        public void Start()
        {
            // A second Start() would overwrite _thread and leave the first thread running
            // with nobody able to join it.
            if (_thread != null && _thread.IsAlive)
            {
                return;
            }

            _stop = false;
            _thread = new Thread(Run) { IsBackground = true, Name = $"CollectWorker-{_address?.Id ?? 0}" };
            _thread.Start();
        }

        /// <summary>
        /// Requests the worker to stop and blocks until its thread has exited.
        /// The inter-cycle wait is interrupted immediately; a cycle that is already
        /// in progress is allowed to finish first.
        /// </summary>
        public void Stop()
        {
            lock (_gate)
            {
                _stop = true;
                Monitor.PulseAll(_gate);
            }

            var worker = _thread;

            // Joining the current thread would block forever.
            if (worker != null && worker != Thread.CurrentThread)
            {
                worker.Join();
            }
        }

        /// <summary>Thread entry point: one collection cycle, then wait, until stopped.</summary>
        private void Run()
        {
            while (!_stop)
            {
                try
                {
                    CollectOnce();
                }
                catch (Exception ex)
                {
                    _log.Error("CollectWorker 异常", ex);
                }

                WaitForNextCycle();
            }
        }

        /// <summary>Runs one ping + fetch cycle; a blank URL makes the cycle a no-op.</summary>
        private void CollectOnce()
        {
            var url = _address.Url;
            if (string.IsNullOrWhiteSpace(url))
            {
                return;
            }

            // 1) Connectivity probe.
            ProbeHost(url);

            // 2) HTTP GET collection.
            FetchAndProcess(url);
        }

        /// <summary>Best-effort ping of the URL's host; failures never abort the cycle.</summary>
        private void ProbeHost(string url)
        {
            try
            {
                var host = new Uri(url).Host;
                using (var ping = new Ping())
                {
                    var reply = ping.Send(host, PingTimeoutMs);

                    // TODO: persist the online state; the real implementation is expected to
                    // update cnc_machine.is_online and related fields.
                    if (_log.IsDebugEnabled)
                    {
                        _log.DebugFormat("Ping {0}: {1}", host, reply.Status);
                    }
                }
            }
            catch (Exception ex)
            {
                // Ping is advisory only, so the failure is recorded rather than propagated.
                _log.Debug("Ping failed", ex);
            }
        }

        /// <summary>Fetches the URL, parses the body, writes records and reports production.</summary>
        private void FetchAndProcess(string url)
        {
            // This runs on the worker's own thread, so blocking here is intentional.
            // GetAwaiter().GetResult() surfaces the original exception instead of an
            // AggregateException wrapper, which keeps the error log readable.
            using (var resp = SharedHttp.GetAsync(url).GetAwaiter().GetResult())
            {
                if (!resp.IsSuccessStatusCode)
                {
                    _log.WarnFormat("HTTP {0} from {1}", (int)resp.StatusCode, url);
                    return;
                }

                var json = resp.Content.ReadAsStringAsync().GetAwaiter().GetResult();

                // Parse (brand information is injected in the real implementation).
                var parsed = Core.DataParser.Parse(_address.BrandName, json, null);
                if (parsed == null || parsed.Count == 0)
                {
                    return;
                }

                // Simplified implementation: the parsed fields are not yet mapped to records,
                // so this list stays empty and the write below is currently never reached.
                // NOTE(review): element type restored as CollectRecord (the generic argument
                // was missing in the source) — confirm against CollectRecordWriter.WriteBatch.
                var recs = new List<CollectRecord>();
                if (recs.Count > 0)
                {
                    CollectRecordWriter.WriteBatch(_connectionString, recs, json, _address.Id);
                }

                // Production tracking (simplified): derive the count from the first parsed field.
                var first = parsed.Values.FirstOrDefault();
                int produced = first != null ? ToProducedCount(first.Value) : 0;
                var fieldName = first?.FieldName ?? string.Empty;

                _tracker.Track(_address.MachineId, fieldName, produced, DateTime.Now);
            }
        }

        /// <summary>
        /// Converts a parsed value to a production count. Only <c>int</c>, <c>long</c> and
        /// <c>decimal</c> are recognised; anything else (including null) yields 0.
        /// Out-of-range values are clamped to the <c>int</c> range instead of wrapping
        /// around (<c>long</c>) or throwing <see cref="OverflowException"/> (<c>decimal</c>).
        /// </summary>
        private static int ToProducedCount(object value)
        {
            switch (value)
            {
                case int i:
                    return i;
                case long l:
                    return (int)Math.Max(int.MinValue, Math.Min(int.MaxValue, l));
                case decimal d:
                    // The cast truncates the fractional part toward zero.
                    return (int)Math.Max(int.MinValue, Math.Min(int.MaxValue, d));
                default:
                    return 0;
            }
        }

        /// <summary>
        /// Waits for the configured interval (30 seconds when not configured), returning
        /// early if <see cref="Stop"/> is called in the meantime.
        /// </summary>
        private void WaitForNextCycle()
        {
            int seconds = DefaultIntervalSeconds;
            try
            {
                if (_address.CollectInterval > 0)
                {
                    seconds = _address.CollectInterval;
                }
            }
            catch (Exception ex)
            {
                // Keep the default rather than letting the worker thread die.
                _log.Warn("Failed to read CollectInterval; using default", ex);
            }

            // Compute in long and clamp: seconds * 1000 overflows int for large intervals.
            int delayMs = (int)Math.Min((long)seconds * 1000, int.MaxValue);

            lock (_gate)
            {
                if (!_stop)
                {
                    Monitor.Wait(_gate, delayMs);
                }
            }
        }
    }
}