You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
haoliang-net/src/CncCollector/Core/CollectorEngine.cs

320 lines
11 KiB
C#

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using Dapper;
using MySqlConnector;
using CncModels.Entity;
using CncCollector.Config;
using log4net;
namespace CncCollector.Core
{
/// <summary>
/// Main controller of the collection engine.
/// Loads the enabled collect addresses, keeps one <see cref="CollectWorker"/> per address,
/// reports heartbeats, polls for configuration changes and schedules the daily summary job.
/// </summary>
/// <remarks>
/// The timers used here are <see cref="System.Threading.Timer"/> instances, whose callbacks run on
/// thread-pool threads and may overlap with each other and with public calls such as
/// <see cref="Refresh"/> and <see cref="Stop"/>. Lifecycle transitions, worker reconciliation and
/// heartbeat writes are therefore serialized through a private lock.
/// </remarks>
public class CollectorEngine
{
    /// <summary>Query that returns every enabled row of cnc_collect_address mapped onto <see cref="CollectAddress"/>.</summary>
    private const string EnabledAddressesSql =
        "SELECT id as Id, name as Name, url as Url, brand_id as BrandId, collect_interval as CollectInterval, is_enabled as IsEnabled, last_collect_time as LastCollectTime, last_collect_status as LastCollectStatus, fail_count as FailCount, created_at as CreatedAt, updated_at as UpdatedAt FROM cnc_collect_address WHERE is_enabled = 1";

    private static readonly ILog _log = LogManager.GetLogger(typeof(CollectorEngine));

    /// <summary>Serializes Start/Stop, worker reconciliation and heartbeat writes.</summary>
    private readonly object _syncRoot = new object();

    private readonly CollectorConfig _config;
    private readonly ConcurrentDictionary<int, CollectWorker> _workers = new ConcurrentDictionary<int, CollectWorker>();
    private readonly ProductionTracker _tracker;
    private readonly DailySummaryJob _dailySummary;
    private Timer _heartbeatTimer;
    private Timer _configPollTimer;
    private Timer _dailySummaryTimer;
    private DateTime _startTime;
    // Written on timer threads and read from GetStatus/WriteHeartbeat; always accessed through
    // Interlocked so 64-bit values cannot tear on 32-bit runtimes.
    private long _totalSuccess;
    private long _totalFail;
    private volatile bool _isRunning;
    private DateTime _lastSummaryDate = DateTime.MinValue;

    /// <summary>Whether the engine is currently running.</summary>
    public bool IsRunning => _isRunning;

    /// <summary>Local time at which the engine was last started.</summary>
    public DateTime StartTime => _startTime;

    /// <summary>Seconds elapsed since the last start; 0 while the engine is not running.</summary>
    public long UptimeSeconds => _isRunning ? (long)(DateTime.Now - _startTime).TotalSeconds : 0;

    /// <summary>Number of workers currently registered.</summary>
    public int WorkerCount => _workers.Count;

    /// <summary>
    /// Creates the engine together with its production tracker and daily summary job.
    /// </summary>
    /// <param name="config">Global collector configuration.</param>
    /// <exception cref="ArgumentNullException"><paramref name="config"/> is null.</exception>
    public CollectorEngine(CollectorConfig config)
    {
        if (config == null)
        {
            throw new ArgumentNullException(nameof(config));
        }

        _config = config;
        _tracker = new ProductionTracker(config.BusinessConnection);
        _dailySummary = new DailySummaryJob(config.BusinessConnection);
    }

