You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
haoliang-net/Core/CollectorEngine.cs

93 lines
2.9 KiB
C#

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using Dapper;
using MySql.Data.MySqlClient;
using log4net;
using CncModels.Entity;
using CncModels.Enum;
namespace CncCollector.Core
{
    /// <summary>
    /// Collection engine: loads the enabled collect addresses from the database,
    /// starts one <see cref="CollectWorker"/> per address and runs a background
    /// configuration-poll thread.
    /// </summary>
    /// <remarks>
    /// <see cref="Start"/>, <see cref="Stop"/> and <see cref="Refresh"/> are
    /// serialized on a private lock so the worker list is never mutated by two
    /// lifecycle calls at the same time.
    /// </remarks>
    public class CollectorEngine
    {
        /// <summary>Delay between two configuration polls, in milliseconds.</summary>
        private const int PollIntervalMs = 30000;

        private readonly string _connectionString;
        private readonly CollectorConfig _config;
        private readonly List<CollectWorker> _workers = new List<CollectWorker>();
        private readonly ProductionTracker _tracker;
        // Constructed in the constructor; not referenced anywhere else in this class.
        private readonly DailySummaryJob _dailyJob;
        private readonly ILog _log = LogManager.GetLogger(typeof(CollectorEngine));
        // Guards Start/Stop/Refresh and every access to _workers, _pollThread and _pollCts.
        private readonly object _lifecycleGate = new object();
        private Thread _pollThread;
        // One cancellation source per poll thread, so a stopped thread can never be
        // "revived" by a later Start().
        private CancellationTokenSource _pollCts;
        private volatile bool _running;

        /// <summary>Creates an engine; nothing is started until <see cref="Start"/> is called.</summary>
        /// <param name="connectionString">MySQL connection string used to load the collect addresses; also passed to the workers, the tracker and the daily job.</param>
        /// <param name="config">Collector configuration; its <c>ApiKey</c> is handed to every worker.</param>
        /// <exception cref="ArgumentNullException">Either argument is <c>null</c>.</exception>
        public CollectorEngine(string connectionString, CollectorConfig config)
        {
            if (connectionString == null)
            {
                throw new ArgumentNullException(nameof(connectionString));
            }
            if (config == null)
            {
                throw new ArgumentNullException(nameof(config));
            }

            _connectionString = connectionString;
            _config = config;
            _tracker = new ProductionTracker(connectionString);
            _dailyJob = new DailySummaryJob(connectionString);
        }

        /// <summary>
        /// Loads the enabled addresses, starts one worker per address and starts the
        /// configuration-poll thread. A call while the engine is already running is
        /// ignored (a warning is logged).
        /// </summary>
        /// <remarks>
        /// If loading or starting fails, everything started so far is stopped again,
        /// <see cref="Status"/> goes back to "Stopped" and the exception is rethrown.
        /// </remarks>
        public void Start()
        {
            lock (_lifecycleGate)
            {
                if (_running)
                {
                    // A second Start() would otherwise duplicate every worker and the poll thread.
                    _log.Warn("CollectorEngine.Start ignored: engine is already running.");
                    return;
                }

                _log.Info("CollectorEngine starting...");
                _running = true;
                try
                {
                    // Load the addresses and start one collect worker per address.
                    LoadAddressesAndStartWorkers();
                    // Start the configuration poll.
                    StartPollThread();
                }
                catch (Exception ex)
                {
                    _log.Error("CollectorEngine failed to start; rolling back.", ex);
                    StopCore();
                    throw;
                }

                // Simplified heartbeat: just log that the engine is up.
                _log.InfoFormat("CollectorEngine started with {0} worker(s).", _workers.Count);
            }
        }

        /// <summary>
        /// Stops the poll thread (waiting for it to exit) and every worker, then
        /// clears the worker list. Safe to call when the engine is not running.
        /// </summary>
        public void Stop()
        {
            lock (_lifecycleGate)
            {
                _log.Info("CollectorEngine stopping...");
                StopCore();
            }
        }

        /// <summary>"Running" between a successful <see cref="Start"/> and the next <see cref="Stop"/>; otherwise "Stopped".</summary>
        public string Status => _running ? "Running" : "Stopped";

        /// <summary>Reloads the address configuration and restarts the worker threads.</summary>
        public void Refresh()
        {
            // Hold the gate across both calls so no other lifecycle call can slip in between.
            lock (_lifecycleGate)
            {
                Stop();
                Start();
            }
        }

        /// <summary>Shared stop logic; the caller must hold <c>_lifecycleGate</c>.</summary>
        private void StopCore()
        {
            _running = false;
            StopPollThread();

            foreach (var worker in _workers)
            {
                try
                {
                    worker.Stop();
                }
                catch (Exception ex)
                {
                    // One failing worker must not keep the remaining ones running.
                    _log.Error("CollectorEngine failed to stop a worker.", ex);
                }
            }
            _workers.Clear();
        }

        /// <summary>Queries the enabled collect addresses and starts one worker for each of them.</summary>
        private void LoadAddressesAndStartWorkers()
        {
            List<CollectAddress> addresses;
            using (var conn = new MySqlConnection(_connectionString))
            {
                conn.Open();
                // Materialize the rows so the connection is released before workers are started.
                addresses = conn
                    .Query<CollectAddress>("SELECT * FROM cnc_collect_address WHERE is_enabled=1")
                    .ToList();
            }

            foreach (var addr in addresses)
            {
                var worker = new CollectWorker(addr, _connectionString, _tracker, _config.ApiKey);
                worker.Start();
                _workers.Add(worker);
            }
        }

        /// <summary>Creates a fresh cancellation source and starts the background poll thread bound to it.</summary>
        private void StartPollThread()
        {
            _pollCts = new CancellationTokenSource();
            var token = _pollCts.Token;
            _pollThread = new Thread(() => PollLoop(token)) { IsBackground = true, Name = "CollectorConfigPoll" };
            _pollThread.Start();
        }

        /// <summary>Signals the current poll thread to exit and waits until it has done so.</summary>
        private void StopPollThread()
        {
            var cts = _pollCts;
            var thread = _pollThread;
            _pollCts = null;
            _pollThread = null;
            if (cts == null)
            {
                return;
            }

            cts.Cancel();

            // Joining (or disposing the wait handle) from the poll thread itself would
            // deadlock or break its own wait, so skip both in that case.
            bool calledFromPollThread = thread == Thread.CurrentThread;
            if (thread != null && thread.IsAlive && !calledFromPollThread)
            {
                thread.Join();
            }
            if (!calledFromPollThread)
            {
                cts.Dispose();
            }
        }

        /// <summary>
        /// Body of the poll thread: runs one poll iteration every
        /// <see cref="PollIntervalMs"/> milliseconds until <paramref name="token"/> is cancelled.
        /// </summary>
        /// <param name="token">Cancelled by <see cref="Stop"/> to end this particular thread.</param>
        private void PollLoop(CancellationToken token)
        {
            WaitHandle stopSignal = token.WaitHandle;
            while (!token.IsCancellationRequested)
            {
                try
                {
                    // Poll for configuration changes (simplified: nothing is polled yet).
                    // Waiting on the stop signal instead of sleeping lets Stop() return promptly.
                    if (stopSignal.WaitOne(PollIntervalMs))
                    {
                        return;
                    }
                }
                catch (Exception ex)
                {
                    // Keep the loop alive, but never hide the failure.
                    _log.Error("CollectorEngine config poll iteration failed.", ex);
                }
            }
        }
    }
}