using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using Dapper;
using MySql.Data.MySqlClient;
using log4net;
using CncModels.Entity;
using CncModels.Enum;

namespace CncCollector.Core
{
    /// <summary>
    /// Collection engine: loads the enabled collect addresses, creates one
    /// <c>CollectWorker</c> per address, and runs the background
    /// configuration-poll thread.
    /// </summary>
    public class CollectorEngine
    {
        /// <summary>Interval between configuration-poll iterations, in milliseconds.</summary>
        private const int PollIntervalMs = 30000;

        /// <summary>Maximum time <see cref="Stop"/> waits for the poll thread to exit, in milliseconds.</summary>
        private const int PollThreadJoinTimeoutMs = 5000;

        private readonly string _connectionString;
        private readonly CollectorConfig _config;

        // Workers created by the most recent Start(); emptied by Stop().
        private readonly List<CollectWorker> _workers = new List<CollectWorker>();

        // Shared tracker instance handed to every worker.
        private readonly ProductionTracker _tracker;

        // Constructed here but not referenced anywhere else in this class.
        private readonly DailySummaryJob _dailyJob;

        private readonly ILog _log = LogManager.GetLogger(typeof(CollectorEngine));

        // Gate the poll thread waits on so Stop() can wake it immediately
        // instead of waiting out the full poll interval.
        private readonly object _pollGate = new object();

        private Thread _pollThread;
        private volatile bool _running;

        /// <summary>
        /// Creates the engine. No database access happens here; the connection
        /// string is only stored and forwarded to the tracker and the daily job.
        /// </summary>
        /// <param name="connectionString">MySQL connection string used for every database access.</param>
        /// <param name="config">Collector configuration; its <c>ApiKey</c> is passed to each worker.</param>
        public CollectorEngine(string connectionString, CollectorConfig config)
        {
            _connectionString = connectionString;
            _config = config;
            _tracker = new ProductionTracker(connectionString);
            _dailyJob = new DailySummaryJob(connectionString);
        }

        /// <summary>
        /// Starts one worker per enabled collect address and then the
        /// configuration-poll thread. Calling it while the engine is already
        /// running is ignored, so workers are never duplicated.
        /// </summary>
        /// <remarks>
        /// If loading addresses or starting a worker throws, everything started
        /// so far is stopped again and the exception is rethrown, leaving the
        /// engine in the stopped state.
        /// </remarks>
        public void Start()
        {
            if (_running)
            {
                _log.Warn("CollectorEngine.Start called while already running; ignoring.");
                return;
            }

            _log.Info("CollectorEngine starting...");
            _running = true;

            try
            {
                // Load the addresses and start the collection workers.
                LoadAddressesAndStartWorkers();

                // Start configuration polling.
                _pollThread = new Thread(PollLoop)
                {
                    IsBackground = true,
                    Name = "CollectorConfigPoll"
                };
                _pollThread.Start();

                // Simplified heartbeat mechanism: log output only; no separate
                // heartbeat thread is started here.
            }
            catch
            {
                // Roll back a partial start so Status does not report "Running"
                // and a later Start() is not rejected by the guard above.
                Stop();
                throw;
            }
        }

        /// <summary>
        /// Stops the poll thread and every worker, then clears the worker list.
        /// Safe to call when the engine was never started.
        /// </summary>
        public void Stop()
        {
            _log.Info("CollectorEngine stopping...");
            _running = false;

            var pollThread = _pollThread;
            _pollThread = null;

            // Wake the poll thread so it observes _running == false right away.
            lock (_pollGate)
            {
                Monitor.PulseAll(_pollGate);
            }

            // Wait for the old poll thread to exit; otherwise a Stop()/Start()
            // pair (see Refresh) would set _running back to true before the old
            // thread checked it, and that thread would keep looping forever.
            // Joining the current thread would deadlock, so that case is skipped.
            if (pollThread != null && pollThread != Thread.CurrentThread && pollThread.IsAlive)
            {
                if (!pollThread.Join(PollThreadJoinTimeoutMs))
                {
                    _log.Warn("Config poll thread did not exit within " + PollThreadJoinTimeoutMs + " ms.");
                }
            }

            foreach (var worker in _workers)
            {
                try
                {
                    worker.Stop();
                }
                catch (Exception ex)
                {
                    // One failing worker must not prevent the others from being stopped.
                    _log.Error("Failed to stop a collect worker.", ex);
                }
            }

            _workers.Clear();
        }

        /// <summary>"Running" while the engine is started, otherwise "Stopped".</summary>
        public string Status => _running ? "Running" : "Stopped";

        /// <summary>
        /// Reads every row of <c>cnc_collect_address</c> with <c>is_enabled=1</c>
        /// and creates, starts and registers a worker for each one.
        /// </summary>
        private void LoadAddressesAndStartWorkers()
        {
            using (var conn = new MySqlConnection(_connectionString))
            {
                conn.Open();

                // NOTE(review): this Query call has no generic type argument, so Dapper
                // yields dynamic rows. The entity type (presumably one from
                // CncModels.Entity) looks like it was lost from this call; confirm
                // against CollectWorker's constructor signature.
                var addresses = conn.Query("SELECT * FROM cnc_collect_address WHERE is_enabled=1");

                foreach (var addr in addresses)
                {
                    var worker = new CollectWorker(addr, _connectionString, _tracker, _config.ApiKey);
                    worker.Start();
                    _workers.Add(worker);
                }
            }
        }

        /// <summary>
        /// Body of the configuration-poll thread. Currently a placeholder that
        /// only waits <see cref="PollIntervalMs"/> between iterations until the
        /// engine is stopped.
        /// </summary>
        private void PollLoop()
        {
            while (_running)
            {
                try
                {
                    // Poll for configuration changes (simplified implementation).
                    lock (_pollGate)
                    {
                        // Re-check under the lock so a Stop() that ran just before
                        // we acquired it is not missed; Stop() pulses this gate.
                        if (_running)
                        {
                            Monitor.Wait(_pollGate, PollIntervalMs);
                        }
                    }
                }
                catch (Exception ex)
                {
                    // Keep the loop alive, but never swallow the failure silently.
                    _log.Error("Config poll iteration failed.", ex);
                }
            }
        }

        /// <summary>
        /// Reloads the address configuration and restarts the workers by
        /// stopping and then starting the engine.
        /// </summary>
        public void Refresh()
        {
            Stop();
            Start();
        }
    }
}