// Source path: haoliang-net/src/CncCollector/Core/CollectWorker.cs
// Note: this file contains non-ASCII characters in comments and log messages.


using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Net;
using System.Net.Http;
using System.Net.NetworkInformation;
using System.Threading;
using System.Threading.Tasks;
using Dapper;
using MySqlConnector;
using Newtonsoft.Json.Linq;
using CncModels.Entity;
using CncModels.Enum;
using CncCollector.Config;
using log4net;
namespace CncCollector.Core
{
/// <summary>
/// Worker thread for a single collection address.
/// Repeatedly runs: ping check → HTTP collection → JSON parsing → database write → production tracking.
/// </summary>
public class CollectWorker
{
/// <summary>Class-wide log4net logger.</summary>
private static readonly ILog _log = LogManager.GetLogger(typeof(CollectWorker));
/// <summary>Collection address served by this worker (its Id, Name, Url, CollectInterval and BrandId are read here).</summary>
private readonly CollectAddress _address;
/// <summary>Global collector settings (retry count, retry interval, failure alert threshold).</summary>
private readonly CollectorConfig _config;
/// <summary>Production tracker; its Track method is called once per parsed device record.</summary>
private readonly ProductionTracker _tracker;
/// <summary>Analysis engine; may be null, in which case the post-collection analysis step is skipped.</summary>
private readonly AnalysisEngine _analysisEngine;
/// <summary>Connection string of the business database (machines, brands, alerts).</summary>
private readonly string _businessConnStr;
/// <summary>Connection string of the log database; only handed on to CollectRecordWriter.</summary>
private readonly string _logConnStr;
/// <summary>Background thread created by Start().</summary>
private Thread _thread;
/// <summary>Run flag polled by the worker loop; set by Start(), cleared by Stop().</summary>
private volatile bool _running;
/// <summary>Number of collection cycles that failed in a row; reset to 0 after a successful HTTP fetch.</summary>
private int _consecutiveFailCount;
/// <summary>Id of the collection address.</summary>
public int AddressId => _address.Id;
/// <summary>Name of the collection address.</summary>
public string AddressName => _address.Name;
/// <summary>URL of the collection address.</summary>
public string AddressUrl => _address.Url;
/// <summary>Whether the worker is currently flagged as running.</summary>
public bool IsRunning => _running;
/// <summary>Local time (DateTime.Now) of the most recent successful HTTP fetch; null until the first one.</summary>
public DateTime? LastCollectTime { get; private set; }
/// <summary>Number of cycles whose HTTP fetch succeeded (counted before parsing).</summary>
public long SuccessCount { get; private set; }
/// <summary>Number of cycles in which every HTTP attempt failed.</summary>
public long FailCount { get; private set; }
/// <summary>
/// Creates a worker for one collection address. Nothing is collected until
/// <see cref="Start"/> is called.
/// </summary>
/// <param name="address">Collection address configuration; must not be null.</param>
/// <param name="config">Global collector configuration; must not be null.</param>
/// <param name="tracker">Production tracker that receives every parsed device record.</param>
/// <param name="analysisEngine">Analysis engine; may be null, in which case the analysis step is skipped.</param>
/// <param name="businessConnStr">Connection string of the business database.</param>
/// <param name="logConnStr">Connection string of the log database.</param>
/// <exception cref="ArgumentNullException">
/// <paramref name="address"/> or <paramref name="config"/> is null.
/// </exception>
public CollectWorker(CollectAddress address, CollectorConfig config, ProductionTracker tracker,
    AnalysisEngine analysisEngine, string businessConnStr, string logConnStr)
{
    // Both objects are dereferenced unconditionally by the public properties and by
    // every collection cycle, so fail at construction time instead of surfacing a
    // NullReferenceException later on the background thread.
    if (address == null)
    {
        throw new ArgumentNullException(nameof(address));
    }
    if (config == null)
    {
        throw new ArgumentNullException(nameof(config));
    }

    _address = address;
    _config = config;
    _tracker = tracker;
    _analysisEngine = analysisEngine;
    _businessConnStr = businessConnStr;
    _logConnStr = logConnStr;
}
/// <summary>
/// Starts the background collection thread.
/// Does nothing when the worker is already flagged as running.
/// </summary>
public void Start()
{
    if (!_running)
    {
        _running = true;

        // Build the thread completely before publishing it in the field.
        var worker = new Thread(Run);
        worker.IsBackground = true;
        worker.Name = $"Collector-{_address.Id}";
        _thread = worker;

        _thread.Start();
        _log.Info($"采集工作线程已启动: {_address.Name}(间隔={_address.CollectInterval}秒)");
    }
}
/// <summary>
/// Requests the worker loop to stop by clearing the run flag.
/// The call returns immediately; it does not wait for the thread to exit.
/// </summary>
public void Stop()
{
    _running = false;

    string stoppedMessage = $"采集工作线程已停止: {_address.Name}";
    _log.Info(stoppedMessage);
}
/// <summary>
/// Main loop of the worker thread: runs one collection cycle, then waits
/// CollectInterval seconds (in 100 ms slices so a stop request is noticed quickly),
/// and repeats until the worker is stopped or superseded by a newer thread.
/// </summary>
private void Run()
{
    // Stop() only clears the flag and does not join. If Start() is called again while
    // this thread is still inside a cycle or a sleep slice, _running becomes true again
    // and this thread would keep looping next to the newly started one. Remembering our
    // own identity lets the superseded thread detect that and exit.
    Thread self = Thread.CurrentThread;

    while (IsActiveRun(self))
    {
        try
        {
            DoCollectCycle();
        }
        catch (Exception ex)
        {
            // A failed cycle must never terminate the worker thread.
            _log.Error($"采集循环异常(地址={_address.Name}", ex);
        }

        // Wait for the next cycle.
        int intervalMs = _address.CollectInterval * 1000;
        for (int i = 0; i < intervalMs / 100 && IsActiveRun(self); i++)
        {
            Thread.Sleep(100);
        }
    }
}

/// <summary>
/// Returns true while the worker is flagged as running AND the given thread is the
/// one most recently created by Start().
/// </summary>
private bool IsActiveRun(Thread self)
{
    return _running && ReferenceEquals(Volatile.Read(ref _thread), self);
}
/// <summary>
/// HTTP client shared by all workers. HttpClient is designed to be reused; creating
/// one per request leaves sockets lingering in TIME_WAIT and can exhaust them.
/// </summary>
private static readonly HttpClient SharedHttpClient = new HttpClient { Timeout = TimeSpan.FromSeconds(30) };

/// <summary>
/// Runs one complete collection cycle:
/// ping every machine → HTTP fetch (with retries) → parse → persist.
/// When every attempt fails, a failure record is written and, once the number of
/// consecutive failures reaches the configured threshold, an alert is created.
/// </summary>
private void DoCollectCycle()
{
    var requestTime = DateTime.Now;

    // 1. Ping each machine's IP address.
    PingAllMachines();

    // 2. HTTP fetch, with retries.
    string rawJson = null;
    bool success = false;
    string errorMsg = null;
    long durationMs = 0;
    int? httpStatusCode = null;
    int retryCount = _config.CollectRetryCount;
    for (int attempt = 0; attempt <= retryCount && !success; attempt++)
    {
        if (attempt > 0)
        {
            _log.Warn($"采集重试第{attempt}次(地址={_address.Name}");
            Thread.Sleep(_config.CollectRetryIntervalSeconds * 1000);
        }

        // The recorded status must describe this attempt only; otherwise a status
        // from an earlier attempt would be stored next to a later attempt's exception.
        httpStatusCode = null;

        var sw = Stopwatch.StartNew();
        try
        {
            // GetAwaiter().GetResult() rethrows the real exception instead of the
            // AggregateException that .Result would wrap it in.
            using (var response = SharedHttpClient.GetAsync(_address.Url).GetAwaiter().GetResult())
            {
                sw.Stop();
                durationMs = sw.ElapsedMilliseconds;
                httpStatusCode = (int)response.StatusCode;
                if (response.IsSuccessStatusCode)
                {
                    rawJson = response.Content.ReadAsStringAsync().GetAwaiter().GetResult();
                    success = true;
                }
                else
                {
                    errorMsg = $"HTTP {(int)response.StatusCode}: {response.ReasonPhrase}";
                }
            }
        }
        catch (Exception ex)
        {
            sw.Stop();
            durationMs = sw.ElapsedMilliseconds;
            // The innermost exception carries the actual cause (e.g. the socket error).
            errorMsg = ex.GetBaseException().Message;
        }
    }

    if (!success)
    {
        _consecutiveFailCount++;
        FailCount++;
        _log.Warn($"采集失败(地址={_address.Name}, 连续失败={_consecutiveFailCount}: {errorMsg}");

        // Write the failure record.
        CollectRecordWriter.WriteBatch(_businessConnStr, _logConnStr, null, rawJson,
            _address.Id, requestTime, durationMs, false, errorMsg, httpStatusCode);
        CollectRecordWriter.RecordFailure(_businessConnStr, _address.Id, errorMsg);

        // Raise an alert once the consecutive failure count reaches the threshold.
        if (_consecutiveFailCount >= _config.CollectFailAlertThreshold)
        {
            CreateAlert(AlertType.CollectFail, null, _address.Id,
                $"采集地址「{_address.Name}」连续{_consecutiveFailCount}次采集失败",
                errorMsg);
        }
        return;
    }

    // 3. Parse the JSON. The cycle counts as successful as soon as the fetch succeeded.
    _consecutiveFailCount = 0;
    SuccessCount++;
    LastCollectTime = DateTime.Now;
    try
    {
        ParseAndSave(rawJson, requestTime, durationMs, httpStatusCode ?? 200);
    }
    catch (Exception ex)
    {
        _log.Error($"JSON解析/入库失败(地址={_address.Name}", ex);
    }
}
/// <summary>
/// Pings the IP address of every enabled machine under this collection address
/// (in parallel) and refreshes last_ping_time for the machines that answered.
/// Any exception is logged and swallowed so that a ping problem never aborts the
/// collection cycle.
/// </summary>
private void PingAllMachines()
{
    try
    {
        // Load id + IP address of every enabled machine under this address.
        List<(int Id, string Ip)> machines;
        using (var conn = new MySqlConnection(_businessConnStr))
        {
            machines = conn.Query<(int Id, string Ip)>(
                "SELECT id, ip_address FROM cnc_machine WHERE collect_address_id = @AddrId AND is_enabled = 1",
                new { AddrId = _address.Id }).AsList();
        }
        if (machines.Count == 0)
        {
            return;
        }

        // Ping all machines in parallel (2 s timeout per host, see PingHost).
        var results = new ConcurrentDictionary<int, bool>();
        var tasks = machines.Select(m => Task.Run(() =>
        {
            results[m.Id] = PingHost(m.Ip);
        })).ToArray();
        // Overall wait is capped at 30 s; machines whose ping has not finished by
        // then are missing from results and are counted in neither group.
        Task.WaitAll(tasks, Math.Min(machines.Count * 3000, 30000));

        var onlineIds = results.Where(kv => kv.Value).Select(kv => kv.Key).ToList();
        var offlineIds = results.Where(kv => !kv.Value).Select(kv => kv.Key).ToList();

        // Only machines that answered get their ping timestamp refreshed.
        // NOTE(review): the previous code ran the identical UPDATE for the offline ids
        // too, which stamped last_ping_time = NOW() on hosts that did not answer and made
        // online and offline machines indistinguishable in the table. This assumes
        // last_ping_time means "last successful ping" — confirm against its consumers.
        if (onlineIds.Count > 0)
        {
            using (var conn = new MySqlConnection(_businessConnStr))
            {
                conn.Execute(@"UPDATE cnc_machine SET last_ping_time = NOW(), updated_at = NOW() WHERE id IN @Ids",
                    new { Ids = onlineIds });
            }
        }

        _log.Info($"Ping完成地址={_address.Name}):在线{onlineIds.Count}台,离线{offlineIds.Count}台");
    }
    catch (Exception ex)
    {
        _log.Error($"Ping机床失败地址={_address.Name}", ex);
    }
}
/// <summary>
/// Sends one ICMP echo request to the given host with a 2 second timeout.
/// </summary>
/// <param name="host">Host name or IP address to ping.</param>
/// <returns>
/// True when the reply status is <see cref="IPStatus.Success"/>; false for any other
/// status or when the ping throws (the exception message is logged at debug level).
/// </returns>
private bool PingHost(string host)
{
    bool reachable;
    try
    {
        using (Ping pinger = new Ping())
        {
            PingReply answer = pinger.Send(host, 2000);
            reachable = answer.Status == IPStatus.Success;
        }
    }
    catch (Exception ex)
    {
        _log.Debug($"Ping失败主机={host}: {ex.Message}");
        reachable = false;
    }
    return reachable;
}
/// <summary>
/// Parses the collected JSON payload (expected to be a JSON array of device objects)
/// and persists the resulting records.
/// </summary>
/// <param name="rawJson">Raw response body returned by the collection URL.</param>
/// <param name="requestTime">Time at which the collection cycle started.</param>
/// <param name="durationMs">Duration of the HTTP request in milliseconds.</param>
/// <param name="statusCode">HTTP status code of the response; defaults to 200.</param>
private void ParseAndSave(string rawJson, DateTime requestTime, long durationMs, int statusCode = 200)
{
    const string brandSql =
        "SELECT id as Id, brand_name as BrandName, device_field as DeviceField, tags_path as TagsPath, is_enabled as IsEnabled, created_at as CreatedAt, updated_at as UpdatedAt FROM cnc_brand WHERE id = @BrandId";
    const string mappingSql =
        "SELECT id as Id, brand_id as BrandId, standard_field as StandardField, field_name as FieldName, match_by as MatchBy, data_type as DataType, is_required as IsRequired, created_at as CreatedAt FROM cnc_brand_field_mapping WHERE brand_id = @BrandId";
    const string machineSql =
        "SELECT id as Id, device_code as DeviceCode, name as Name, workshop_id as WorkshopId, collect_address_id as CollectAddressId, ip_address as IpAddress, brand_id as BrandId, is_enabled as IsEnabled, last_ping_time as LastPingTime, last_collect_time as LastCollectTime, last_device_status as LastDeviceStatus, last_run_status as LastRunStatus, last_program_name as LastProgramName, last_part_count as LastPartCount, last_operate_mode as LastOperateMode, last_machining_status as LastMachiningStatus, created_at as CreatedAt, updated_at as UpdatedAt FROM cnc_machine WHERE collect_address_id = @AddrId AND is_enabled = 1";

    // Every record produced in this call shares one collection timestamp.
    DateTime collectTime = DateTime.Now;

    // 1. Load the brand, its field mappings and the machines of this address.
    Brand brand;
    List<BrandFieldMapping> mappings = null;
    List<Machine> machines;
    using (var db = new MySqlConnection(_businessConnStr))
    {
        brand = db.QueryFirstOrDefault<Brand>(brandSql, new { BrandId = _address.BrandId });
        if (brand != null)
        {
            mappings = db.Query<BrandFieldMapping>(mappingSql, new { BrandId = brand.Id }).AsList();
        }
        machines = db.Query<Machine>(machineSql, new { AddrId = _address.Id }).AsList();
    }

    bool configMissing = brand == null || mappings == null || mappings.Count == 0;
    if (configMissing)
    {
        _log.Warn($"品牌配置或字段映射缺失(地址={_address.Name}, brandId={_address.BrandId}");
        return;
    }

    // 2. Index the machines by device code (case-insensitive; a later duplicate wins).
    var machineDict = new Dictionary<string, Machine>(StringComparer.OrdinalIgnoreCase);
    if (machines != null)
    {
        foreach (Machine candidate in machines.Where(x => !string.IsNullOrEmpty(x.DeviceCode)))
        {
            machineDict[candidate.DeviceCode] = candidate;
        }
    }

    // 3. Parse the JSON array.
    JArray devices;
    try
    {
        devices = JArray.Parse(rawJson);
    }
    catch (Exception ex)
    {
        _log.Error($"JSON解析失败地址={_address.Name}", ex);
        return;
    }

    string deviceFieldName = brand.DeviceField ?? "device";
    var records = new List<CollectRecord>();
    foreach (JToken token in devices)
    {
        // Array entries that are not JSON objects are ignored.
        if (!(token is JObject deviceObj))
        {
            continue;
        }

        // Extract the device code.
        string code = DataParser.ExtractDeviceCode(deviceObj, deviceFieldName);
        if (string.IsNullOrEmpty(code))
        {
            _log.Warn("发现无device字段的设备数据跳过");
            continue;
        }

        // Match the device code against the configured machines.
        if (!machineDict.TryGetValue(code, out Machine machine))
        {
            // Unknown device → log it and raise an alert.
            _log.Warn($"未知设备: device={code}(地址={_address.Name}");
            CreateAlert(AlertType.UnknownDevice, null, _address.Id,
                $"发现未知设备: {code}",
                $"采集地址「{_address.Name}」返回了未在机床表中配置的设备编码: {code}");
            continue;
        }

        // Map the brand-specific fields onto the standard fields.
        var parsed = DataParser.ParseDevice(deviceObj, brand, mappings);

        var record = new CollectRecord
        {
            MachineId = machine.Id,
            CollectTime = collectTime,
            ProgramName = GetStringValue(parsed, "program_name"),
            PartCount = GetDecimalValue(parsed, "part_count"),
            DeviceStatus = GetStringValue(parsed, "device_status"),
            RunStatus = GetStringValue(parsed, "run_status"),
            OperateMode = GetStringValue(parsed, "operate_mode"),
            SpindleSpeedSet = GetDecimalValue(parsed, "spindle_speed_set"),
            FeedSpeedSet = GetDecimalValue(parsed, "feed_speed_set"),
            SpindleSpeedActual = GetDecimalValue(parsed, "spindle_speed_actual"),
            FeedSpeedActual = GetDecimalValue(parsed, "feed_speed_actual"),
            SpindleLoad = GetDecimalValue(parsed, "spindle_load"),
            SpindleOverride = GetDecimalValue(parsed, "spindle_override"),
            PowerOnTime = GetDecimalValue(parsed, "power_on_time"),
            RunTime = GetDecimalValue(parsed, "run_time"),
            CuttingTime = GetDecimalValue(parsed, "cutting_time"),
            CycleTime = GetDecimalValue(parsed, "cycle_time"),
            MachiningStatus = GetStringValue(parsed, "machining_status")
        };
        records.Add(record);

        // Production tracking.
        _tracker.Track(machine.Id, record.ProgramName, record.PartCount, collectTime);
    }

    // 4. Batch write.
    long rawLogId = CollectRecordWriter.WriteBatch(_businessConnStr, _logConnStr, records, rawJson,
        _address.Id, requestTime, durationMs, true, null, statusCode);

    // Hand the analysis over to the AnalysisEngine when there is something to analyse.
    bool shouldAnalyze = rawLogId > 0 && records.Count > 0 && _analysisEngine != null;
    if (shouldAnalyze)
    {
        _analysisEngine.AnalyzeAndRecord(rawLogId, _address.Id, _address.Name, records, machineDict, requestTime, durationMs);
    }

    _log.Info($"采集完成: {_address.Name} → {records.Count}台设备, {durationMs}ms");
}
/// <summary>
/// Looks up a field in the parse result and returns its string value.
/// </summary>
/// <param name="parsed">Parse result keyed by field name.</param>
/// <param name="field">Name of the field to read.</param>
/// <returns>The field's StringValue, or null when the field is not present.</returns>
private string GetStringValue(Dictionary<string, DataParser.ParsedField> parsed, string field)
{
    if (!parsed.TryGetValue(field, out DataParser.ParsedField hit))
    {
        return null;
    }
    return hit.StringValue;
}
/// <summary>
/// Looks up a field in the parse result and returns its numeric value.
/// </summary>
/// <param name="parsed">Parse result keyed by field name.</param>
/// <param name="field">Name of the field to read.</param>
/// <returns>The field's NumericValue, or null when the field is not present.</returns>
private decimal? GetDecimalValue(Dictionary<string, DataParser.ParsedField> parsed, string field)
{
    if (!parsed.TryGetValue(field, out DataParser.ParsedField hit))
    {
        return null;
    }
    return hit.NumericValue;
}
/// <summary>
/// Inserts an unresolved alert row into cnc_alert. Failures are logged and
/// swallowed so that alerting never interrupts the collection cycle.
/// </summary>
/// <param name="alertType">Alert type value stored in alert_type.</param>
/// <param name="machineId">Id of the related machine, or null.</param>
/// <param name="addressId">Id of the related collection address, or null.</param>
/// <param name="title">Alert title.</param>
/// <param name="detail">Alert detail text.</param>
private void CreateAlert(string alertType, int? machineId, int? addressId, string title, string detail)
{
    // The continuation line stays unindented so the SQL text sent to the server is unchanged.
    const string insertSql = @"INSERT INTO cnc_alert (alert_type, machine_id, collect_address_id, title, detail, is_resolved, created_at)
VALUES (@AlertType, @MachineId, @AddressId, @Title, @Detail, 0, NOW())";

    try
    {
        using (var db = new MySqlConnection(_businessConnStr))
        {
            var row = new
            {
                AlertType = alertType,
                MachineId = machineId,
                AddressId = addressId,
                Title = title,
                Detail = detail
            };
            db.Execute(insertSql, row);
        }
    }
    catch (Exception ex)
    {
        _log.Error("创建告警记录失败", ex);
    }
}
}
}