using System;
using System.Collections.Generic;
using System.Net.Http;
using System.Net.NetworkInformation;
using System.Threading;
using System.Threading.Tasks;
using MySql.Data.MySqlClient;
using log4net;
using CncModels.Entity;
using Newtonsoft.Json;
using System.Linq;
namespace CncCollector.Core
{
/// <summary>
/// Worker for a single collection address. Runs a dedicated background thread that
/// periodically pings the target host, fetches its URL over HTTP, parses the payload,
/// writes collected records and reports production to the <see cref="ProductionTracker"/>.
/// </summary>
public class CollectWorker
{
    /// <summary>Polling interval, in seconds, used when the address has no positive interval configured.</summary>
    private const int DefaultIntervalSeconds = 30;

    /// <summary>Timeout, in milliseconds, of the reachability ping.</summary>
    private const int PingTimeoutMs = 1000;

    // One client shared by every worker: creating and disposing an HttpClient per poll
    // leaves sockets in TIME_WAIT and can exhaust them under load.
    private static readonly HttpClient SharedHttp = new HttpClient { Timeout = TimeSpan.FromSeconds(30) };

    private readonly CollectAddress _address;
    private readonly string _connectionString;
    private readonly ProductionTracker _tracker;

    // NOTE(review): _apiKey is stored but never read in this class - presumably meant to be
    // sent with the HTTP request; confirm the intended use.
    private readonly string _apiKey;

    private readonly ILog _log = LogManager.GetLogger(typeof(CollectWorker));

    // Guards _thread/_stop transitions and is the monitor the worker sleeps on between polls.
    private readonly object _gate = new object();

    private Thread _thread;
    private volatile bool _stop;

    /// <summary>Creates a worker for one collection address.</summary>
    /// <param name="address">Address to poll; must not be null.</param>
    /// <param name="connectionString">Database connection string passed to the record writer; must not be null.</param>
    /// <param name="tracker">Production tracker that receives the counts; must not be null.</param>
    /// <param name="apiKey">API key; may be null (currently unused by this class).</param>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="address"/>, <paramref name="connectionString"/> or <paramref name="tracker"/> is null.
    /// </exception>
    public CollectWorker(CollectAddress address, string connectionString, ProductionTracker tracker, string apiKey)
    {
        _address = address ?? throw new ArgumentNullException(nameof(address));
        _connectionString = connectionString ?? throw new ArgumentNullException(nameof(connectionString));
        _tracker = tracker ?? throw new ArgumentNullException(nameof(tracker));
        _apiKey = apiKey;
    }

    /// <summary>
    /// Starts the background polling thread. Does nothing while a previously started
    /// thread is still alive, so repeated calls never create duplicate pollers.
    /// </summary>
    public void Start()
    {
        lock (_gate)
        {
            if (_thread != null && _thread.IsAlive)
            {
                return;
            }

            _stop = false;
            _thread = new Thread(Run)
            {
                IsBackground = true,
                Name = $"CollectWorker-{_address?.Id ?? 0}"
            };
            _thread.Start();
        }
    }

    /// <summary>
    /// Requests the polling thread to stop, wakes it if it is waiting between polls,
    /// and blocks until it has exited. A poll that is already in progress is allowed to finish.
    /// </summary>
    public void Stop()
    {
        Thread worker;
        lock (_gate)
        {
            _stop = true;
            Monitor.PulseAll(_gate); // interrupt the inter-poll wait immediately
            worker = _thread;
        }

        // Join outside the lock (the worker needs the lock to observe the stop request),
        // and never join the current thread, which would block forever.
        if (worker != null && worker != Thread.CurrentThread)
        {
            worker.Join();
        }
    }

    /// <summary>Thread entry point: polls until a stop is requested, logging any failure of a single poll.</summary>
    private void Run()
    {
        while (!_stop)
        {
            try
            {
                CollectOnce();
            }
            catch (Exception ex)
            {
                _log.Error("CollectWorker 异常", ex);
            }

            if (WaitForStop(GetIntervalMilliseconds()))
            {
                break;
            }
        }
    }

