You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
haoliang-net/Core/CollectWorker.cs

125 lines
5.1 KiB
C#

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

using System;
using System.Collections.Generic;
using System.Net.Http;
using System.Net.NetworkInformation;
using System.Threading;
using System.Threading.Tasks;
using MySql.Data.MySqlClient;
using log4net;
using CncModels.Entity;
using Newtonsoft.Json;
using System.Linq;
namespace CncCollector.Core
{
/// <summary>
/// 单个采集地址工作线程:负责定时抓取、解析、入库及健康状态上报。
/// </summary>
public class CollectWorker
{
private readonly CollectAddress _address;
private readonly string _connectionString;
private readonly ProductionTracker _tracker;
private readonly string _apiKey;
private readonly ILog _log = LogManager.GetLogger(typeof(CollectWorker));
private Thread _thread;
private volatile bool _stop;
public CollectWorker(CollectAddress address, string connectionString, ProductionTracker tracker, string apiKey)
{
_address = address ?? throw new ArgumentNullException(nameof(address));
_connectionString = connectionString ?? throw new ArgumentNullException(nameof(connectionString));
_tracker = tracker ?? throw new ArgumentNullException(nameof(tracker));
_apiKey = apiKey;
}
public void Start()
{
_stop = false;
_thread = new Thread(Run)
{
IsBackground = true,
Name = $"CollectWorker-{_address?.Id ?? 0}"
};
_thread.Start();
}
public void Stop()
{
_stop = true;
_thread?.Join();
}
private void Run()
{
while (!_stop)
{
try
{
// 1) Ping 测试连通性
if (!string.IsNullOrWhiteSpace(_address?.Url))
{
try
{
var host = new Uri(_address.Url).Host;
var ping = new Ping();
var reply = ping.Send(host, 1000);
// 写入简要在线状态
// 实际实现:更新 cnc_machine.is_online 等字段
}
catch { /* 忽略 Ping 失败带来的异常 */ }
}
// 2) HTTP GET 采集
if (!string.IsNullOrWhiteSpace(_address?.Url))
{
using (var http = new HttpClient { Timeout = TimeSpan.FromSeconds(30) })
{
var resp = http.GetAsync(_address.Url).Result;
if (resp.IsSuccessStatusCode)
{
var json = resp.Content.ReadAsStringAsync().Result;
// 解析(品牌信息在实际实现中注入)
var parsed = Core.DataParser.Parse(_address.BrandName, json, null);
if (parsed != null && parsed.Count > 0)
{
// 将解析后的字段映射为 CollectRecord简化实现创建空记录集合以便调用写入
var recs = new List<CollectRecord>();
// 实际实现应根据 parsed 构造 CollectRecord 对象
if (recs.Count > 0)
{
CollectRecordWriter.WriteBatch(_connectionString, recs, json, _address.Id);
}
// 产量跟踪(简化实现:尝试从第一个字段的值计算产量)
int produced = 0;
var first = parsed.Values.FirstOrDefault();
if (first != null && first.Value != null)
{
if (first.Value is int iv) produced = iv;
else if (first.Value is decimal dv) produced = (int)dv;
else if (first.Value is long lv) produced = (int)lv;
}
_tracker?.Track(_address.MachineId, parsed.Values.Select(v => v.FieldName).FirstOrDefault() ?? string.Empty, produced, DateTime.Now);
}
}
}
}
}
catch (Exception ex)
{
_log.Error("CollectWorker 异常", ex);
}
// 采集间隔,若未配置则 30 秒
var interval = 30;
try
{
if (_address != null && _address.CollectInterval > 0) interval = _address.CollectInterval;
}
catch { }
Thread.Sleep(interval * 1000);
}
}
}
}