You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
haoliang-net/src/CncCollector/Core/CollectWorker.cs

535 lines
22 KiB
C#

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Net;
using System.Net.Http;
using System.Net.NetworkInformation;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Dapper;
using MySqlConnector;
using Newtonsoft.Json.Linq;
using CncModels.Entity;
using CncModels.Enum;
using CncCollector.Config;
using log4net;
namespace CncCollector.Core
{
/// <summary>
/// 单个采集地址的工作线程。
/// 循环执行Ping检测 → HTTP采集 → JSON解析 → 数据入库 → 产量跟踪。
/// </summary>
public class CollectWorker
{
// Shared log4net logger for this class.
private static readonly ILog _log = LogManager.GetLogger(typeof(CollectWorker));
// Collect-address configuration polled by this worker (supplied to the constructor).
private readonly CollectAddress _address;
// Global collector settings (supplied to the constructor).
private readonly CollectorConfig _config;
// Production tracker that receives every parsed record (supplied to the constructor).
private readonly ProductionTracker _tracker;
// Analysis engine invoked after a successful batch write; callers check it for null before use.
private readonly AnalysisEngine _analysisEngine;
// Connection string of the business database.
private readonly string _businessConnStr;
// Connection string of the log database.
private readonly string _logConnStr;
// Background thread created by Start().
private Thread _thread;
// Loop flag: set by Start(), cleared by Stop(), polled by the worker loop.
private volatile bool _running;
// Number of collection cycles that failed in a row; reset to 0 after a successful fetch.
private int _consecutiveFailCount;
/// <summary>ID of the collect address.</summary>
public int AddressId => _address.Id;
/// <summary>Display name of the collect address.</summary>
public string AddressName => _address.Name;
/// <summary>URL of the collect address.</summary>
public string AddressUrl => _address.Url;
/// <summary>Whether the running flag is currently set.</summary>
public bool IsRunning => _running;
/// <summary>Time of the most recent successful HTTP fetch; null until one succeeds.</summary>
public DateTime? LastCollectTime { get; private set; }
/// <summary>Number of cycles whose HTTP fetch succeeded.</summary>
public long SuccessCount { get; private set; }
/// <summary>Number of cycles whose HTTP fetch failed after all retries.</summary>
public long FailCount { get; private set; }
/// <summary>
/// Creates a worker bound to a single collect address. The constructor only
/// stores its dependencies; no thread is started until <see cref="Start"/> is called.
/// </summary>
/// <param name="address">Collect-address configuration to poll.</param>
/// <param name="config">Global collector settings.</param>
/// <param name="tracker">Production tracker fed with each parsed record.</param>
/// <param name="analysisEngine">Analysis engine invoked after a successful batch write; may be null, in which case analysis is skipped.</param>
/// <param name="businessConnStr">Connection string of the business database.</param>
/// <param name="logConnStr">Connection string of the log database.</param>
public CollectWorker(CollectAddress address, CollectorConfig config, ProductionTracker tracker,
    AnalysisEngine analysisEngine, string businessConnStr, string logConnStr)
{
    (_address, _config, _tracker, _analysisEngine, _businessConnStr, _logConnStr) =
        (address, config, tracker, analysisEngine, businessConnStr, logConnStr);
}
/// <summary>
/// Starts the background collection thread. Does nothing when the running
/// flag is already set.
/// </summary>
public void Start()
{
    if (_running)
    {
        return;
    }

    _running = true;

    var worker = new Thread(Run);
    worker.IsBackground = true;
    worker.Name = $"Collector-{_address.Id}";
    _thread = worker;
    worker.Start();

    _log.Info($"采集工作线程已启动: {_address.Name}(间隔={_address.CollectInterval}秒)");
}
/// <summary>
/// Requests the worker to stop by clearing the running flag and logs the request.
/// The call returns immediately; it does not wait for the thread to exit.
/// </summary>
public void Stop()
{
    _running = false;

    string stoppedMessage = $"采集工作线程已停止: {_address.Name}";
    _log.Info(stoppedMessage);
}
/// <summary>
/// Flattens an exception and its inner-exception chain into a single line of the form
/// <c>[Type] message ← [InnerType] message</c>. For an <see cref="AggregateException"/>
/// every inner exception (with its own chain) is appended after a <c> | </c> separator,
/// so the root cause is not hidden behind the generic aggregate message.
/// </summary>
/// <param name="ex">Exception to describe; null yields an empty string.</param>
/// <param name="maxLength">
/// Maximum length of the returned text. Longer output is cut and ends with "...".
/// Values of zero or less yield an empty string.
/// </param>
/// <returns>The flattened description, never longer than <paramref name="maxLength"/>.</returns>
private static string GetDetailedErrorMessage(Exception ex, int maxLength = 1800)
{
    if (ex == null || maxLength <= 0)
    {
        return "";
    }

    var sb = new StringBuilder();
    AppendExceptionChain(sb, ex, maxLength);

    // Truncate only when the text really exceeds the budget; a message that fits
    // exactly is returned unchanged.
    if (sb.Length > maxLength)
    {
        const string ellipsis = "...";
        if (maxLength > ellipsis.Length)
        {
            sb.Length = maxLength - ellipsis.Length;
            sb.Append(ellipsis);
        }
        else
        {
            // Budget too small to hold an ellipsis: hard cut instead of a negative Length.
            sb.Length = maxLength;
        }
    }
    return sb.ToString();
}

/// <summary>
/// Appends <paramref name="ex"/> and its inner exceptions to <paramref name="sb"/>,
/// stopping once the text has grown beyond <paramref name="maxLength"/>.
/// </summary>
private static void AppendExceptionChain(StringBuilder sb, Exception ex, int maxLength)
{
    bool isFirst = true;
    while (ex != null && sb.Length <= maxLength)
    {
        if (!isFirst)
        {
            sb.Append(" ← ");
        }
        isFirst = false;
        sb.Append($"[{ex.GetType().Name}] {ex.Message}");

        if (ex is AggregateException aggEx)
        {
            foreach (var inner in aggEx.InnerExceptions)
            {
                if (sb.Length > maxLength)
                {
                    break;
                }
                sb.Append(" | ");
                AppendExceptionChain(sb, inner, maxLength);
            }
            // AggregateException.InnerException is the same object as InnerExceptions[0];
            // following it here would print the first inner exception a second time.
            return;
        }

        ex = ex.InnerException;
    }
}
/// <summary>
/// Thread entry point. Repeats one collection cycle followed by a wait of
/// <c>CollectInterval</c> seconds until the running flag is cleared. An exception
/// thrown by a cycle is logged and does not end the loop.
/// </summary>
private void Run()
{
    while (_running)
    {
        try
        {
            DoCollectCycle();
        }
        catch (Exception ex)
        {
            _log.Error($"采集循环异常(地址={_address.Name}: {GetDetailedErrorMessage(ex, 500)}", ex);
        }

        // Wait for the next cycle. The multiplication is done in long so a large
        // interval cannot overflow, and the wait is measured against a stopwatch
        // deadline: each Thread.Sleep may overshoot by the timer resolution, so
        // counting fixed 100 ms slices would stretch the configured interval.
        long intervalMs = _address.CollectInterval * 1000L;
        var waited = Stopwatch.StartNew();
        while (_running)
        {
            long remainingMs = intervalMs - waited.ElapsedMilliseconds;
            if (remainingMs <= 0)
            {
                break;
            }
            // Sleep in short slices so a Stop() request is noticed promptly.
            Thread.Sleep((int)Math.Min(100L, remainingMs));
        }
    }
}
// Single HttpClient shared by all workers. Creating and disposing a client per
// request leaves sockets in TIME_WAIT and can exhaust ports under frequent polling.
// The 30-second timeout matches the per-request timeout used previously.
private static readonly HttpClient _sharedHttpClient = new HttpClient { Timeout = TimeSpan.FromSeconds(30) };

/// <summary>
/// Runs one complete collection cycle: pings the machines of this address, fetches the
/// address URL over HTTP (with retries), then parses and stores the payload.
/// When every attempt fails, a failure record is written, the consecutive-failure
/// counter is increased and an alert is created once the configured threshold is reached.
/// </summary>
private void DoCollectCycle()
{
    var requestTime = DateTime.Now;

    // 1. Ping every machine and refresh its online state.
    PingAllMachines();

    // 2. HTTP fetch with retries.
    string rawJson = null;
    bool success = false;
    string errorMsg = null;
    long durationMs = 0;
    int? httpStatusCode = null;
    int retryCount = _config.CollectRetryCount;
    for (int attempt = 0; attempt <= retryCount; attempt++)
    {
        if (attempt > 0)
        {
            _log.Warn($"采集重试第{attempt}次(地址={_address.Name}");
            Thread.Sleep(_config.CollectRetryIntervalSeconds * 1000);
        }

        // Reset per attempt: otherwise a status code from an earlier attempt would be
        // stored next to the error message of a later attempt that never got a response.
        httpStatusCode = null;
        var sw = Stopwatch.StartNew();
        try
        {
            // The response is disposed deterministically to release its connection.
            using (var response = _sharedHttpClient.GetAsync(_address.Url).Result)
            {
                sw.Stop();
                durationMs = sw.ElapsedMilliseconds;
                httpStatusCode = (int)response.StatusCode;
                if (response.IsSuccessStatusCode)
                {
                    rawJson = response.Content.ReadAsStringAsync().Result;
                    success = true;
                    break;
                }
                else
                {
                    errorMsg = $"HTTP {(int)response.StatusCode}: {response.ReasonPhrase}";
                }
            }
        }
        catch (Exception ex)
        {
            sw.Stop();
            durationMs = sw.ElapsedMilliseconds;
            errorMsg = GetDetailedErrorMessage(ex);
            _log.Error($"HTTP采集异常地址={_address.Name}, 第{attempt+1}次尝试)", ex);
        }
    }

    if (!success)
    {
        _consecutiveFailCount++;
        FailCount++;
        _log.Warn($"采集失败(地址={_address.Name}, 连续失败={_consecutiveFailCount}: {errorMsg}");

        // Persist the failure.
        CollectRecordWriter.WriteBatch(_businessConnStr, _logConnStr, null, rawJson,
            _address.Id, requestTime, durationMs, false, errorMsg, httpStatusCode);
        CollectRecordWriter.RecordFailure(_businessConnStr, _address.Id, errorMsg);

        // Raise an alert once the consecutive-failure threshold is reached.
        if (_consecutiveFailCount >= _config.CollectFailAlertThreshold)
        {
            CreateAlert(AlertType.CollectFail, null, _address.Id,
                $"采集地址「{_address.Name}」连续{_consecutiveFailCount}次采集失败",
                errorMsg);
        }
        return;
    }

    // 3. Parse the payload and store it.
    _consecutiveFailCount = 0;
    SuccessCount++;
    LastCollectTime = DateTime.Now;
    try
    {
        ParseAndSave(rawJson, requestTime, durationMs, httpStatusCode ?? 200);
    }
    catch (Exception ex)
    {
        var detailedErr = GetDetailedErrorMessage(ex);
        _log.Error($"JSON解析/入库失败(地址={_address.Name}: {detailedErr}", ex);
        // Write a failure record to the log database to aid remote diagnosis.
        CollectRecordWriter.WriteBatch(_businessConnStr, _logConnStr, null, rawJson,
            _address.Id, requestTime, durationMs, false, detailedErr, httpStatusCode);
    }
}
/// <summary>
/// Pings every enabled machine of this collect address in parallel (four pings per
/// machine, averaged) and stores the outcome: reachable machines get a new
/// <c>last_ping_time</c> and <c>last_ping_latency</c>, unreachable ones only get
/// <c>updated_at</c> refreshed. Any exception is logged and swallowed so that a ping
/// problem never prevents the HTTP collection that follows.
/// </summary>
private void PingAllMachines()
{
    try
    {
        // Load the ID and IP address of every enabled machine under this address.
        List<(int Id, string Ip)> machines;
        using (var conn = new MySqlConnection(_businessConnStr))
        {
            machines = conn.Query<(int Id, string Ip)>(
                "SELECT id, ip_address FROM cnc_machine WHERE collect_address_id = @AddrId AND is_enabled = 1",
                new { AddrId = _address.Id }).AsList();
        }
        if (machines.Count == 0) return;

        // machineId -> average latency in ms, -1 meaning unreachable.
        var results = new ConcurrentDictionary<int, int>();
        var tasks = machines.Select(m => Task.Run(() =>
        {
            results[m.Id] = PingHostAvg(m.Ip, 4);
        })).ToArray();

        // Overall budget of 12 s per machine, capped at 120 s; computed in long so the
        // multiplication cannot overflow.
        int waitBudgetMs = (int)Math.Min(machines.Count * 12000L, 120000L);
        bool allFinished = Task.WaitAll(tasks, waitBudgetMs);

        // Take one snapshot: after a timeout the remaining tasks may still add entries,
        // and the logged counts must describe exactly the rows that get updated.
        var snapshot = results.ToArray();
        if (!allFinished)
        {
            _log.Warn($"Ping等待超时(地址={_address.Name}): {machines.Count - snapshot.Length}台机床未在{waitBudgetMs}ms内完成, 本轮不更新其状态");
        }

        var online = snapshot.Where(kv => kv.Value >= 0).ToList();
        var offlineIds = snapshot.Where(kv => kv.Value < 0).Select(kv => kv.Key).ToList();

        using (var conn = new MySqlConnection(_businessConnStr))
        {
            // Only reachable machines get last_ping_time / last_ping_latency updated.
            foreach (var kv in online)
            {
                conn.Execute(@"UPDATE cnc_machine SET last_ping_time = NOW(), last_ping_latency = @Latency, updated_at = NOW() WHERE id = @Id",
                    new { Id = kv.Key, Latency = kv.Value });
            }
            // Unreachable machines keep their previous last_ping_time and latency.
            if (offlineIds.Count > 0)
            {
                conn.Execute(@"UPDATE cnc_machine SET updated_at = NOW() WHERE id IN @Ids",
                    new { Ids = offlineIds });
            }
        }
        _log.Info($"Ping完成地址={_address.Name}):在线{online.Count}台,离线{offlineIds.Count}台");
    }
    catch (Exception ex)
    {
        _log.Error($"Ping机床失败地址={_address.Name}", ex);
    }
}
/// <summary>
/// Sends <paramref name="count"/> pings (2-second timeout each) to
/// <paramref name="host"/> and averages the round-trip time of the successful ones.
/// </summary>
/// <param name="host">Host name or IP address to ping.</param>
/// <param name="count">Number of pings to send.</param>
/// <returns>Average latency in milliseconds, or -1 when no ping succeeded.</returns>
private int PingHostAvg(string host, int count = 4)
{
    try
    {
        long roundtripSumMs = 0;
        int replies = 0;

        using (var pinger = new Ping())
        {
            int attempt = 0;
            while (attempt < count)
            {
                attempt++;
                try
                {
                    PingReply reply = pinger.Send(host, 2000);
                    if (reply.Status != IPStatus.Success)
                    {
                        continue;
                    }
                    roundtripSumMs += reply.RoundtripTime;
                    replies++;
                }
                catch
                {
                    // A failed attempt simply does not count towards the average.
                }
            }
        }

        if (replies == 0)
        {
            return -1;
        }
        return (int)(roundtripSumMs / replies);
    }
    catch (Exception ex)
    {
        _log.Debug($"Ping失败主机={host}: {ex.Message}");
        return -1;
    }
}
/// <summary>
/// Parses the collected JSON array, maps each device entry to a configured machine,
/// builds one collect record per usable device, feeds the production tracker and
/// writes the batch to the database. Returns early (after logging) when the brand
/// configuration or field mappings are missing or the payload is not a JSON array.
/// </summary>
/// <param name="rawJson">Raw response body returned by the collect address.</param>
/// <param name="requestTime">Time at which the collection cycle started.</param>
/// <param name="durationMs">Duration of the HTTP request in milliseconds.</param>
/// <param name="statusCode">HTTP status code of the response.</param>
private void ParseAndSave(string rawJson, DateTime requestTime, long durationMs, int statusCode = 200)
{
    const string brandSql =
        "SELECT id as Id, brand_name as BrandName, device_field as DeviceField, tags_path as TagsPath, is_enabled as IsEnabled, created_at as CreatedAt, updated_at as UpdatedAt FROM cnc_brand WHERE id = @BrandId";
    const string mappingSql =
        "SELECT id as Id, brand_id as BrandId, standard_field as StandardField, field_name as FieldName, match_by as MatchBy, data_type as DataType, is_required as IsRequired, created_at as CreatedAt FROM cnc_brand_field_mapping WHERE brand_id = @BrandId";
    const string machineSql =
        "SELECT id as Id, device_code as DeviceCode, name as Name, workshop_id as WorkshopId, collect_address_id as CollectAddressId, ip_address as IpAddress, brand_id as BrandId, is_enabled as IsEnabled, last_ping_time as LastPingTime, last_collect_time as LastCollectTime, last_device_status as LastDeviceStatus, last_run_status as LastRunStatus, last_program_name as LastProgramName, last_part_count as LastPartCount, last_operate_mode as LastOperateMode, last_machining_status as LastMachiningStatus, created_at as CreatedAt, updated_at as UpdatedAt FROM cnc_machine WHERE collect_address_id = @AddrId AND is_enabled = 1";

    var collectTime = DateTime.Now;

    // Step 1: load the brand, its field mappings and the machines of this address.
    Brand brand;
    List<BrandFieldMapping> mappings;
    List<Machine> machines;
    using (var conn = new MySqlConnection(_businessConnStr))
    {
        brand = conn.QueryFirstOrDefault<Brand>(brandSql, new { BrandId = _address.BrandId });
        mappings = brand != null
            ? conn.Query<BrandFieldMapping>(mappingSql, new { BrandId = brand.Id }).AsList()
            : null;
        machines = conn.Query<Machine>(machineSql, new { AddrId = _address.Id }).AsList();
    }

    bool configMissing = brand == null || mappings == null || mappings.Count == 0;
    if (configMissing)
    {
        _log.Warn($"品牌配置或字段映射缺失(地址={_address.Name}, brandId={_address.BrandId}");
        return;
    }

    // Step 2: index machines by device code (case-insensitive); a later duplicate wins.
    var machineDict = new Dictionary<string, Machine>(StringComparer.OrdinalIgnoreCase);
    foreach (var candidate in machines ?? new List<Machine>())
    {
        if (string.IsNullOrEmpty(candidate.DeviceCode))
        {
            continue;
        }
        machineDict[candidate.DeviceCode] = candidate;
    }

    // Step 3: the payload must be a JSON array of device objects.
    JArray devices;
    try
    {
        devices = JArray.Parse(rawJson);
    }
    catch (Exception ex)
    {
        var detailedErr = GetDetailedErrorMessage(ex);
        _log.Error($"JSON解析失败地址={_address.Name}: {detailedErr}", ex);
        // Keep a failure record in the log database for remote diagnosis.
        CollectRecordWriter.WriteBatch(_businessConnStr, _logConnStr, null, rawJson,
            _address.Id, requestTime, durationMs, false, detailedErr, statusCode);
        return;
    }

    var records = new List<CollectRecord>();
    foreach (var deviceToken in devices)
    {
        if (!(deviceToken is JObject deviceObj))
        {
            continue;
        }

        // Read the device code from the brand's device field ("device" when unset).
        string deviceCode = DataParser.ExtractDeviceCode(deviceObj, brand.DeviceField ?? "device");
        if (string.IsNullOrEmpty(deviceCode))
        {
            _log.Warn("发现无device字段的设备数据跳过");
            continue;
        }

        // A device that is not configured as a machine raises an alert and is skipped.
        if (!machineDict.TryGetValue(deviceCode, out var machine))
        {
            _log.Warn($"未知设备: device={deviceCode}(地址={_address.Name}");
            CreateAlert(AlertType.UnknownDevice, null, _address.Id,
                $"发现未知设备: {deviceCode}",
                $"采集地址「{_address.Name}」返回了未在机床表中配置的设备编码: {deviceCode}");
            continue;
        }

        var parsed = DataParser.ParseDevice(deviceObj, brand, mappings);

        var record = new CollectRecord
        {
            MachineId = machine.Id,
            CollectTime = collectTime,
            ProgramName = GetStringValue(parsed, "program_name"),
            PartCount = GetDecimalValue(parsed, "part_count"),
            TotalPartCount = GetDecimalValue(parsed, "total_part_count"),
            DeviceStatus = GetStringValue(parsed, "device_status"),
            RunStatus = GetStringValue(parsed, "run_status"),
            OperateMode = GetStringValue(parsed, "operate_mode"),
            PowerOnTime = GetDecimalValue(parsed, "power_on_time"),
            RunTime = GetDecimalValue(parsed, "run_time")
        };

        // With neither a program name nor a part count the device is treated as
        // powered off and produces no record.
        bool hasProgram = !string.IsNullOrEmpty(record.ProgramName);
        if (!hasProgram && !record.PartCount.HasValue)
        {
            _log.Debug($"设备{deviceCode}所有有效tag为空可能断电跳过记录");
            continue;
        }

        records.Add(record);
        _tracker.Track(machine.Id, record.ProgramName, record.PartCount, collectTime);
    }

    // Step 4: write the batch, then hand the records to the analysis engine.
    long rawLogId = CollectRecordWriter.WriteBatch(_businessConnStr, _logConnStr, records, rawJson,
        _address.Id, requestTime, durationMs, true, null, statusCode);

    bool canAnalyze = rawLogId > 0 && records.Count > 0 && _analysisEngine != null;
    if (canAnalyze)
    {
        _analysisEngine.AnalyzeAndRecord(rawLogId, _address.Id, _address.Name, records, machineDict, requestTime, durationMs);
    }

    _log.Info($"采集完成: {_address.Name} → {records.Count}台设备, {durationMs}ms");
}
/// <summary>
/// Looks up <paramref name="field"/> in the parsed result and returns its string value.
/// </summary>
/// <param name="parsed">Parsed fields keyed by standard field name.</param>
/// <param name="field">Standard field name to look up.</param>
/// <returns>The field's string value, or null when the field is absent.</returns>
private string GetStringValue(Dictionary<string, DataParser.ParsedField> parsed, string field)
{
    if (!parsed.TryGetValue(field, out var entry))
    {
        return null;
    }
    return entry.StringValue;
}
/// <summary>
/// Looks up <paramref name="field"/> in the parsed result and returns its numeric value.
/// </summary>
/// <param name="parsed">Parsed fields keyed by standard field name.</param>
/// <param name="field">Standard field name to look up.</param>
/// <returns>The field's numeric value, or null when the field is absent.</returns>
private decimal? GetDecimalValue(Dictionary<string, DataParser.ParsedField> parsed, string field)
{
    if (!parsed.TryGetValue(field, out var entry))
    {
        return null;
    }
    return entry.NumericValue;
}
/// <summary>
/// Inserts an unresolved alert row into <c>cnc_alert</c>. A failure to insert is
/// logged and swallowed so that alerting never interrupts collection.
/// </summary>
/// <param name="alertType">Alert type value stored in <c>alert_type</c>.</param>
/// <param name="machineId">Related machine ID, or null.</param>
/// <param name="addressId">Related collect-address ID, or null.</param>
/// <param name="title">Alert title.</param>
/// <param name="detail">Alert detail text.</param>
private void CreateAlert(string alertType, int? machineId, int? addressId, string title, string detail)
{
    const string insertSql = @"INSERT INTO cnc_alert (alert_type, machine_id, collect_address_id, title, detail, is_resolved, created_at)
VALUES (@AlertType, @MachineId, @AddressId, @Title, @Detail, 0, NOW())";

    try
    {
        var args = new
        {
            AlertType = alertType,
            MachineId = machineId,
            AddressId = addressId,
            Title = title,
            Detail = detail
        };

        using (var conn = new MySqlConnection(_businessConnStr))
        {
            conn.Execute(insertSql, args);
        }
    }
    catch (Exception ex)
    {
        _log.Error("创建告警记录失败", ex);
    }
}
}
}