    /// <summary>
    /// Starts the engine: loads the addresses, starts their workers and starts the
    /// heartbeat, config-poll and daily-summary timers. Does nothing when already running.
    /// </summary>
    public void Start()
    {
        lock (_syncRoot)
        {
            if (_isRunning)
            {
                return;
            }

            // NOTE(review): Stop() disposes _tracker and the field is never recreated, so a
            // Start() after Stop() hands a disposed tracker to new workers - confirm whether
            // ProductionTracker tolerates that or whether restart is simply unsupported.
            _log.Info("===== 采集引擎启动 =====");
            _startTime = DateTime.Now;
            _isRunning = true;

            // 1. Load the enabled addresses and start their workers.
            LoadAndStartWorkers();

            // 2. Heartbeat reporting timer.
            var heartbeatInterval = TimeSpan.FromSeconds(_config.HeartbeatIntervalSeconds);
            _heartbeatTimer = new Timer(OnHeartbeat, null, heartbeatInterval, heartbeatInterval);

            // 3. Configuration polling timer.
            var configPollInterval = TimeSpan.FromSeconds(_config.ConfigPollIntervalSeconds);
            _configPollTimer = new Timer(OnConfigPoll, null, configPollInterval, configPollInterval);

            // 4. Daily summary check timer (checked once per minute).
            var summaryCheckInterval = TimeSpan.FromMinutes(1);
            _dailySummaryTimer = new Timer(OnDailySummaryCheck, null, summaryCheckInterval, summaryCheckInterval);

            _log.Info($"===== 采集引擎已启动({_workers.Count}个采集地址)=====");
        }
    }

    /// <summary>
    /// Stops the engine: disposes the timers, stops every worker, disposes the tracker and
    /// writes a final "stopped" heartbeat. Does nothing when not running.
    /// </summary>
    public void Stop()
    {
        lock (_syncRoot)
        {
            if (!_isRunning)
            {
                return;
            }

            _log.Info("===== 采集引擎停止中 =====");
            _isRunning = false;

            // Dispose the timers before touching the workers so no further callback gets
            // scheduled. Callbacks already in flight see _isRunning == false and bail out.
            _heartbeatTimer?.Dispose();
            _heartbeatTimer = null;
            _configPollTimer?.Dispose();
            _configPollTimer = null;
            _dailySummaryTimer?.Dispose();
            _dailySummaryTimer = null;

            // Stop every worker; one failing worker must not keep the others running.
            foreach (var kvp in _workers)
            {
                try
                {
                    kvp.Value.Stop();
                }
                catch (Exception ex)
                {
                    _log.Error($"停止采集地址失败: {kvp.Value.AddressName}", ex);
                }
            }
            _workers.Clear();

            // Dispose the tracker (the original comment describes this as settling all active segments).
            try
            {
                _tracker.Dispose();
            }
            catch (Exception ex)
            {
                _log.Error("释放生产跟踪器失败", ex);
            }

            // Final heartbeat. UptimeSeconds is already 0 here because _isRunning is false.
            WriteHeartbeat("stopped");
            _log.Info("===== 采集引擎已停止 =====");
        }
    }

    /// <summary>
    /// Reloads the enabled addresses and reconciles the workers with them: workers whose
    /// address is no longer returned are stopped, newly returned addresses get a worker.
    /// Has no effect on workers while the engine is stopped.
    /// </summary>
    public void Refresh()
    {
        _log.Info("刷新采集配置...");
        LoadAndStartWorkers();
        _log.Info("采集配置刷新完成");
    }

    /// <summary>
    /// Builds a snapshot of the engine state.
    /// </summary>
    /// <returns>
    /// A dictionary with the keys isRunning, startTime, uptimeSeconds, workerCount,
    /// totalSuccess, totalFail and workers (one dictionary per worker with addressId,
    /// name, url and isRunning).
    /// </returns>
    public Dictionary<string, object> GetStatus()
    {
        var status = new Dictionary<string, object>
        {
            ["isRunning"] = _isRunning,
            // Invariant culture keeps the timestamp format stable regardless of the host locale.
            ["startTime"] = _startTime.ToString("yyyy-MM-dd HH:mm:ss", System.Globalization.CultureInfo.InvariantCulture),
            ["uptimeSeconds"] = UptimeSeconds,
            ["workerCount"] = _workers.Count,
            ["totalSuccess"] = Interlocked.Read(ref _totalSuccess),
            ["totalFail"] = Interlocked.Read(ref _totalFail)
        };

        var workerList = new List<Dictionary<string, object>>();
        foreach (var kvp in _workers)
        {
            workerList.Add(new Dictionary<string, object>
            {
                ["addressId"] = kvp.Key,
                ["name"] = kvp.Value.AddressName,
                ["url"] = kvp.Value.AddressUrl,
                ["isRunning"] = kvp.Value.IsRunning
            });
        }
        status["workers"] = workerList;
        return status;
    }