    /// <summary>Performs one poll: ping, HTTP GET, parse, record write and production tracking.</summary>
    private void CollectOnce()
    {
        var url = _address.Url;
        if (string.IsNullOrWhiteSpace(url))
        {
            return;
        }

        // 1) Reachability probe (best effort; never prevents the HTTP request).
        ProbeHost(url);

        // 2) HTTP GET. Blocking is acceptable on this dedicated thread;
        //    GetAwaiter().GetResult() surfaces the real exception instead of an AggregateException.
        string json;
        using (var response = SharedHttp.GetAsync(url).GetAwaiter().GetResult())
        {
            if (!response.IsSuccessStatusCode)
            {
                _log.WarnFormat("CollectWorker address {0}: HTTP status {1}", _address.Id, (int)response.StatusCode);
                return;
            }

            json = response.Content.ReadAsStringAsync().GetAwaiter().GetResult();
        }

        // 3) Parse (brand-specific configuration is not injected here yet, hence the null argument).
        var parsed = Core.DataParser.Parse(_address.BrandName, json, null);
        if (parsed == null || parsed.Count == 0)
        {
            return;
        }

        // 4) Persist. TODO: build the records from 'parsed'; until then the list stays empty
        //    and nothing is written.
        //    NOTE(review): element type taken from the original comment ("CollectRecord") - confirm
        //    it matches the parameter type of CollectRecordWriter.WriteBatch.
        var records = new List<CollectRecord>();
        if (records.Count > 0)
        {
            CollectRecordWriter.WriteBatch(_connectionString, records, json, _address.Id);
        }

        // 5) Production tracking (simplified: the first parsed field is taken as the count).
        var first = parsed.Values.FirstOrDefault();
        int produced = first != null ? ToProducedCount(first.Value) : 0;
        var fieldName = first?.FieldName ?? string.Empty;
        _tracker.Track(_address.MachineId, fieldName, produced, DateTime.Now);
    }

    /// <summary>
    /// Pings the host of <paramref name="url"/>. Failures (invalid URL, unreachable host, ...)
    /// are logged at debug level and otherwise ignored.
    /// </summary>
    private void ProbeHost(string url)
    {
        try
        {
            var host = new Uri(url).Host;
            using (var ping = new Ping())
            {
                var reply = ping.Send(host, PingTimeoutMs);
                // TODO: persist the online state (e.g. cnc_machine.is_online); for now it is only logged.
                _log.DebugFormat("CollectWorker ping {0}: {1}", host, reply.Status);
            }
        }
        catch (Exception ex)
        {
            _log.Debug("CollectWorker ping failed", ex);
        }
    }

    /// <summary>
    /// Converts a parsed field value to a production count. Only int, decimal (truncated) and
    /// long are recognised; any other runtime type yields 0.
    /// </summary>
    /// <exception cref="OverflowException">The value does not fit in an <see cref="int"/>.</exception>
    private static int ToProducedCount(object value)
    {
        switch (value)
        {
            case int i:
                return i;
            case decimal d:
                return (int)d; // throws OverflowException when out of range
            case long l:
                return checked((int)l); // fail loudly, like the decimal case, instead of wrapping
            default:
                return 0;
        }
    }

    /// <summary>
    /// Returns the wait between polls in milliseconds: the configured interval when positive,
    /// otherwise <see cref="DefaultIntervalSeconds"/>; clamped to <see cref="int.MaxValue"/>.
    /// </summary>
    private int GetIntervalMilliseconds()
    {
        int seconds = _address.CollectInterval > 0 ? _address.CollectInterval : DefaultIntervalSeconds;

        // Multiply as long: seconds * 1000 in int arithmetic can overflow to a negative wait.
        long milliseconds = seconds * 1000L;
        return (int)Math.Min(milliseconds, int.MaxValue);
    }

    /// <summary>
    /// Waits up to <paramref name="timeoutMs"/> milliseconds, returning early when a stop is requested.
    /// </summary>
    /// <returns>true when a stop has been requested; otherwise false.</returns>
    private bool WaitForStop(int timeoutMs)
    {
        lock (_gate)
        {
            if (!_stop)
            {
                Monitor.Wait(_gate, timeoutMs);
            }

            return _stop;
        }
    }
}
}