From 6fd1d616ac5640a3f51872b8b1cdd75ed8216fbe Mon Sep 17 00:00:00 2001 From: haoliang <821644@qq.com> Date: Fri, 1 May 2026 12:22:19 +0800 Subject: [PATCH] =?UTF-8?q?=E6=96=B0=E5=A2=9ECncCollector=E9=87=87?= =?UTF-8?q?=E9=9B=86=E6=9C=8D=E5=8A=A1=EF=BC=88=E9=85=8D=E7=BD=AE=E5=8A=A0?= =?UTF-8?q?=E8=BD=BD+JSON=E8=A7=A3=E6=9E=90+=E5=AD=97=E6=AE=B5=E6=98=A0?= =?UTF-8?q?=E5=B0=84+HTTP=E9=87=87=E9=9B=86+=E4=BA=A7=E9=87=8F=E8=B7=9F?= =?UTF-8?q?=E8=B8=AA+=E6=97=A5=E7=BB=88=E6=B1=87=E6=80=BB+=E5=BF=83?= =?UTF-8?q?=E8=B7=B3+=E7=AE=A1=E7=90=86API=EF=BC=89?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- CncDataSystem.sln | 11 +- src/CncCollector/Api/CollectorApiServer.cs | 163 +++++++ src/CncCollector/CncCollector.csproj | 51 +++ src/CncCollector/Config/CollectorConfig.cs | 79 ++++ src/CncCollector/Config/ConfigLoader.cs | 77 ++++ src/CncCollector/Core/CollectRecordWriter.cs | 186 ++++++++ src/CncCollector/Core/CollectWorker.cs | 427 +++++++++++++++++++ src/CncCollector/Core/CollectorEngine.cs | 318 ++++++++++++++ src/CncCollector/Core/DailySummaryJob.cs | 134 ++++++ src/CncCollector/Core/DataParser.cs | 124 ++++++ src/CncCollector/Core/ProductionTracker.cs | 260 +++++++++++ src/CncCollector/Program.cs | 87 ++++ src/CncCollector/Properties/AssemblyInfo.cs | 18 + src/CncCollector/collector.json | 10 + src/CncCollector/log4net.config | 24 ++ 15 files changed, 1968 insertions(+), 1 deletion(-) create mode 100644 src/CncCollector/Api/CollectorApiServer.cs create mode 100644 src/CncCollector/CncCollector.csproj create mode 100644 src/CncCollector/Config/CollectorConfig.cs create mode 100644 src/CncCollector/Config/ConfigLoader.cs create mode 100644 src/CncCollector/Core/CollectRecordWriter.cs create mode 100644 src/CncCollector/Core/CollectWorker.cs create mode 100644 src/CncCollector/Core/CollectorEngine.cs create mode 100644 src/CncCollector/Core/DailySummaryJob.cs create mode 100644 src/CncCollector/Core/DataParser.cs create mode 100644 src/CncCollector/Core/ProductionTracker.cs create mode 100644 src/CncCollector/Program.cs create mode 100644 src/CncCollector/Properties/AssemblyInfo.cs create mode 100644 src/CncCollector/collector.json create mode 100644 src/CncCollector/log4net.config diff --git a/CncDataSystem.sln b/CncDataSystem.sln index 8e5989b..f6ddeb3 100644 --- a/CncDataSystem.sln +++ b/CncDataSystem.sln @@ -1,4 +1,4 @@ - + Microsoft Visual Studio Solution File, Format Version 12.00 # Visual Studio Version 17.VisualStudioVersion = 17.0.31903.59 MinimumVisualStudioVersion = 10.0.40219.1 @@ -25,6 +25,10 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "frontend", "frontend", "{D0 frontend\tsconfig.json = frontend\tsconfig.json EndProjectSection EndProject +Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "src", "src", "{331E2BF6-5FA9-42A8-9170-3B8FEE1E8C2B}" +EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "CncCollector", "src\CncCollector\CncCollector.csproj", "{66697FD0-07FB-4237-B119-B645470CEB04}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -63,10 +67,15 @@ Global {4DAE1DA8-E028-4025-AA80-1E698976F74C}.Debug|Any CPU.Build.0 = Debug|Any CPU {4DAE1DA8-E028-4025-AA80-1E698976F74C}.Release|Any CPU.ActiveCfg = Release|Any CPU {4DAE1DA8-E028-4025-AA80-1E698976F74C}.Release|Any CPU.Build.0 = Release|Any CPU + {66697FD0-07FB-4237-B119-B645470CEB04}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {66697FD0-07FB-4237-B119-B645470CEB04}.Debug|Any CPU.Build.0 = Debug|Any CPU + {66697FD0-07FB-4237-B119-B645470CEB04}.Release|Any CPU.ActiveCfg = Release|Any CPU + {66697FD0-07FB-4237-B119-B645470CEB04}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE EndGlobalSection GlobalSection(NestedProjects) = preSolution + {66697FD0-07FB-4237-B119-B645470CEB04} = {331E2BF6-5FA9-42A8-9170-3B8FEE1E8C2B} EndGlobalSection EndGlobal diff --git a/src/CncCollector/Api/CollectorApiServer.cs b/src/CncCollector/Api/CollectorApiServer.cs new file mode 100644 index 0000000..7cb5be4 --- /dev/null +++ b/src/CncCollector/Api/CollectorApiServer.cs @@ -0,0 +1,163 @@ +using System; +using System.Net; +using System.Text; +using System.Threading; +using Newtonsoft.Json; +using CncCollector.Core; +using log4net; + +namespace CncCollector.Api +{ + /// + /// 采集服务管理API HTTP服务。 + /// 使用 HttpListener 提供轻量管理接口,支持启停控制和状态查询。 + /// + public class CollectorApiServer + { + private static readonly ILog _log = LogManager.GetLogger(typeof(CollectorApiServer)); + + private readonly CollectorEngine _engine; + private readonly string _apiKey; + private readonly int _port; + private HttpListener _listener; + private Thread _listenerThread; + private volatile bool _running; + + /// + /// 初始化管理API服务 + /// + /// 采集引擎实例 + /// API Key(用于认证) + /// 监听端口 + public CollectorApiServer(CollectorEngine engine, string apiKey, int port) + { + _engine = engine; + _apiKey = apiKey; + _port = port; + } + + /// + /// 启动HTTP监听 + /// + public void Start() + { + if (_running) return; + + try + { + _listener = new HttpListener(); + _listener.Prefixes.Add($"http://+:{_port}/api/collector/"); + _listener.Start(); + _running = true; + + _listenerThread = new Thread(() => + { + while (_running) + { + try + { + var context = _listener.GetContext(); + ThreadPool.QueueUserWorkItem(_ => ProcessRequest(context)); + } + catch (HttpListenerException) { break; } + catch (ObjectDisposedException) { break; } + } + }) { IsBackground = true }; + _listenerThread.Start(); + + _log.Info($"管理API已启动: http://localhost:{_port}/api/collector/"); + } + catch (Exception ex) + { + _log.Error($"管理API启动失败(端口={_port})", ex); + } + } + + /// + /// 停止HTTP监听 + /// + public void Stop() + { + _running = false; + try { _listener?.Stop(); } catch { } + _log.Info("管理API已停止"); + } + + /// + /// 处理单个HTTP请求 + /// + private void ProcessRequest(HttpListenerContext ctx) + { + string path = ctx.Request.Url.AbsolutePath.TrimEnd('/').ToLower(); + string method = ctx.Request.HttpMethod; + + try + { + // API Key 认证 + string apiKeyHeader = ctx.Request.Headers["X-Api-Key"]; + if (string.IsNullOrEmpty(apiKeyHeader) || apiKeyHeader != _apiKey) + { + SendJson(ctx, 401, new { code = 40101, message = "API Key无效", data = (object)null }); + return; + } + + // 路由 + if (path == "/api/collector/status" && method == "GET") + { + var status = _engine.GetStatus(); + SendSuccess(ctx, status); + } + else if (path == "/api/collector/start" && method == "POST") + { + _engine.Start(); + SendSuccess(ctx, new { message = "采集服务已启动" }); + } + else if (path == "/api/collector/stop" && method == "POST") + { + _engine.Stop(); + SendSuccess(ctx, new { message = "采集服务已停止" }); + } + else if (path == "/api/collector/refresh" && method == "POST") + { + _engine.Refresh(); + SendSuccess(ctx, new { message = "配置已刷新" }); + } + else + { + SendJson(ctx, 404, new { code = 40400, message = "未知端点", data = (object)null }); + } + } + catch (Exception ex) + { + _log.Error("处理API请求异常", ex); + SendJson(ctx, 500, new { code = 50001, message = ex.Message, data = (object)null }); + } + } + + /// + /// 发送成功响应 + /// + private void SendSuccess(HttpListenerContext ctx, object data) + { + SendJson(ctx, 200, new { code = 0, message = "success", data }); + } + + /// + /// 发送JSON响应 + /// + private void SendJson(HttpListenerContext ctx, int statusCode, object obj) + { + try + { + string json = JsonConvert.SerializeObject(obj); + byte[] bytes = Encoding.UTF8.GetBytes(json); + ctx.Response.StatusCode = statusCode; + ctx.Response.ContentType = "application/json; charset=utf-8"; + ctx.Response.ContentLength64 = bytes.Length; + ctx.Response.OutputStream.Write(bytes, 0, bytes.Length); + ctx.Response.OutputStream.Close(); + } + catch { } + } + } +} diff --git a/src/CncCollector/CncCollector.csproj b/src/CncCollector/CncCollector.csproj new file mode 100644 index 0000000..33d3278 --- /dev/null +++ b/src/CncCollector/CncCollector.csproj @@ -0,0 +1,51 @@ + + + + net472 + x64 + CncCollector + CncCollector + true + false + + Exe + bin\ + false + false + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + PreserveNewest + + + PreserveNewest + + + + diff --git a/src/CncCollector/Config/CollectorConfig.cs b/src/CncCollector/Config/CollectorConfig.cs new file mode 100644 index 0000000..0e477f5 --- /dev/null +++ b/src/CncCollector/Config/CollectorConfig.cs @@ -0,0 +1,79 @@ +using System; +using System.IO; +using Newtonsoft.Json; + +namespace CncCollector.Config +{ + /// + /// 采集服务配置(从 collector.json 读取) + /// + public class CollectorConfig + { + /// 业务库连接字符串 + [JsonProperty("businessConnection")] + public string BusinessConnection { get; set; } + + /// 日志库连接字符串 + [JsonProperty("logConnection")] + public string LogConnection { get; set; } + + /// 管理API端口 + [JsonProperty("apiPort")] + public int ApiPort { get; set; } = 5800; + + /// 服务间通信API Key + [JsonProperty("apiKey")] + public string ApiKey { get; set; } = "collector_api_key_2026"; + + /// 心跳上报间隔(秒) + [JsonProperty("heartbeatIntervalSeconds")] + public int HeartbeatIntervalSeconds { get; set; } = 10; + + /// 配置轮询间隔(秒) + [JsonProperty("configPollIntervalSeconds")] + public int ConfigPollIntervalSeconds { get; set; } = 30; + + /// 日终汇总执行时间(HH:mm格式) + [JsonProperty("dailySummaryTime")] + public string DailySummaryTime { get; set; } = "01:00"; + + /// 服务ID标识 + [JsonProperty("serviceId")] + public string ServiceId { get; set; } = "CncCollector"; + + // ===== 以下为从DB加载的运行时配置 ===== + + /// 采集失败重试次数(默认3) + [JsonIgnore] + public int CollectRetryCount { get; set; } = 3; + + /// 采集重试间隔秒数(默认30) + [JsonIgnore] + public int CollectRetryIntervalSeconds { get; set; } = 30; + + /// 连续失败N次触发告警(默认5) + [JsonIgnore] + public int CollectFailAlertThreshold { get; set; } = 5; + + /// + /// 从文件加载配置 + /// + public static CollectorConfig Load() + { + // 优先从运行目录读取 + string configPath = Path.Combine(AppDomain.CurrentDomain.BaseDirectory, "collector.json"); + if (!File.Exists(configPath)) + { + // 开发模式:从项目根目录读取 + configPath = Path.Combine(AppDomain.CurrentDomain.BaseDirectory, "..", "..", "..", "collector.json"); + } + if (!File.Exists(configPath)) + { + throw new FileNotFoundException("找不到配置文件 collector.json"); + } + + string json = File.ReadAllText(configPath); + return JsonConvert.DeserializeObject(json); + } + } +} diff --git a/src/CncCollector/Config/ConfigLoader.cs b/src/CncCollector/Config/ConfigLoader.cs new file mode 100644 index 0000000..0853bb7 --- /dev/null +++ b/src/CncCollector/Config/ConfigLoader.cs @@ -0,0 +1,77 @@ +using System; +using System.Collections.Generic; +using Dapper; +using MySqlConnector; +using log4net; + +namespace CncCollector.Config +{ + /// + /// 从 cnc_sys_config 表加载运行时配置,覆盖 CollectorConfig 的默认值 + /// + public static class ConfigLoader + { + private static readonly ILog _log = LogManager.GetLogger(typeof(ConfigLoader)); + + /// + /// 从数据库加载配置并覆盖 CollectorConfig 中的默认值 + /// + /// 业务库连接字符串 + /// 要覆盖的配置对象 + public static void LoadRuntimeConfig(string connectionString, CollectorConfig config) + { + try + { + using (var conn = new MySqlConnection(connectionString)) + { + conn.Open(); + var rows = conn.Query>( + "SELECT config_key as `Key`, config_value as `Value` FROM cnc_sys_config"); + + foreach (var row in rows) + { + string key = row.Key; + string val = row.Value ?? ""; + + switch (key) + { + case "collector_api_port": + if (int.TryParse(val, out var port)) config.ApiPort = port; + break; + case "collector_api_key": + if (!string.IsNullOrEmpty(val)) config.ApiKey = val; + break; + case "heartbeat_interval": + if (int.TryParse(val, out var hb)) config.HeartbeatIntervalSeconds = hb; + break; + case "config_poll_interval": + if (int.TryParse(val, out var cp)) config.ConfigPollIntervalSeconds = cp; + break; + case "daily_summary_time": + if (!string.IsNullOrEmpty(val)) config.DailySummaryTime = val; + break; + case "collect_retry_count": + if (int.TryParse(val, out var rc)) config.CollectRetryCount = rc; + break; + case "collect_retry_interval": + if (int.TryParse(val, out var ri)) config.CollectRetryIntervalSeconds = ri; + break; + case "collect_fail_alert_threshold": + if (int.TryParse(val, out var ft)) config.CollectFailAlertThreshold = ft; + break; + } + } + } + + _log.Info($"数据库配置加载完成: ApiPort={config.ApiPort}, Heartbeat={config.HeartbeatIntervalSeconds}s, " + + $"ConfigPoll={config.ConfigPollIntervalSeconds}s, SummaryTime={config.DailySummaryTime}, " + + $"RetryCount={config.CollectRetryCount}, RetryInterval={config.CollectRetryIntervalSeconds}s, " + + $"FailThreshold={config.CollectFailAlertThreshold}"); + } + catch (Exception ex) + { + _log.Error("从数据库加载配置失败,将使用默认值", ex); + } + } + } +} diff --git a/src/CncCollector/Core/CollectRecordWriter.cs b/src/CncCollector/Core/CollectRecordWriter.cs new file mode 100644 index 0000000..d30f20e --- /dev/null +++ b/src/CncCollector/Core/CollectRecordWriter.cs @@ -0,0 +1,186 @@ +using System; +using System.Collections.Generic; +using System.Text; +using Dapper; +using MySqlConnector; +using CncModels.Entity; +using log4net; + +namespace CncCollector.Core +{ + /// + /// 采集数据批量写入器。 + /// 负责写入采集结构化记录、原始JSON日志,更新机床和地址实时状态。 + /// + public class CollectRecordWriter + { + private static readonly ILog _log = LogManager.GetLogger(typeof(CollectRecordWriter)); + + /// + /// 批量写入采集记录并更新相关状态 + /// + /// 业务库连接字符串 + /// 日志库连接字符串 + /// 采集记录列表 + /// 原始JSON字符串 + /// 采集地址ID + /// 请求开始时间 + /// 响应耗时(毫秒) + /// 是否采集成功 + /// 错误信息(失败时) + public static void WriteBatch(string businessConnStr, string logConnStr, + List records, string rawJson, int collectAddressId, + DateTime requestTime, long? responseDurationMs, bool isSuccess, string errorMessage) + { + var now = DateTime.Now; + + // 1. 写入原始JSON到日志库 + try + { + using (var conn = new MySqlConnection(logConnStr)) + { + conn.Open(); + conn.Execute(@"INSERT INTO log_collect_raw (collect_address_id, request_time, response_time, response_duration, is_success, status_code, raw_json, error_message, created_at) + VALUES (@CollectAddressId, @RequestTime, @ResponseTime, @ResponseDuration, @IsSuccess, @StatusCode, @RawJson, @ErrorMessage, @CreatedAt)", + new + { + CollectAddressId = collectAddressId, + RequestTime = requestTime, + ResponseTime = now, + ResponseDuration = responseDurationMs, + IsSuccess = isSuccess ? 1 : 0, + StatusCode = isSuccess ? (int?)200 : null, + RawJson = rawJson ?? "", + ErrorMessage = errorMessage ?? (string)null, + CreatedAt = now + }); + } + } + catch (Exception ex) + { + _log.Error($"写入原始JSON日志失败(地址ID={collectAddressId})", ex); + } + + if (!isSuccess || records == null || records.Count == 0) return; + + // 2. 批量写入采集结构化记录到业务库 + try + { + using (var conn = new MySqlConnection(businessConnStr)) + { + conn.Open(); + using (var tran = conn.BeginTransaction()) + { + try + { + foreach (var r in records) + { + conn.Execute(@"INSERT INTO cnc_collect_record (machine_id, collect_time, device_time, program_name, part_count, + device_status, run_status, operate_mode, spindle_speed_set, feed_speed_set, + spindle_speed_actual, feed_speed_actual, spindle_load, spindle_override, + power_on_time, run_time, cutting_time, cycle_time, machining_status, extra_data, created_at) + VALUES (@MachineId, @CollectTime, @DeviceTime, @ProgramName, @PartCount, + @DeviceStatus, @RunStatus, @OperateMode, @SpindleSpeedSet, @FeedSpeedSet, + @SpindleSpeedActual, @FeedSpeedActual, @SpindleLoad, @SpindleOverride, + @PowerOnTime, @RunTime, @CuttingTime, @CycleTime, @MachiningStatus, @ExtraData, @CreatedAt)", + new + { + r.MachineId, + CollectTime = r.CollectTime, + DeviceTime = r.DeviceTime, + ProgramName = r.ProgramName ?? (string)null, + PartCount = r.PartCount, + DeviceStatus = r.DeviceStatus ?? (string)null, + RunStatus = r.RunStatus ?? (string)null, + OperateMode = r.OperateMode ?? (string)null, + SpindleSpeedSet = r.SpindleSpeedSet, + FeedSpeedSet = r.FeedSpeedSet, + SpindleSpeedActual = r.SpindleSpeedActual, + FeedSpeedActual = r.FeedSpeedActual, + SpindleLoad = r.SpindleLoad, + SpindleOverride = r.SpindleOverride, + PowerOnTime = r.PowerOnTime, + RunTime = r.RunTime, + CuttingTime = r.CuttingTime, + CycleTime = r.CycleTime, + MachiningStatus = r.MachiningStatus ?? (string)null, + ExtraData = r.ExtraData ?? (string)null, + CreatedAt = now + }, tran); + } + tran.Commit(); + } + catch + { + tran.Rollback(); + throw; + } + } + + // 3. 更新每台机床的实时状态字段 + foreach (var r in records) + { + try + { + conn.Execute(@"UPDATE cnc_machine SET last_collect_time = @CollectTime, + last_device_status = @DeviceStatus, last_run_status = @RunStatus, + last_program_name = @ProgramName, last_part_count = @PartCount, + last_operate_mode = @OperateMode, last_machining_status = @MachiningStatus + WHERE id = @MachineId", + new + { + r.MachineId, + r.CollectTime, + DeviceStatus = r.DeviceStatus ?? (string)null, + RunStatus = r.RunStatus ?? (string)null, + ProgramName = r.ProgramName ?? (string)null, + r.PartCount, + OperateMode = r.OperateMode ?? (string)null, + MachiningStatus = r.MachiningStatus ?? (string)null + }); + } + catch (Exception ex) + { + _log.Error($"更新机床实时状态失败(machine_id={r.MachineId})", ex); + } + } + + // 4. 更新采集地址状态 + try + { + conn.Execute(@"UPDATE cnc_collect_address SET last_collect_time = @Time, last_collect_status = 'success', fail_count = 0, updated_at = NOW() + WHERE id = @Id", + new { Time = now, Id = collectAddressId }); + } + catch (Exception ex) + { + _log.Error($"更新采集地址状态失败(address_id={collectAddressId})", ex); + } + } + } + catch (Exception ex) + { + _log.Error($"批量写入采集记录失败(地址ID={collectAddressId})", ex); + } + } + + /// + /// 记录采集失败:更新采集地址的失败计数和状态 + /// + public static void RecordFailure(string businessConnStr, int collectAddressId, string errorMsg) + { + try + { + using (var conn = new MySqlConnection(businessConnStr)) + { + conn.Execute(@"UPDATE cnc_collect_address SET last_collect_status = 'fail', fail_count = fail_count + 1, updated_at = NOW() WHERE id = @Id", + new { Id = collectAddressId }); + } + } + catch (Exception ex) + { + _log.Error($"记录采集失败状态时出错(address_id={collectAddressId})", ex); + } + } + } +} diff --git a/src/CncCollector/Core/CollectWorker.cs b/src/CncCollector/Core/CollectWorker.cs new file mode 100644 index 0000000..8ddba03 --- /dev/null +++ b/src/CncCollector/Core/CollectWorker.cs @@ -0,0 +1,427 @@ +using System; +using System.Collections.Generic; +using System.Diagnostics; +using System.Net; +using System.Net.Http; +using System.Net.NetworkInformation; +using System.Threading; +using Dapper; +using MySqlConnector; +using Newtonsoft.Json.Linq; +using CncModels.Entity; +using CncModels.Enum; +using CncCollector.Config; +using log4net; + +namespace CncCollector.Core +{ + /// + /// 单个采集地址的工作线程。 + /// 循环执行:Ping检测 → HTTP采集 → JSON解析 → 数据入库 → 产量跟踪。 + /// + public class CollectWorker + { + private static readonly ILog _log = LogManager.GetLogger(typeof(CollectWorker)); + + private readonly CollectAddress _address; + private readonly CollectorConfig _config; + private readonly ProductionTracker _tracker; + private readonly string _businessConnStr; + private readonly string _logConnStr; + private Thread _thread; + private volatile bool _running; + private int _consecutiveFailCount; + + /// 采集地址ID + public int AddressId => _address.Id; + + /// 采集地址名称 + public string AddressName => _address.Name; + + /// 是否运行中 + public bool IsRunning => _running; + + /// 最后采集时间 + public DateTime? LastCollectTime { get; private set; } + + /// 成功次数 + public long SuccessCount { get; private set; } + + /// 失败次数 + public long FailCount { get; private set; } + + /// + /// 初始化采集工作线程 + /// + /// 采集地址配置 + /// 全局配置 + /// 产量跟踪器 + /// 业务库连接字符串 + /// 日志库连接字符串 + public CollectWorker(CollectAddress address, CollectorConfig config, ProductionTracker tracker, + string businessConnStr, string logConnStr) + { + _address = address; + _config = config; + _tracker = tracker; + _businessConnStr = businessConnStr; + _logConnStr = logConnStr; + } + + /// + /// 启动工作线程 + /// + public void Start() + { + if (_running) return; + _running = true; + _thread = new Thread(Run) { IsBackground = true, Name = $"Collector-{_address.Id}" }; + _thread.Start(); + _log.Info($"采集工作线程已启动: {_address.Name}(间隔={_address.CollectInterval}秒)"); + } + + /// + /// 停止工作线程 + /// + public void Stop() + { + _running = false; + _log.Info($"采集工作线程已停止: {_address.Name}"); + } + + /// + /// 工作线程主循环 + /// + private void Run() + { + while (_running) + { + try + { + DoCollectCycle(); + } + catch (Exception ex) + { + _log.Error($"采集循环异常(地址={_address.Name})", ex); + } + + // 等待下一次采集 + int intervalMs = _address.CollectInterval * 1000; + for (int i = 0; i < intervalMs / 100 && _running; i++) + { + Thread.Sleep(100); + } + } + } + + /// + /// 执行一次完整的采集周期:Ping → HTTP采集 → 解析 → 入库 + /// + private void DoCollectCycle() + { + var requestTime = DateTime.Now; + + // 1. Ping 检测 + if (!PingAddress()) + { + // Ping 失败,更新机床离线状态 + UpdateMachineOnlineStatus(false); + return; + } + + // Ping 成功,更新机床在线状态 + UpdateMachineOnlineStatus(true); + + // 2. HTTP 采集(含重试) + string rawJson = null; + bool success = false; + string errorMsg = null; + long durationMs = 0; + int retryCount = _config.CollectRetryCount; + + for (int attempt = 0; attempt <= retryCount; attempt++) + { + if (attempt > 0) + { + _log.Warn($"采集重试第{attempt}次(地址={_address.Name})"); + Thread.Sleep(_config.CollectRetryIntervalSeconds * 1000); + } + + var sw = Stopwatch.StartNew(); + try + { + using (var client = new HttpClient { Timeout = TimeSpan.FromSeconds(30) }) + { + var response = client.GetAsync(_address.Url).Result; + sw.Stop(); + durationMs = sw.ElapsedMilliseconds; + + if (response.IsSuccessStatusCode) + { + rawJson = response.Content.ReadAsStringAsync().Result; + success = true; + break; + } + else + { + errorMsg = $"HTTP {(int)response.StatusCode}: {response.ReasonPhrase}"; + } + } + } + catch (Exception ex) + { + sw.Stop(); + durationMs = sw.ElapsedMilliseconds; + errorMsg = ex.Message; + } + } + + if (!success) + { + _consecutiveFailCount++; + FailCount++; + _log.Warn($"采集失败(地址={_address.Name}, 连续失败={_consecutiveFailCount}): {errorMsg}"); + + // 写入失败记录 + CollectRecordWriter.WriteBatch(_businessConnStr, _logConnStr, null, rawJson, + _address.Id, requestTime, durationMs, false, errorMsg); + CollectRecordWriter.RecordFailure(_businessConnStr, _address.Id, errorMsg); + + // 连续失败超过阈值,触发告警 + if (_consecutiveFailCount >= _config.CollectFailAlertThreshold) + { + CreateAlert(AlertType.CollectFail, null, _address.Id, + $"采集地址「{_address.Name}」连续{_consecutiveFailCount}次采集失败", + errorMsg); + } + return; + } + + // 3. 解析 JSON + _consecutiveFailCount = 0; + SuccessCount++; + LastCollectTime = DateTime.Now; + + try + { + ParseAndSave(rawJson, requestTime, durationMs); + } + catch (Exception ex) + { + _log.Error($"JSON解析/入库失败(地址={_address.Name})", ex); + } + } + + /// + /// Ping 检测采集地址可达性 + /// + private bool PingAddress() + { + try + { + // 从 URL 提取主机名 + var uri = new Uri(_address.Url); + string host = uri.Host; + + using (var ping = new Ping()) + { + var reply = ping.Send(host, 5000); + return reply.Status == IPStatus.Success; + } + } + catch (Exception ex) + { + _log.Debug($"Ping失败(地址={_address.Name}): {ex.Message}"); + return false; + } + } + + /// + /// 更新此地址下所有机床的在线状态 + /// + private void UpdateMachineOnlineStatus(bool isOnline) + { + try + { + using (var conn = new MySqlConnection(_businessConnStr)) + { + conn.Execute(@"UPDATE cnc_machine SET is_online = @Online, last_ping_time = NOW(), updated_at = NOW() + WHERE collect_address_id = @AddrId AND is_enabled = 1", + new { Online = isOnline ? 1 : 0, AddrId = _address.Id }); + } + } + catch (Exception ex) + { + _log.Error($"更新机床在线状态失败(地址={_address.Name})", ex); + } + } + + /// + /// 解析采集到的 JSON 数据并写入数据库 + /// + private void ParseAndSave(string rawJson, DateTime requestTime, long durationMs) + { + var collectTime = DateTime.Now; + + // 1. 加载品牌配置和字段映射 + Brand brand = null; + List mappings = null; + List machines = null; + + using (var conn = new MySqlConnection(_businessConnStr)) + { + brand = conn.QueryFirstOrDefault( + "SELECT id as Id, brand_name as BrandName, device_field as DeviceField, tags_path as TagsPath, is_enabled as IsEnabled, created_at as CreatedAt, updated_at as UpdatedAt FROM cnc_brand WHERE id = @BrandId", + new { BrandId = _address.BrandId }); + + if (brand != null) + { + mappings = conn.Query( + "SELECT id as Id, brand_id as BrandId, standard_field as StandardField, field_name as FieldName, match_by as MatchBy, data_type as DataType, is_required as IsRequired, created_at as CreatedAt FROM cnc_brand_field_mapping WHERE brand_id = @BrandId", + new { BrandId = brand.Id }).AsList(); + } + + // 加载此地址下的机床列表 + machines = conn.Query( + "SELECT id as Id, device_code as DeviceCode, name as Name, workshop_id as WorkshopId, collect_address_id as CollectAddressId, ip_address as IpAddress, brand_id as BrandId, is_enabled as IsEnabled, is_online as IsOnline, last_ping_time as LastPingTime, last_collect_time as LastCollectTime, last_device_status as LastDeviceStatus, last_run_status as LastRunStatus, last_program_name as LastProgramName, last_part_count as LastPartCount, last_operate_mode as LastOperateMode, last_machining_status as LastMachiningStatus, created_at as CreatedAt, updated_at as UpdatedAt FROM cnc_machine WHERE collect_address_id = @AddrId AND is_enabled = 1", + new { AddrId = _address.Id }).AsList(); + } + + if (brand == null || mappings == null || mappings.Count == 0) + { + _log.Warn($"品牌配置或字段映射缺失(地址={_address.Name}, brandId={_address.BrandId})"); + return; + } + + // 2. 构建 device_code → machine 的查找字典 + var machineDict = new Dictionary(StringComparer.OrdinalIgnoreCase); + if (machines != null) + { + foreach (var m in machines) + { + if (!string.IsNullOrEmpty(m.DeviceCode)) + machineDict[m.DeviceCode] = m; + } + } + + // 3. 解析 JSON 数组 + JArray devices; + try + { + devices = JArray.Parse(rawJson); + } + catch (Exception ex) + { + _log.Error($"JSON解析失败(地址={_address.Name})", ex); + return; + } + + var records = new List(); + + foreach (var deviceToken in devices) + { + var deviceObj = deviceToken as JObject; + if (deviceObj == null) continue; + + // 提取 device 字段值 + string deviceCode = DataParser.ExtractDeviceCode(deviceObj, brand.DeviceField ?? "device"); + if (string.IsNullOrEmpty(deviceCode)) + { + _log.Warn("发现无device字段的设备数据,跳过"); + continue; + } + + // 匹配机床 + Machine machine; + if (!machineDict.TryGetValue(deviceCode, out machine)) + { + // 未知设备 → 记录告警 + _log.Warn($"未知设备: device={deviceCode}(地址={_address.Name})"); + CreateAlert(AlertType.UnknownDevice, null, _address.Id, + $"发现未知设备: {deviceCode}", + $"采集地址「{_address.Name}」返回了未在机床表中配置的设备编码: {deviceCode}"); + continue; + } + + // 解析字段 + var parsed = DataParser.ParseDevice(deviceObj, brand, mappings); + + // 构建 CollectRecord + var record = new CollectRecord + { + MachineId = machine.Id, + CollectTime = collectTime, + ProgramName = GetStringValue(parsed, "program_name"), + PartCount = GetDecimalValue(parsed, "part_count"), + DeviceStatus = GetStringValue(parsed, "device_status"), + RunStatus = GetStringValue(parsed, "run_status"), + OperateMode = GetStringValue(parsed, "operate_mode"), + SpindleSpeedSet = GetDecimalValue(parsed, "spindle_speed_set"), + FeedSpeedSet = GetDecimalValue(parsed, "feed_speed_set"), + SpindleSpeedActual = GetDecimalValue(parsed, "spindle_speed_actual"), + FeedSpeedActual = GetDecimalValue(parsed, "feed_speed_actual"), + SpindleLoad = GetDecimalValue(parsed, "spindle_load"), + SpindleOverride = GetDecimalValue(parsed, "spindle_override"), + PowerOnTime = GetDecimalValue(parsed, "power_on_time"), + RunTime = GetDecimalValue(parsed, "run_time"), + CuttingTime = GetDecimalValue(parsed, "cutting_time"), + CycleTime = GetDecimalValue(parsed, "cycle_time"), + MachiningStatus = GetStringValue(parsed, "machining_status") + }; + + records.Add(record); + + // 产量跟踪 + _tracker.Track(machine.Id, record.ProgramName, record.PartCount, collectTime); + } + + // 4. 批量写入 + CollectRecordWriter.WriteBatch(_businessConnStr, _logConnStr, records, rawJson, + _address.Id, requestTime, durationMs, true, null); + + _log.Info($"采集完成: {_address.Name} → {records.Count}台设备, {durationMs}ms"); + } + + /// + /// 从解析结果中获取字符串值 + /// + private string GetStringValue(Dictionary parsed, string field) + { + DataParser.ParsedField pf; + if (parsed.TryGetValue(field, out pf)) + return pf.StringValue; + return null; + } + + /// + /// 从解析结果中获取数值 + /// + private decimal? GetDecimalValue(Dictionary parsed, string field) + { + DataParser.ParsedField pf; + if (parsed.TryGetValue(field, out pf)) + return pf.NumericValue; + return null; + } + + /// + /// 创建告警记录 + /// + private void CreateAlert(string alertType, int? machineId, int? addressId, string title, string detail) + { + try + { + using (var conn = new MySqlConnection(_businessConnStr)) + { + conn.Execute(@"INSERT INTO cnc_alert (alert_type, machine_id, collect_address_id, title, detail, is_resolved, created_at) + VALUES (@AlertType, @MachineId, @AddressId, @Title, @Detail, 0, NOW())", + new { AlertType = alertType, MachineId = machineId, AddressId = addressId, Title = title, Detail = detail }); + } + } + catch (Exception ex) + { + _log.Error("创建告警记录失败", ex); + } + } + } +} diff --git a/src/CncCollector/Core/CollectorEngine.cs b/src/CncCollector/Core/CollectorEngine.cs new file mode 100644 index 0000000..3072041 --- /dev/null +++ b/src/CncCollector/Core/CollectorEngine.cs @@ -0,0 +1,318 @@ +using System; +using System.Collections.Concurrent; +using System.Collections.Generic; +using System.Threading; +using Dapper; +using MySqlConnector; +using CncModels.Entity; +using CncCollector.Config; +using log4net; + +namespace CncCollector.Core +{ + /// + /// 采集引擎主控。 + /// 负责加载采集地址、管理工作线程、心跳上报、配置轮询和日终汇总调度。 + /// + public class CollectorEngine + { + private static readonly ILog _log = LogManager.GetLogger(typeof(CollectorEngine)); + + private readonly CollectorConfig _config; + private readonly ConcurrentDictionary _workers = new ConcurrentDictionary(); + private readonly ProductionTracker _tracker; + private readonly DailySummaryJob _dailySummary; + private Timer _heartbeatTimer; + private Timer _configPollTimer; + private Timer _dailySummaryTimer; + private DateTime _startTime; + private long _totalSuccess; + private long _totalFail; + private volatile bool _isRunning; + private DateTime _lastSummaryDate = DateTime.MinValue; + + /// 是否运行中 + public bool IsRunning => _isRunning; + + /// 启动时间 + public DateTime StartTime => _startTime; + + /// 运行时长(秒) + public long UptimeSeconds => _isRunning ? (long)(DateTime.Now - _startTime).TotalSeconds : 0; + + /// 工作线程数量 + public int WorkerCount => _workers.Count; + + /// + /// 初始化采集引擎 + /// + /// 全局配置 + public CollectorEngine(CollectorConfig config) + { + _config = config; + _tracker = new ProductionTracker(config.BusinessConnection); + _dailySummary = new DailySummaryJob(config.BusinessConnection); + } + + /// + /// 启动采集引擎:加载地址、启动工作线程、启动定时任务 + /// + public void Start() + { + if (_isRunning) return; + + _log.Info("===== 采集引擎启动 ====="); + _startTime = DateTime.Now; + _isRunning = true; + + // 1. 加载并启动采集地址 + LoadAndStartWorkers(); + + // 2. 启动心跳上报定时器 + _heartbeatTimer = new Timer(OnHeartbeat, null, + TimeSpan.FromSeconds(_config.HeartbeatIntervalSeconds), + TimeSpan.FromSeconds(_config.HeartbeatIntervalSeconds)); + + // 3. 启动配置轮询定时器 + _configPollTimer = new Timer(OnConfigPoll, null, + TimeSpan.FromSeconds(_config.ConfigPollIntervalSeconds), + TimeSpan.FromSeconds(_config.ConfigPollIntervalSeconds)); + + // 4. 启动日终汇总检查定时器(每分钟检查一次) + _dailySummaryTimer = new Timer(OnDailySummaryCheck, null, + TimeSpan.FromMinutes(1), TimeSpan.FromMinutes(1)); + + _log.Info($"===== 采集引擎已启动({_workers.Count}个采集地址)====="); + } + + /// + /// 停止采集引擎 + /// + public void Stop() + { + if (!_isRunning) return; + + _log.Info("===== 采集引擎停止中 ====="); + _isRunning = false; + + // 停止所有工作线程 + foreach (var kvp in _workers) + { + kvp.Value.Stop(); + } + _workers.Clear(); + + // 结账所有活跃段 + _tracker.Dispose(); + + // 停止定时器 + _heartbeatTimer?.Dispose(); + _configPollTimer?.Dispose(); + _dailySummaryTimer?.Dispose(); + + // 写入停止状态心跳 + WriteHeartbeat("stopped"); + + _log.Info("===== 采集引擎已停止 ====="); + } + + /// + /// 刷新配置:重新加载采集地址,处理新增/删除/变更 + /// + public void Refresh() + { + _log.Info("刷新采集配置..."); + LoadAndStartWorkers(); + _log.Info("采集配置刷新完成"); + } + + /// + /// 获取引擎状态摘要 + /// + public Dictionary GetStatus() + { + var status = new Dictionary + { + ["isRunning"] = _isRunning, + ["startTime"] = _startTime.ToString("yyyy-MM-dd HH:mm:ss"), + ["uptimeSeconds"] = UptimeSeconds, + ["workerCount"] = _workers.Count, + ["totalSuccess"] = _totalSuccess, + ["totalFail"] = _totalFail + }; + + var workerList = new List>(); + foreach (var kvp in _workers) + { + workerList.Add(new Dictionary + { + ["addressId"] = kvp.Key, + ["name"] = kvp.Value.AddressName, + ["isRunning"] = kvp.Value.IsRunning + }); + } + status["workers"] = workerList; + + return status; + } + + /// + /// 加载启用的采集地址并启动工作线程 + /// + private void LoadAndStartWorkers() + { + try + { + List addresses; + using (var conn = new MySqlConnection(_config.BusinessConnection)) + { + addresses = conn.Query( + "SELECT id as Id, name as Name, url as Url, brand_id as BrandId, collect_interval as CollectInterval, is_enabled as IsEnabled, last_collect_time as LastCollectTime, last_collect_status as LastCollectStatus, fail_count as FailCount, created_at as CreatedAt, updated_at as UpdatedAt FROM cnc_collect_address WHERE is_enabled = 1" + ).AsList(); + } + + // 停止已删除的地址 + foreach (var kvp in _workers) + { + bool exists = false; + foreach (var addr in addresses) + { + if (addr.Id == kvp.Key) { exists = true; break; } + } + if (!exists) + { + kvp.Value.Stop(); + CollectWorker removed; + _workers.TryRemove(kvp.Key, out removed); + _log.Info($"已停止删除的采集地址: {kvp.Value.AddressName}"); + } + } + + // 启动新增的地址 + foreach (var addr in addresses) + { + if (!_workers.ContainsKey(addr.Id)) + { + var worker = new CollectWorker(addr, _config, _tracker, + _config.BusinessConnection, _config.LogConnection); + worker.Start(); + _workers[addr.Id] = worker; + _log.Info($"已启动采集地址: {addr.Name}(URL={addr.Url}, 间隔={addr.CollectInterval}秒)"); + } + } + } + catch (Exception ex) + { + _log.Error("加载采集地址失败", ex); + } + } + + /// + /// 心跳上报回调 + /// + private void OnHeartbeat(object state) + { + try + { + // 统计成功/失败次数 + long success = 0, fail = 0; + foreach (var kvp in _workers) + { + success += kvp.Value.SuccessCount; + fail += kvp.Value.FailCount; + } + _totalSuccess = success; + _totalFail = fail; + + WriteHeartbeat("running"); + } + catch (Exception ex) + { + _log.Error("心跳上报失败", ex); + } + } + + /// + /// 写入心跳记录到 log_collector_heartbeat + /// + private void WriteHeartbeat(string status) + { + try + { + using (var conn = new MySqlConnection(_config.LogConnection)) + { + conn.Execute(@"INSERT INTO log_collector_heartbeat (service_id, status, last_collect_time, success_count, fail_count, uptime_seconds, detail, created_at) + VALUES (@ServiceId, @Status, @LastCollectTime, @SuccessCount, @FailCount, @UptimeSeconds, @Detail, NOW())", + new + { + ServiceId = _config.ServiceId, + Status = status, + LastCollectTime = DateTime.Now, + SuccessCount = _totalSuccess, + FailCount = _totalFail, + UptimeSeconds = UptimeSeconds, + Detail = (string)null + }); + } + } + catch (Exception ex) + { + _log.Error("写入心跳记录失败", ex); + } + } + + /// + /// 配置轮询回调:检查是否有配置变更 + /// + private void OnConfigPoll(object state) + { + try + { + // 重新从DB加载运行时配置 + ConfigLoader.LoadRuntimeConfig(_config.BusinessConnection, _config); + + // 重新加载采集地址 + LoadAndStartWorkers(); + } + catch (Exception ex) + { + _log.Error("配置轮询失败", ex); + } + } + + /// + /// 日终汇总检查回调:检查是否到了配置的汇总时间 + /// + private void OnDailySummaryCheck(object state) + { + try + { + // 解析汇总时间 + TimeSpan summaryTime; + if (!TimeSpan.TryParse(_config.DailySummaryTime, out summaryTime)) + { + summaryTime = new TimeSpan(1, 0, 0); // 默认01:00 + } + + var now = DateTime.Now; + var targetTime = new DateTime(now.Year, now.Month, now.Day, summaryTime.Hours, summaryTime.Minutes, 0); + + // 检查是否到了汇总时间(±1分钟内) + if (Math.Abs((now - targetTime).TotalMinutes) <= 1) + { + // 汇总昨天的数据 + DateTime summaryDate = now.Date.AddDays(-1); + if (_lastSummaryDate != summaryDate) + { + _lastSummaryDate = summaryDate; + _dailySummary.Execute(summaryDate); + } + } + } + catch (Exception ex) + { + _log.Error("日终汇总检查失败", ex); + } + } + } +} diff --git a/src/CncCollector/Core/DailySummaryJob.cs b/src/CncCollector/Core/DailySummaryJob.cs new file mode 100644 index 0000000..c32277e --- /dev/null +++ b/src/CncCollector/Core/DailySummaryJob.cs @@ -0,0 +1,134 @@ +using System; +using System.Collections.Generic; +using Dapper; +using MySqlConnector; +using CncModels.Entity; +using CncModels.Enum; +using log4net; + +namespace CncCollector.Core +{ + /// + /// 日终汇总作业。 + /// 在配置的时间(默认01:00)执行,结账所有活跃段,汇总日产量数据。 + /// + public class DailySummaryJob + { + private static readonly ILog _log = LogManager.GetLogger(typeof(DailySummaryJob)); + + private readonly string _businessConnStr; + + /// + /// 初始化日终汇总作业 + /// + /// 业务库连接字符串 + public DailySummaryJob(string businessConnStr) + { + _businessConnStr = businessConnStr; + } + + /// + /// 执行日终汇总,汇总指定日期的产量数据 + /// + /// 要汇总的日期(通常为昨天) + public void Execute(DateTime summaryDate) + { + _log.Info($"========== 日终汇总开始(日期={summaryDate:yyyy-MM-dd}) =========="); + var sw = System.Diagnostics.Stopwatch.StartNew(); + + try + { + using (var conn = new MySqlConnection(_businessConnStr)) + { + conn.Open(); + using (var tran = conn.BeginTransaction()) + { + try + { + // 1. 结账所有活跃段(end_of_day) + conn.Execute(@"UPDATE cnc_production_segment + SET end_time = COALESCE(end_time, NOW()), + close_reason = @Reason, is_settled = 1, updated_at = NOW() + WHERE is_settled = 0 AND DATE(start_time) <= @Date", + new { Reason = SegmentCloseReason.EndOfDay, Date = summaryDate }, tran); + + // 2. 为 quantity 为 NULL 的段计算产量 + conn.Execute(@"UPDATE cnc_production_segment + SET quantity = GREATEST(0, COALESCE(end_part_count, 0) - start_part_count) + WHERE quantity IS NULL AND is_settled = 1 AND DATE(start_time) <= @Date", + new { Date = summaryDate }, tran); + + // 3. 按 (machine_id, production_date, program_name) 汇总到 cnc_daily_production + // 先删除旧汇总(幂等) + conn.Execute(@"DELETE FROM cnc_daily_production WHERE production_date = @Date", + new { Date = summaryDate }, tran); + + // 插入新汇总 + conn.Execute(@"INSERT INTO cnc_daily_production (machine_id, production_date, program_name, total_quantity, segment_count, total_run_time, total_cutting_time, total_cycle_time, created_at, updated_at) + SELECT machine_id, @Date, program_name, + SUM(COALESCE(quantity, 0)), + COUNT(*), + NULL, NULL, NULL, + NOW(), NOW() + FROM cnc_production_segment + WHERE DATE(start_time) = @Date AND is_settled = 1 + GROUP BY machine_id, program_name", + new { Date = summaryDate }, tran); + + // 4. 汇总到 cnc_worker_daily_summary(通过绑定关系) + conn.Execute(@"DELETE FROM cnc_worker_daily_summary WHERE production_date = @Date", + new { Date = summaryDate }, tran); + + conn.Execute(@"INSERT INTO cnc_worker_daily_summary (worker_id, production_date, total_quantity, machine_count, program_count, created_at, updated_at) + SELECT wm.worker_id, @Date, + SUM(dp.total_quantity), + COUNT(DISTINCT dp.machine_id), + COUNT(DISTINCT dp.program_name), + NOW(), NOW() + FROM cnc_daily_production dp + INNER JOIN cnc_worker_machine wm ON dp.machine_id = wm.machine_id + WHERE dp.production_date = @Date + GROUP BY wm.worker_id", + new { Date = summaryDate }, tran); + + // 5. 更新 cnc_machine_daily_status + conn.Execute(@"DELETE FROM cnc_machine_daily_status WHERE production_date = @Date", + new { Date = summaryDate }, tran); + + // 有采集记录的 → normal + conn.Execute(@"INSERT INTO cnc_machine_daily_status (machine_id, production_date, data_status, created_at, updated_at) + SELECT DISTINCT machine_id, @Date, 'normal', NOW(), NOW() + FROM cnc_collect_record WHERE DATE(collect_time) = @Date", + new { Date = summaryDate }, tran); + + // 未开机 → offline + conn.Execute(@"INSERT INTO cnc_machine_daily_status (machine_id, production_date, data_status, created_at, updated_at) + SELECT m.id, @Date, 'offline', NOW(), NOW() + FROM cnc_machine m + WHERE m.is_enabled = 1 AND m.id NOT IN ( + SELECT DISTINCT machine_id FROM cnc_collect_record WHERE DATE(collect_time) = @Date + ) AND m.id NOT IN ( + SELECT machine_id FROM cnc_machine_daily_status WHERE production_date = @Date + )", + new { Date = summaryDate }, tran); + + tran.Commit(); + } + catch + { + tran.Rollback(); + throw; + } + } + } + + sw.Stop(); + _log.Info($"========== 日终汇总完成(日期={summaryDate:yyyy-MM-dd}, 耗时={sw.ElapsedMilliseconds}ms) =========="); + } + catch (Exception ex) + { + _log.Error($"日终汇总失败(日期={summaryDate:yyyy-MM-dd})", ex); + } + } + } +} diff --git a/src/CncCollector/Core/DataParser.cs b/src/CncCollector/Core/DataParser.cs new file mode 100644 index 0000000..7dcae5b --- /dev/null +++ b/src/CncCollector/Core/DataParser.cs @@ -0,0 +1,124 @@ +using System; +using System.Collections.Generic; +using Newtonsoft.Json.Linq; +using CncModels.Entity; +using log4net; + +namespace CncCollector.Core +{ + /// + /// JSON 解析 + 字段映射引擎。 + /// 根据品牌配置从采集到的 JSON 中提取结构化字段数据。 + /// + public class DataParser + { + private static readonly ILog _log = LogManager.GetLogger(typeof(DataParser)); + + /// + /// 解析后的字段容器 + /// + public class ParsedField + { + /// 标准字段名(如 program_name) + public string StandardField { get; set; } + + /// 字段值(字符串形式) + public string StringValue { get; set; } + + /// 数值(如果可解析为 decimal) + public decimal? NumericValue { get; set; } + + /// 数据类型(number/string) + public string DataType { get; set; } + } + + /// + /// 解析单台设备的 JSON tags 数据 + /// + /// 设备 JSON 对象 + /// 品牌配置 + /// 品牌字段映射列表 + /// 标准字段名 → 解析后的字段值 + public static Dictionary ParseDevice(JObject deviceObj, Brand brand, List mappings) + { + var result = new Dictionary(); + + if (deviceObj == null || mappings == null || mappings.Count == 0) + return result; + + // 定位 tags 数组 + string tagsPath = brand?.TagsPath ?? "tags"; + JToken tagsToken = deviceObj.SelectToken(tagsPath); + if (tagsToken == null || tagsToken.Type != JTokenType.Array) + { + _log.Warn($"无法定位 tags 数组(路径: {tagsPath})"); + return result; + } + + var tagsArray = tagsToken as JArray; + + // 构建 tag id → value 的快速查找字典 + var tagDict = new Dictionary(StringComparer.OrdinalIgnoreCase); + foreach (var tag in tagsArray) + { + string id = tag["id"]?.ToString() ?? ""; + string value = tag["value"]?.ToString() ?? ""; + if (!string.IsNullOrEmpty(id)) + { + tagDict[id] = value; + } + } + + // 按字段映射表提取 + foreach (var mapping in mappings) + { + string tagId = mapping.FieldName; // FieldName 存的是 tag 的 id(如 "Tag5") + string standardField = mapping.StandardField; + string dataType = mapping.DataType ?? "string"; + + string rawValue; + if (!tagDict.TryGetValue(tagId, out rawValue)) + { + // 字段不存在,跳过(非必填字段可能不存在) + continue; + } + + var field = new ParsedField + { + StandardField = standardField, + DataType = dataType, + StringValue = rawValue + }; + + // 数值型:去除 .00000 尾缀 + if (dataType == "number" && !string.IsNullOrEmpty(rawValue)) + { + // 尝试解析为 decimal + string cleanValue = rawValue.Trim(); + decimal numVal; + if (decimal.TryParse(cleanValue, System.Globalization.NumberStyles.Float, + System.Globalization.CultureInfo.InvariantCulture, out numVal)) + { + field.NumericValue = numVal; + } + } + + result[standardField] = field; + } + + return result; + } + + /// + /// 从设备 JSON 对象中提取 device 字段值 + /// + /// 设备 JSON 对象 + /// 品牌配置的 device 字段名(默认 "device") + /// device 值 + public static string ExtractDeviceCode(JObject deviceObj, string deviceField = "device") + { + if (deviceObj == null) return null; + return deviceObj[deviceField]?.ToString(); + } + } +} diff --git a/src/CncCollector/Core/ProductionTracker.cs b/src/CncCollector/Core/ProductionTracker.cs new file mode 100644 index 0000000..2a19f4b --- /dev/null +++ b/src/CncCollector/Core/ProductionTracker.cs @@ -0,0 +1,260 @@ +using System; +using System.Collections.Concurrent; +using System.Collections.Generic; +using Dapper; +using MySqlConnector; +using CncModels.Entity; +using CncModels.Enum; +using log4net; + +namespace CncCollector.Core +{ + /// + /// 零件产量分段跟踪引擎。 + /// 为每台机床维护内存中的当前活跃段状态,检测程序切换和手动清零。 + /// + public class ProductionTracker : IDisposable + { + private static readonly ILog _log = LogManager.GetLogger(typeof(ProductionTracker)); + private readonly string _connectionString; + private readonly object _lock = new object(); + + /// + /// 内存缓存:machineId → 当前活跃段ID(减少DB查询) + /// + private readonly ConcurrentDictionary _activeSegmentIds = new ConcurrentDictionary(); + + /// + /// 内存缓存:machineId → 上一次采集的 (programName, partCount) + /// + private readonly ConcurrentDictionary> _lastCollectState = new ConcurrentDictionary>(); + + /// + /// 初始化产量跟踪器 + /// + /// 业务库连接字符串 + public ProductionTracker(string connectionString) + { + _connectionString = connectionString; + } + + /// + /// 处理一次采集结果:检测程序切换/手动清零,管理活跃段 + /// + /// 机床ID + /// 当前NC程序名 + /// 当前零件数 + /// 采集时间 + public void Track(int machineId, string programName, decimal? partCount, DateTime collectTime) + { + if (string.IsNullOrEmpty(programName)) return; + + lock (_lock) + { + try + { + // 获取上次采集状态 + Tuple lastState; + bool hasLast = _lastCollectState.TryGetValue(machineId, out lastState); + + // 检测是否需要结账 + bool needClose = false; + string closeReason = ""; + + if (hasLast) + { + string lastProgram = lastState.Item1; + decimal? lastPartCount = lastState.Item2; + + // 情况1:NC程序名变化 + if (!string.Equals(lastProgram, programName, StringComparison.OrdinalIgnoreCase)) + { + needClose = true; + closeReason = SegmentCloseReason.ProgramChange; + _log.Info($"机床{machineId}: 程序切换 {lastProgram} → {programName}({closeReason})"); + } + // 情况2:同程序下 part_count 下降(手动清零) + else if (partCount.HasValue && lastPartCount.HasValue && partCount.Value < lastPartCount.Value) + { + needClose = true; + closeReason = SegmentCloseReason.ManualReset; + _log.Info($"机床{machineId}: 零件数下降 {lastPartCount} → {partCount}({closeReason})"); + } + } + + // 结账当前活跃段 + if (needClose) + { + CloseActiveSegment(machineId, partCount, closeReason, collectTime); + } + + // 确保有活跃段 + long activeSegmentId = EnsureActiveSegment(machineId, programName, partCount, collectTime); + + // 更新活跃段的实时 end_part_count + if (activeSegmentId > 0 && partCount.HasValue) + { + UpdateSegmentEndCount(activeSegmentId, partCount.Value); + } + + // 更新内存缓存 + _lastCollectState[machineId] = Tuple.Create(programName, partCount); + } + catch (Exception ex) + { + _log.Error($"产量跟踪处理失败(machine_id={machineId})", ex); + } + } + } + + /// + /// 服务停止时结账所有活跃段 + /// + public void CloseAllActiveSegments() + { + _log.Info("正在结账所有活跃段(服务停止)..."); + + try + { + using (var conn = new MySqlConnection(_connectionString)) + { + // 结账所有活跃段:is_settled=0 且 end_time IS NULL + conn.Execute(@"UPDATE cnc_production_segment + SET end_time = NOW(), end_part_count = start_part_count, quantity = 0, + close_reason = @Reason, is_settled = 1, updated_at = NOW() + WHERE is_settled = 0 AND end_time IS NULL", + new { Reason = SegmentCloseReason.ServiceStop }); + } + + _activeSegmentIds.Clear(); + _lastCollectState.Clear(); + _log.Info("所有活跃段已结账"); + } + catch (Exception ex) + { + _log.Error("结账所有活跃段失败", ex); + } + } + + /// + /// 结账指定机床的活跃段 + /// + /// 机床ID + /// 结束时的零件数 + /// 关闭原因 + /// 结束时间 + public void CloseActiveSegment(int machineId, decimal? endPartCount, string reason, DateTime endTime) + { + try + { + long segmentId; + if (!_activeSegmentIds.TryGetValue(machineId, out segmentId)) + { + // 从DB查找 + using (var conn = new MySqlConnection(_connectionString)) + { + segmentId = conn.ExecuteScalar( + "SELECT id FROM cnc_production_segment WHERE machine_id=@MId AND is_settled=0 AND end_time IS NULL ORDER BY start_time DESC LIMIT 1", + new { MId = machineId }); + } + } + + if (segmentId <= 0) return; + + using (var conn = new MySqlConnection(_connectionString)) + { + // 获取 start_part_count 计算 quantity + var startCount = conn.ExecuteScalar( + "SELECT start_part_count FROM cnc_production_segment WHERE id=@Id", new { Id = segmentId }); + + decimal? quantity = null; + if (startCount.HasValue && endPartCount.HasValue) + { + quantity = Math.Max(0, endPartCount.Value - startCount.Value); + } + + conn.Execute(@"UPDATE cnc_production_segment + SET end_time = @EndTime, end_part_count = @EndPartCount, quantity = @Quantity, + close_reason = @Reason, is_settled = 1, updated_at = NOW() + WHERE id = @Id", + new { Id = segmentId, EndTime = endTime, EndPartCount = endPartCount, Quantity = quantity, Reason = reason }); + } + + _activeSegmentIds.TryRemove(machineId, out _); + _log.Debug($"机床{machineId}: 结账段{segmentId}({reason},quantity={endPartCount})"); + } + catch (Exception ex) + { + _log.Error($"结账活跃段失败(machine_id={machineId})", ex); + } + } + + /// + /// 确保指定机床有一个活跃段,没有则创建 + /// + private long EnsureActiveSegment(int machineId, string programName, decimal? partCount, DateTime collectTime) + { + long segmentId; + + if (_activeSegmentIds.TryGetValue(machineId, out segmentId) && segmentId > 0) + return segmentId; + + // 从DB查找 + using (var conn = new MySqlConnection(_connectionString)) + { + segmentId = conn.ExecuteScalar( + "SELECT id FROM cnc_production_segment WHERE machine_id=@MId AND is_settled=0 AND end_time IS NULL ORDER BY start_time DESC LIMIT 1", + new { MId = machineId }); + + if (segmentId <= 0) + { + // 创建新段 + var now = DateTime.Now; + segmentId = conn.ExecuteScalar(@"INSERT INTO cnc_production_segment + (machine_id, program_name, production_date, start_time, start_part_count, is_settled, created_at, updated_at) + VALUES (@MachineId, @ProgramName, @ProductionDate, @StartTime, @StartPartCount, 0, @CreatedAt, @UpdatedAt); + SELECT LAST_INSERT_ID();", + new + { + MachineId = machineId, + ProgramName = programName, + ProductionDate = collectTime.Date, + StartTime = collectTime, + StartPartCount = partCount ?? 0, + CreatedAt = now, + UpdatedAt = now + }); + + _log.Debug($"机床{machineId}: 创建新段{segmentId}(程序={programName}, startCount={partCount})"); + } + } + + _activeSegmentIds[machineId] = segmentId; + return segmentId; + } + + /// + /// 更新活跃段的实时 end_part_count + /// + private void UpdateSegmentEndCount(long segmentId, decimal partCount) + { + try + { + using (var conn = new MySqlConnection(_connectionString)) + { + conn.Execute("UPDATE cnc_production_segment SET end_part_count = @Count, updated_at = NOW() WHERE id = @Id", + new { Id = segmentId, Count = partCount }); + } + } + catch (Exception ex) + { + _log.Error($"更新活跃段end_part_count失败(segment_id={segmentId})", ex); + } + } + + public void Dispose() + { + CloseAllActiveSegments(); + } + } +} diff --git a/src/CncCollector/Program.cs b/src/CncCollector/Program.cs new file mode 100644 index 0000000..6180120 --- /dev/null +++ b/src/CncCollector/Program.cs @@ -0,0 +1,87 @@ +using System; +using System.IO; +using CncCollector.Config; +using CncCollector.Core; +using CncCollector.Api; +using log4net; + +namespace CncCollector +{ + /// + /// CNC机床数据采集服务主入口。 + /// 支持控制台模式运行和Windows Service部署。 + /// + class Program + { + private static readonly ILog _log = LogManager.GetLogger(typeof(Program)); + + static void Main(string[] args) + { + // 初始化log4net + var logConfig = Path.Combine(AppDomain.CurrentDomain.BaseDirectory, "log4net.config"); + if (File.Exists(logConfig)) + { + log4net.Config.XmlConfigurator.Configure(new FileInfo(logConfig)); + } + else + { + // 开发模式:从项目根目录读取 + logConfig = Path.Combine(AppDomain.CurrentDomain.BaseDirectory, "..", "..", "..", "log4net.config"); + if (File.Exists(logConfig)) + log4net.Config.XmlConfigurator.Configure(new FileInfo(logConfig)); + } + + Console.WriteLine("CNC 机床数据采集服务 v1.0"); + Console.WriteLine("================================================"); + + // 加载配置 + CollectorConfig config; + try + { + config = CollectorConfig.Load(); + _log.Info("配置文件加载成功"); + } + catch (Exception ex) + { + Console.WriteLine($"错误: 加载配置文件失败 - {ex.Message}"); + _log.Fatal("加载配置文件失败", ex); + Console.WriteLine("按任意键退出..."); + Console.ReadKey(); + return; + } + + // 从数据库加载运行时配置 + try + { + ConfigLoader.LoadRuntimeConfig(config.BusinessConnection, config); + } + catch (Exception ex) + { + _log.Warn("从数据库加载配置失败,使用文件默认值", ex); + Console.WriteLine($" [!] 数据库配置加载失败: {ex.Message}"); + } + + // 创建引擎和API + var engine = new CollectorEngine(config); + var apiServer = new CollectorApiServer(engine, config.ApiKey, config.ApiPort); + + // 启动引擎 + engine.Start(); + + // 启动管理API + apiServer.Start(); + + Console.WriteLine($"\n管理API: http://localhost:{config.ApiPort}/api/collector/status"); + Console.WriteLine($"API Key: {config.ApiKey}"); + Console.WriteLine($"\n按任意键退出..."); + Console.ReadKey(); + + // 退出 + engine.Stop(); + apiServer.Stop(); + + _log.Info("采集服务已退出"); + Console.WriteLine("已退出。"); + } + } +} diff --git a/src/CncCollector/Properties/AssemblyInfo.cs b/src/CncCollector/Properties/AssemblyInfo.cs new file mode 100644 index 0000000..0e63ad3 --- /dev/null +++ b/src/CncCollector/Properties/AssemblyInfo.cs @@ -0,0 +1,18 @@ +using System.Reflection; +using System.Runtime.CompilerServices; +using System.Runtime.InteropServices; + +// 有关程序集的一般信息由以下特性控制。 +// 更改这些特性值可修改与程序集关联的信息。 +[assembly: AssemblyTitle("CncCollector")] +[assembly: AssemblyDescription("CNC机床数据采集服务")] +[assembly: AssemblyConfiguration("")] +[assembly: AssemblyCompany("")] +[assembly: AssemblyProduct("CncCollector")] +[assembly: AssemblyCopyright("Copyright © 2026")] +[assembly: AssemblyTrademark("")] +[assembly: AssemblyCulture("")] + +// 程序集的版本信息由下列四个值组成: +[assembly: AssemblyVersion("1.0.0.0")] +[assembly: AssemblyFileVersion("1.0.0.0")] diff --git a/src/CncCollector/collector.json b/src/CncCollector/collector.json new file mode 100644 index 0000000..f116327 --- /dev/null +++ b/src/CncCollector/collector.json @@ -0,0 +1,10 @@ +{ + "businessConnection": "Server=localhost;Database=cnc_business;Uid=root;Pwd=root;Charset=utf8mb4;SslMode=None;", + "logConnection": "Server=localhost;Database=cnc_log;Uid=root;Pwd=root;Charset=utf8mb4;SslMode=None;", + "apiPort": 5800, + "apiKey": "collector_api_key_2026", + "heartbeatIntervalSeconds": 10, + "configPollIntervalSeconds": 30, + "dailySummaryTime": "01:00", + "serviceId": "CncCollector" +} diff --git a/src/CncCollector/log4net.config b/src/CncCollector/log4net.config new file mode 100644 index 0000000..c1b1b5f --- /dev/null +++ b/src/CncCollector/log4net.config @@ -0,0 +1,24 @@ + + + + + + + + + + + + + + + + + + + + + + + +