    /// <summary>
    /// Loads the enabled collect addresses and reconciles the worker set with them.
    /// Never throws: every failure is logged.
    /// </summary>
    private void LoadAndStartWorkers()
    {
        try
        {
            List<CollectAddress> addresses;
            using (var conn = new MySqlConnection(_config.BusinessConnection))
            {
                addresses = conn.Query<CollectAddress>(EnabledAddressesSql).AsList();
            }

            // Reconcile under the lock: this method is reachable from the config-poll timer
            // (whose ticks may overlap) and from Refresh(), and an unsynchronized
            // check-then-add could start two workers for the same address.
            lock (_syncRoot)
            {
                if (!_isRunning)
                {
                    // A poll that was in flight during Stop(), or a Refresh() on a stopped
                    // engine, must not bring workers back to life.
                    _log.Warn("采集引擎未运行, 已跳过采集地址加载");
                    return;
                }

                var enabledIds = new HashSet<int>();
                foreach (var addr in addresses)
                {
                    enabledIds.Add(addr.Id);
                }

                // Stop workers whose address is no longer enabled or no longer exists.
                // Removing entries while enumerating a ConcurrentDictionary is safe.
                foreach (var kvp in _workers)
                {
                    if (enabledIds.Contains(kvp.Key))
                    {
                        continue;
                    }

                    try
                    {
                        kvp.Value.Stop();
                        CollectWorker removed;
                        _workers.TryRemove(kvp.Key, out removed);
                        _log.Info($"已停止删除的采集地址: {kvp.Value.AddressName}");
                    }
                    catch (Exception ex)
                    {
                        // Keep the entry so the next reconciliation retries the stop.
                        _log.Error($"停止采集地址失败: {kvp.Value.AddressName}", ex);
                    }
                }

                // Start workers for addresses that do not have one yet.
                foreach (var addr in addresses)
                {
                    if (_workers.ContainsKey(addr.Id))
                    {
                        continue;
                    }

                    try
                    {
                        var worker = new CollectWorker(addr, _config, _tracker,
                            _config.BusinessConnection, _config.LogConnection);
                        worker.Start();
                        _workers[addr.Id] = worker;
                        _log.Info($"已启动采集地址: {addr.Name}(URL={addr.Url}, 间隔={addr.CollectInterval}秒)");
                    }
                    catch (Exception ex)
                    {
                        // One bad address must not prevent the remaining ones from starting.
                        _log.Error($"启动采集地址失败: {addr.Name}", ex);
                    }
                }
            }
        }
        catch (Exception ex)
        {
            _log.Error("加载采集地址失败", ex);
        }
    }

    /// <summary>
    /// Heartbeat timer callback: recomputes the success/failure totals from the current
    /// workers and writes a "running" heartbeat.
    /// </summary>
    /// <param name="state">Unused timer state.</param>
    private void OnHeartbeat(object state)
    {
        try
        {
            // Taking the lock orders this write against Stop(), so a late callback can never
            // record "running" after the final "stopped" heartbeat.
            lock (_syncRoot)
            {
                if (!_isRunning)
                {
                    return;
                }

                // Totals are the sum over the workers that are registered right now.
                long success = 0, fail = 0;
                foreach (var kvp in _workers)
                {
                    success += kvp.Value.SuccessCount;
                    fail += kvp.Value.FailCount;
                }
                Interlocked.Exchange(ref _totalSuccess, success);
                Interlocked.Exchange(ref _totalFail, fail);

                WriteHeartbeat("running");
            }
        }
        catch (Exception ex)
        {
            _log.Error("心跳上报失败", ex);
        }
    }

    /// <summary>
    /// Inserts one heartbeat row into log_collector_heartbeat. Failures are logged, not thrown.
    /// </summary>
    /// <param name="status">Status text stored in the row ("running" or "stopped" in this class).</param>
    private void WriteHeartbeat(string status)
    {
        try
        {
            using (var conn = new MySqlConnection(_config.LogConnection))
            {
                // The SQL text is kept exactly as it was, including its line break.
                conn.Execute(@"INSERT INTO log_collector_heartbeat (service_id, status, last_collect_time, success_count, fail_count, uptime_seconds, detail, created_at)
VALUES (@ServiceId, @Status, @LastCollectTime, @SuccessCount, @FailCount, @UptimeSeconds, @Detail, NOW())",
                    new
                    {
                        ServiceId = _config.ServiceId,
                        Status = status,
                        // NOTE(review): this stores the heartbeat time, not an actual last
                        // collection time - confirm whether that is intended.
                        LastCollectTime = DateTime.Now,
                        SuccessCount = Interlocked.Read(ref _totalSuccess),
                        FailCount = Interlocked.Read(ref _totalFail),
                        UptimeSeconds = UptimeSeconds,
                        Detail = (string)null
                    });
            }
        }
        catch (Exception ex)
        {
            _log.Error("写入心跳记录失败", ex);
        }
    }

    /// <summary>
    /// Config-poll timer callback: reloads the runtime configuration from the database and
    /// reconciles the workers with the enabled addresses.
    /// </summary>
    /// <param name="state">Unused timer state.</param>
    private void OnConfigPoll(object state)
    {
        if (!_isRunning)
        {
            return;
        }

        try
        {
            // Reload the runtime configuration from the database.
            ConfigLoader.LoadRuntimeConfig(_config.BusinessConnection, _config);
            // Reload the collect addresses.
            LoadAndStartWorkers();
        }
        catch (Exception ex)
        {
            _log.Error("配置轮询失败", ex);
        }
    }

    /// <summary>
    /// Daily-summary timer callback: when the current time is within one minute of the
    /// configured summary time, runs the summary for the previous day, at most once per date.
    /// </summary>
    /// <param name="state">Unused timer state.</param>
    private void OnDailySummaryCheck(object state)
    {
        if (!_isRunning)
        {
            return;
        }

        try
        {
            // Parse the configured time of day. Unparsable or negative values fall back to
            // 01:00; a negative value would otherwise make the DateTime constructor throw
            // on every tick.
            TimeSpan summaryTime;
            if (!TimeSpan.TryParse(_config.DailySummaryTime, out summaryTime) || summaryTime < TimeSpan.Zero)
            {
                summaryTime = new TimeSpan(1, 0, 0);
            }

            var now = DateTime.Now;
            var targetTime = new DateTime(now.Year, now.Month, now.Day, summaryTime.Hours, summaryTime.Minutes, 0);

            // Only act within +/- 1 minute of the target time.
            if (Math.Abs((now - targetTime).TotalMinutes) > 1)
            {
                return;
            }

            // Summarize yesterday's data. The date is recorded before the job runs, so a
            // failed run is not retried by later ticks in the same window.
            DateTime summaryDate = now.Date.AddDays(-1);
            if (_lastSummaryDate != summaryDate)
            {
                _lastSummaryDate = summaryDate;
                _dailySummary.Execute(summaryDate);
            }
        }
        catch (Exception ex)
        {
            _log.Error("日终汇总检查失败", ex);
        }
    }
}
}