You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
haoliang-net/docs/06-采集服务设计.md

34 KiB

CNC机床数据采集系统 - 采集服务设计文档

最后更新2026-04-30 状态:设计中


一、概述

采集服务CncCollector是一个独立 Windows Service负责定时从采集地址HTTP拉取JSON数据通过品牌字段映射解析为结构化数据实时更新机床状态追踪产量分段并执行日终汇总。

1.1 核心职责

职责 说明
HTTP数据拉取 按采集地址配置的间隔定时GET拉取JSON
字段映射解析 用品牌模板的映射规则从JSON中提取标准字段
实时状态更新 更新机床表的在线状态、采集状态、程序名、零件数等
产量分段追踪 检测NC程序名切换、零件数手动清零自动结账/开段
日终汇总 按自然天汇总产量到机床+程序级别、工人级别
原始数据存档 存原始JSON到日志库支持事后重解析
心跳上报 定时写心跳记录,供管理后台判断服务是否在线
告警生成 采集失败、未知设备、字段缺失等异常生成告警记录

1.2 技术栈

选型 说明
框架 .NET Framework 4.7.2 Windows Service 与Web API同框架
ORM Dapper 复用现有CncRepository层
HTTP客户端 System.Net.Http.HttpClient 拉取采集地址JSON
JSON解析 Newtonsoft.Json 12.0.3 与Web API版本一致
日志 log4net 文件日志 + DB日志双写
数据库 MariaDB 业务库 + 日志库双库
管理API HttpListener轻量自托管 5800端口独立于IIS

1.3 与现有系统的关系

┌─────────────────────────────────────────────────┐
│                 Windows Server                   │
│                                                  │
│  ┌──────────────┐    ┌───────────────────────┐  │
│  │  IIS          │    │  CncCollector          │  │
│  │  CncWebApi    │    │  (Windows Service)     │  │
│  │  :80          │    │  管理API :5800          │  │
│  │  管理后台/大屏  │    │  采集循环+Ping+汇总     │  │
│  └──────┬───────┘    └───────────┬───────────┘  │
│         │                        │               │
│         └──────────┬─────────────┘               │
│                    ▼                             │
│         ┌────────────────────┐                   │
│         │    MariaDB          │                  │
│         │  cnc_business       │                  │
│         │  cnc_log            │                  │
│         └────────────────────┘                   │
└─────────────────────────────────────────────────┘
  • CncCollector 复用现有的 CncModelsCncRepository 层(直接引用项目)
  • 不复用 CncService采集服务有独立的业务逻辑不依赖Web API的Service
  • 通过数据库共享数据不直接调用Web API

二、项目结构

2.1 新建项目

在解决方案中新增 CncCollector 项目:

src/CncCollector/
├── CncCollector.csproj         ← .NET Framework 4.7.2 类库
├── ServiceEntry.cs             ← Windows Service 入口ServiceBase子类
├── ServiceInstaller.cs         ← 安装器Installer子类installutil用
├── Program.cs                  ← 主入口Main
│
├── Engine/
│   ├── CollectorEngine.cs      ← 引擎主控(启停所有任务、生命周期管理)
│   ├── CollectorTask.cs        ← 单个采集地址的采集循环
│   └── PingTask.cs             ← 单个采集地址的Ping循环
│
├── Parser/
│   ├── FieldMapper.cs          ← 字段映射解析器纯函数输入JSON+映射规则→标准字段字典)
│   └── ValueConverter.cs       ← 数值转换(去.00000尾缀string/number类型转换
│
├── Tracker/
│   └── ProductionTracker.cs    ← 产量分段追踪(检测程序切换/手动清零→结账/开段)
│
├── Jobs/
│   └── DailySummaryJob.cs      ← 日终汇总任务(关闭活跃段→合并产量→工人汇总→日状态)
│
├── Config/
│   ├── ConfigProvider.cs       ← 配置热更新DB轮询+HTTP通知双保险
│   └── CollectorConfig.cs      ← 配置快照数据结构
│
├── Heartbeat/
│   └── HeartbeatReporter.cs    ← 心跳上报定时写log_collector_heartbeat
│
├── Api/
│   ├── ManagementServer.cs     ← 轻量HTTP管理APIHttpListener自托管
│   └── ManagementController.cs ← API端点实现状态/重载/启停/汇总)
│
├── Logging/
│   └── DualLogger.cs           ← 双写日志文件log4net + DB log_system
│
└── App.config                  ← 连接串、端口、API Key、log4net配置

2.2 项目引用

CncCollector → CncRepository → CncModels
                    ↘
              (不引用 CncService、CncWebApi

CncCollector 直接使用 Repository 层操作数据库,不经过 Service 层。原因:

  • 采集服务有大量独立的业务逻辑字段映射、产量分段、汇总计算与Web API的CRUD Service职责不同
  • 避免与Web API产生耦合Service层可能被修改影响采集服务

三、核心流程设计

3.1 采集地址的数据模型

采集地址cnc_collect_address
├── URLHTTP端点地址如 http://192.168.1.101/
├── 品牌:决定字段映射规则
├── 采集间隔:本地址独立配置
├── 关联机床:该地址下要采的机床列表
└── 启停状态is_enabled

一次HTTP GET 返回:
[  ← JSON数组
  { device:"fanake_1.8", desc:"西-1.8", tags:[{id:"Tag5",value:"1566.NC"},...] },
  { device:"fanake_1.9", desc:"西-1.9", tags:[{id:"Tag5",value:"O1"},...] },
  ...
]

device值 匹配 cnc_machine.device_code → 定位具体机床
tags数组 通过品牌字段映射规则 → 提取标准字段

3.2 双循环模型Ping + 采集)

每个启用的采集地址启动两个独立的 Task

3.2.1 Ping循环

PingTask(address):
  while(服务运行中 && address启用):
    try:
      success = Ping(address.Url)    // ICMP Ping采集地址
    catch:
      success = false

    更新该地址下所有机床:
      is_online = success ? 1 : 0
      last_ping_time = NOW()

    sleep(ping_interval)  // 全局配置默认60秒

关键点

  • Ping结果仅用于管理后台/大屏显示机床在线状态
  • Ping不通不影响采集逻辑——采集循环照常HTTP拉取
  • 机床表的 ip_address 字段不参与Ping逻辑仅信息记录
  • Ping是针对采集地址URLHTTP端点不是单台机床

3.2.2 采集循环

CollectTask(address):
  while(服务运行中 && address启用):
    roundStart = NOW()

    // 检查重载/停止请求(本轮开始前)
    if(停止请求): 安全退出
    if(重载请求): 执行重载后继续

    try:
      // 1. HTTP拉取
      responseJson, duration = HTTP GET address.Url

      // 2. 原始JSON存档日志库先于解析防止崩溃丢数据
      写入 log_collect_raw(is_success=1, raw_json=responseJson)

      // 3. 按device逐台解析
      devices = 解析JSON数组
      for each device in devices:
        处理单个设备(device, address)

      // 4. 更新采集地址状态
      address.last_collect_time = NOW()
      address.last_collect_status = "success"
      address.fail_count = 0

    catch(异常 ex):
      处理采集失败(address, ex)

    // 本轮完成,检查挂起的停止/重载请求
    if(停止请求): 安全退出
    if(重载请求): 执行重载

    // 计算本轮耗时sleep剩余间隔
    elapsed = NOW() - roundStart
    sleep(max(0, collect_interval - elapsed))

关键点

  • 原始JSON先写入日志库再解析防止解析过程中服务崩溃导致数据丢失
  • 每轮采集结束才检查停止/重载请求,保证不会中断中间状态
  • 采集间隔扣除本轮耗时,保证频率稳定

3.3 处理单个设备

处理单个设备(deviceObj, address):
  // 1. 匹配机床
  deviceCode = deviceObj[brand.deviceField]  // 默认取 device 字段
  machine = 按 device_code 查机床表

  if(machine == null):
    // 未知设备 → 告警带完整JSON
    创建告警(alert_type="unknown_device", detail=deviceObj完整JSON)
    return

  if(machine.is_enabled == 0):
    return  // 已停用的机床跳过

  // 2. 字段映射解析
  fields = FieldMapper.Map(deviceObj, address.字段映射规则)
  // fields = { program_name:"1566.NC", part_count:1219, device_status:1, ... }

  // 3. 处理缺失字段
  missingRequired = 检查必填字段缺失情况
  if(missingRequired.Count > 0):
    创建告警(alert_type="collect_fail", machine_id=machine.id,
              detail="缺失字段: " + string.Join(", ", missingRequired))
    // 缺失字段写null不跳过整条记录

  // 4. 写结构化记录(业务库 cnc_collect_record
  写入 CollectRecord(machine_id, collect_time=NOW(), fields)

  // 5. 更新机床实时状态
  更新 cnc_machine:
    last_collect_time = NOW()
    last_program_name = fields.program_name
    last_part_count = fields.part_count
    last_device_status = fields.device_status
    last_run_status = fields.run_status
    last_operate_mode = fields.operate_mode
    last_machining_status = fields.machining_status

  // 6. 产量分段追踪
  ProductionTracker.Track(machine.id, fields.program_name, fields.part_count)

3.4 字段映射解析器

输入

deviceObj = { "device": "fanake_1.8", "desc": "西-1.8", "tags": [...] }
mappings = [
  { standard_field: "program_name", field_name: "Tag5",  match_by: "id",   data_type: "string" },
  { standard_field: "part_count",   field_name: "Tag8",  match_by: "id",   data_type: "number" },
  { standard_field: "device_status",field_name: "_io_status", match_by: "id", data_type: "number" },
  ...
]

解析逻辑

FieldMapper.Map(deviceObj, mappings) → Dictionary<string, object>

对每条映射规则:
  1. 在 deviceObj.tags 数组中查找匹配:
     match_by == "id"   → 找 tag.id == field_name
     match_by == "desc" → 找 tag.desc == field_name

  2. 找到则提取 tag.value:
     data_type == "number" → 去除".00000"尾缀转decimal
     data_type == "string" → 直接取字符串

  3. 未找到:
     结果为 null
     if(is_required == 1) → 加入缺失列表

数值转换规则

ValueConverter.Convert(rawValue, dataType):

if(dataType == "number"):
  // "1219.00000" → 1219
  // "0.00000" → 0
  // "1566.NC" → null非数字不抛异常
  str = rawValue.Trim()
  if(decimal.TryParse(str, out result)):
    return result
  else:
    return null

if(dataType == "string"):
  return rawValue

3.5 产量分段追踪

核心状态机

每台机床的产量追踪状态 = cnc_production_segment 表中的一条活跃记录is_settled=0, end_time=NULL

触发结账的事件:
  1. NC程序名变化 → close_reason = "program_change"
  2. 同程序下part_count下降手动清零 → close_reason = "manual_reset"
  3. 日终汇总时仍在运行 → close_reason = "end_of_day"
  4. 服务停止/崩溃恢复 → close_reason = "service_stop" / "service_crash"

Track逻辑伪代码

ProductionTracker.Track(machineId, currentProgramName, currentPartCount):

  activeSegment = repository.GetActiveSegment(machineId)

  if(activeSegment == null):
    // 无活跃段 → 开新段
    CreateSegment(machineId, currentProgramName, currentPartCount)
    return

  if(activeSegment.program_name != currentProgramName):
    // NC程序切换 → 结账旧段,开新段
    quantity = activeSegment.start_part_count 不变上一段的part_count快照
    // 实际产量 = currentPartCount - activeSegment.start_part_count
    // 但注意旧的activeSegment.start_part_count是旧程序开始时的值
    CloseSegment(activeSegment.id,
                 endPartCount = currentPartCount,
                 quantity = currentPartCount - activeSegment.start_part_count,
                 closeReason = "program_change")
    CreateSegment(machineId, currentProgramName, currentPartCount)
    return

  if(currentPartCount < activeSegment.start_part_count):
    // 同程序下part_count下降 → 手动清零
    // 结账旧段(产量 = 清零前的值 - 段起始值)
    CloseSegment(activeSegment.id,
                 endPartCount = 上一次采集的part_count需要内存缓存,
                 quantity = 上一次采集的part_count - activeSegment.start_part_count,
                 closeReason = "manual_reset")
    // 开新段,从当前值(清零后的值)开始
    CreateSegment(machineId, currentProgramName, currentPartCount)
    return

  // 正常运行 → 无需操作,活跃段继续
  // 可选:更新内存中"上一次采集的part_count"缓存

part_count缓存

为了检测手动清零,需要知道上一次采集的part_count。在内存中维护:

Dictionary<int, decimal?> _lastPartCountByMachine  // machineId → 上次part_count

每次采集后更新:
  _lastPartCountByMachine[machineId] = currentPartCount

手动清零判断条件:
  currentPartCount < activeSegment.start_part_count
  (零件数比段起始值还小,说明被手动清零了)

A-B-C-A-B场景

时间线:
  10:00  program=A, part=0   → 开段(A, start=0)
  10:30  program=A, part=10  → 正常
  11:00  program=B, part=0   → 结段(A, qty=10), 开段(B, start=0)
  11:30  program=C, part=0   → 结段(B, qty=0), 开段(C, start=0)
  12:00  program=A, part=0   → 结段(C, qty=0), 开段(A, start=0) ← 新段,不合并
  12:30  program=A, part=5   → 正常

日汇总时:
  按 (machine_id, production_date, program_name) 合并
  program=A 的总产量 = 10 + 5 = 15
  program=B 的总产量 = 0
  program=C 的总产量 = 0

四、采集失败与重试

4.1 重试机制

一轮采集循环中的重试:

  第1次采集 → 失败
  sleep(retry_interval)           // 默认30秒后台可配
  第2次采集 → 失败
  sleep(retry_interval)
  第3次采集 → 失败(达到 retry_count 上限默认3次后台可配

  重试总耗时 = retry_count × retry_interval

  if(重试总耗时 < collect_interval):
    // 正常情况:重试耗时不影响下一轮
    fail_count++
    sleep(collect_interval - 重试总耗时)   // 等待剩余间隔
  else:
    // 异常情况:重试耗时超过了采集间隔
    // 重置本轮错误状态(不算一次有效失败)
    // 直接进入下一轮采集
    不增加 fail_count
    立即开始下一轮

  if(fail_count >= collect_fail_alert_threshold):  // 默认5次
    创建告警(alert_type="collect_fail")
    fail_count = 0  // 重置,避免重复告警

4.2 配置约束

配置项 默认值 说明
collect_retry_count 3 每轮最大重试次数
collect_retry_interval 30 重试间隔(秒)
collect_fail_alert_threshold 5 连续失败N轮触发告警

约束:正常配置下 retry_count × retry_interval < collect_interval。如果不满足,重试超时不算失败。

4.3 fail_count 连续性

  • 采集成功一次 → fail_count 立即清零
  • 只有连续失败才累积
  • 达到阈值触发告警后清零,避免同一问题重复告警

五、配置热更新

5.1 双保险机制

管理后台修改 → 写DB → HTTP通知采集服务 → 采集服务验证生效

              ┌─── 即时通知HTTP POST /api/reload───┐
              │                                         │
管理后台写DB ──┤                                         ├──→ 采集服务重载配置
              │                                         │
              └─── 兜底轮询每30秒查DB─────────────────┘

5.2 重载流程

1. 管理后台修改采集地址/品牌模板 → 写入数据库

2. 管理后台调用采集服务API:
   POST http://localhost:5800/api/reload
   Headers: X-Api-Key: {collector_api_key}

3. 采集服务收到重载请求:
   a. 标记"重载请求挂起",不立即中断
   b. 等待所有地址当前轮采集完成
   c. 从DB重新加载配置:
      - cnc_collect_address启用的地址列表
      - cnc_brand_field_mapping字段映射规则
      - cnc_sys_config系统配置
   d. 对比新旧配置,生成变更摘要
   e. 调整运行中的任务:
      - 新增的地址 → 启动新的Ping+采集Task
      - 停用的地址 → 停止对应Task
      - 变更的地址 → 停止旧Task启动新Task
   f. 返回变更结果

4. 管理后台验证:
   GET http://localhost:5800/api/config/hash
   对比本地DB配置的hash → 一致则确认生效

5.3 配置快照

内存中维护的配置数据结构:

class CollectorConfig
{
    string ConfigHash;                          // MD5指纹快速判断是否变更
    List<CollectAddress> Addresses;             // 启用的采集地址列表
    Dictionary<int, List<BrandFieldMapping>> BrandMappings;  // brandId → 字段映射
    Dictionary<string, string> SysConfig;       // 系统配置键值对
    DateTime LoadedAt;                          // 加载时间
}

5.4 管理后台交互

管理后台修改采集地址/品牌模板后:

  • 保存按钮旁显示"已通知采集服务重载 ✓"或"通知失败将在30秒后自动同步 ⚠"
  • 通过Web API后端转发HTTP请求到采集服务前端不直连5800端口

六、日终汇总

6.1 触发机制(启动时补检 + 内置定时器)

DailySummaryJob:
  // 启动时自检
  OnStart():
    yesterday = DateTime.Today.AddDays(-1)
    if(昨天尚未汇总):
      立即执行昨天的汇总

  // 定时检查(每分钟)
  Timer(60秒):
    now = DateTime.Now
    targetDate = DateTime.Today.AddDays(-1)  // 汇总昨天

    if(now >= 配置的daily_summary_time):     // 默认01:00
      if(今天尚未执行过汇总):
        执行汇总(targetDate)
        记录"今天已汇总"

  // HTTP API 手动触发
  OnManualTrigger(date):
    执行汇总(date)  // 幂等:先删旧数据再重算

优势

  • 服务01:00挂了02:00恢复后自动补上
  • 手动触发支持重算任意日期
  • 幂等设计:同一天汇总多次不会重复

6.2 汇总步骤

DailySummaryJob.Execute(targetDate):

  for each 启用的机床:
    // Step 1: 关闭该机床的所有活跃段
    activeSegments = 查询(machine_id, is_settled=0, end_time=NULL)
    for each segment in activeSegments:
      CloseSegment(segment.id,
                   endPartCount = lastPartCount缓存或段内最大part_count,
                   quantity = endPartCount - segment.start_part_count,
                   closeReason = "end_of_day")

    // Step 2: 标记结算
    SettleByDate(targetDate)

    // Step 3: 按程序合并 → cnc_daily_production
    segments = 查询该机床targetDate的所有段
    grouped = segments.GroupBy(s => s.program_name)
    for each group:
      UpsertDailyProduction(
        machine_id, targetDate, program_name,
        total_quantity = Sum(group.quantity),
        segment_count = group.Count()
      )

    // Step 4: 写机床日状态
    UpsertMachineDailyStatus(
      machine_id, targetDate,
      data_status = 根据当天采集情况判断:
        "normal"       → 至少成功采集过一次
        "offline"      → 当天从未Ping通
        "data_missing" → Ping通但采集始终失败
    )

  // Step 5: 工人汇总 → cnc_worker_daily_summary
  for each worker:
    machines = 该工人绑定的机床列表
    workerProductions = 查这些机床在targetDate的daily_production
    UpsertWorkerDailySummary(
      worker_id, targetDate,
      total_quantity = Sum(workerProductions.total_quantity),
      machine_count = 去重机床数,
      program_count = 去重程序名数
    )

  // Step 6: 未开机机床插0产量记录
  allEnabledMachines = 所有启用的机床
  machinesWithData = 当天有daily_production记录的机床
  for each machine in (allEnabledMachines - machinesWithData):
    InsertDailyProduction(machine.id, targetDate, quantity=0)
    UpsertMachineDailyStatus(machine.id, targetDate, "offline")

七、心跳上报

7.1 心跳机制

HeartbeatReporter:
  每 heartbeat_interval 秒默认10秒可配:

  写入 log_collector_heartbeat:
    service_id = "CncCollector"
    status = "running"
    collect_address_id = null全局心跳
    last_collect_time = 最近一次成功采集时间
    success_count = 本轮成功采集次数
    fail_count = 本轮失败采集次数
    uptime_seconds = (NOW() - 服务启动时间).TotalSeconds
    detail = JSON.stringify({
      activeTasks: 活跃采集任务数,
      activePings: 活跃Ping任务数,
      configHash: 当前配置指纹
    })

7.2 Web API判断服务状态

-- 管理后台/大屏查询采集服务是否在线
SELECT * FROM log_collector_heartbeat
WHERE service_id = 'CncCollector'
  AND created_at > NOW() - INTERVAL 30 SECOND
ORDER BY created_at DESC LIMIT 1
-- 有记录 → 运行中;无记录 → 已停止

八、日志系统

8.1 双写策略

事件类型 文件日志 DB日志(log_system)
每次采集成功 INFO 不写(量太大)
采集失败/重试 ERROR ERROR
字段缺失告警 WARN 写入cnc_alert
未知设备告警 WARN 写入cnc_alert
配置变更/重载 INFO INFO
服务启停 INFO INFO
日终汇总 INFO INFO
异常崩溃 FATAL ERROR如果还能写的话
Ping状态变化 DEBUG

8.2 文件日志格式

logs/collector-2026-04-30.log

[2026-04-30 01:00:02 INFO ] 日终汇总开始目标日期2026-04-29
[2026-04-30 01:00:05 INFO ] 日终汇总完成处理160台机床耗时3.2秒
[2026-04-30 01:00:30 INFO ] 地址[FANUC-1号] 采集成功获取24台设备数据耗时120ms
[2026-04-30 01:01:00 WARN ] 机床[西-1.8](id=5) 必填字段缺失: spindle_speed_set
[2026-04-30 01:02:00 ERROR] 地址[FANUC-2号] 采集失败(第3次重试): 连接超时
[2026-04-30 01:02:05 INFO ] 配置重载完成新增地址1个停用地址1个更新映射3条
[2026-04-30 01:02:30 WARN ] 未知设备 device=fanake_1.15,采集地址[FANUC-1号]

8.3 log4net配置要点

<!-- App.config 中 log4net 配置 -->
<log4net>
  <appender name="RollingFile" type="log4net.Appender.RollingFileAppender">
    <file value="logs/collector" />
    <datePattern value="-yyyy-MM-dd'.log'" />
    <maximumFileSize value="50MB" />
    <maxSizeRollBackups value="10" />
    <layout type="log4net.Layout.PatternLayout">
      <conversionPattern value="[%date %-5level] %message%newline" />
    </layout>
  </appender>
  <root>
    <level value="INFO" />
    <appender-ref ref="RollingFile" />
  </root>
</log4net>

九、优雅停止与异常恢复

9.1 优雅停止

核心原则:收到停止请求后,等待所有地址当前轮采集完成再退出

CollectorEngine.Stop():
  1. 标记 _stopRequested = true
  2. 日志记录"收到停止请求,等待当前轮采集完成"
  3. 等待所有CollectorTask和PingTask完成带超时
     Task.WaitAll(allTasks, timeout: 30秒)
  4. 关闭所有活跃的产量分段:
     for each 活跃段:
       CloseSegment(reason="service_stop")
  5. 写最终心跳(status="stopped")
  6. 日志记录"服务已停止"
  7. 释放资源

9.2 配置重载时的保护

CollectorEngine.Reload():
  1. 标记 _reloadRequested = true
  2. 日志记录"收到重载请求,等待当前轮采集完成"
  3. 不中断任何正在执行的采集任务
  4. 等待所有任务到达本轮结束点(循环顶部检查标记)
  5. 从DB重新加载配置
  6. 对比新旧配置,生成变更摘要
  7. 停掉不再需要的任务
  8. 启动新任务
  9. 清除 _reloadRequested 标记
  10. 返回变更摘要

9.3 异常停止(崩溃/强杀)的数据防护

产量分段——天然事务边界

采集服务对产量分段的操作只有两种原子动作:
1. 开新段INSERT → end_time=NULL
2. 结旧段UPDATE → 设置end_time

最坏情况:结旧段成功但开新段失败
后果:机床没有活跃段
恢复:下次服务启动时自检恢复(见下)
数据丢失:最多丢失一次采集的产量,不会出现脏数据

服务启动时自检恢复

CollectorEngine.OnStart():
  1. 加载配置
  2. 自检恢复:
     activeSegments = 查询所有 is_settled=0 AND end_time=NULL 的段
     for each segment:
       if(segment.start_time 距今 > 2小时):
         // 很可能是崩溃遗留的孤儿段
         CloseSegment(segment.id,
                      endPartCount = segment.start_part_count,
                      quantity = 0,
                      closeReason = "service_crash",
                      endTime = segment.start_time.AddMinutes(5))
  3. 启动Ping+采集任务
  4. 自检日终汇总

原始JSON先写后解析

采集流程中的写入顺序:
  第1步写入 log_collect_raw原始JSON + is_success=1  ← 先保存
  第2步逐台解析 + 写 cnc_collect_record                  ← 后处理

如果第2步中途崩溃:
  - 原始JSON已安全保存
  - 可通过管理API手动触发"重解析"按log_collect_raw的记录重新跑字段映射

采集记录——天然原子性

每台设备的结构化数据是一条独立的 INSERT
即使中途崩溃,已写入的记录是完整的
未写入的记录只是缺失,下次采集会补上

十、管理API

10.1 端点列表

方法 URL 说明 认证
GET /api/status 服务状态(运行时长、任务数、各地址状态) X-Api-Key
POST /api/reload 立即重载配置(等本轮完成后执行) X-Api-Key
POST /api/stop 停止所有采集任务 X-Api-Key
POST /api/start 启动所有采集任务 X-Api-Key
GET /api/tasks 当前运行的任务列表 X-Api-Key
GET /api/config/snapshot 当前生效的配置快照 X-Api-Key
GET /api/config/hash 配置指纹MD5 X-Api-Key
POST /api/daily-summary 手动触发日终汇总,参数: date=2026-04-29 X-Api-Key
POST /api/reparse 重解析指定原始记录,参数: rawId=12345 X-Api-Key

10.2 认证

所有管理API请求需携带 Header X-Api-Key,值与 cnc_sys_config 表中的 collector_api_key 一致。

10.3 实现方式

使用 System.Net.HttpListener 自托管独立于IIS

// ManagementServer.cs
HttpListener listener = new HttpListener();
listener.Prefixes.Add("http://+:{port}/api/");
listener.Start();

// 请求处理循环
while(running)
{
  var context = listener.GetContext();
  // 路由到 ManagementController 对应方法
}

10.4 响应格式

{
  "code": 0,
  "message": "success",
  "data": { ... }
}

与Web API统一格式。


十一、告警类型汇总

alert_type 触发条件 detail内容
collect_fail 连续N轮采集失败 失败原因摘要
collect_fail 必填字段缺失 缺失字段列表
unknown_device device匹配不到机床表 完整device JSON对象
device_offline 预留当前Ping状态直接更新机床表 -
service_error 采集服务内部异常 异常消息+堆栈

告警频率控制

  • 同一台机床同一个问题5分钟内不重复告警
  • 未知设备告警同一device编码1小时内不重复
  • 告警表不记录采集成功的正常事件

十二、App.config 配置项

<configuration>
  <connectionStrings>
    <add name="BusinessConnection"
         connectionString="Server=localhost;Database=cnc_business;Uid=root;Pwd=root;Charset=utf8mb4;SslMode=None;" />
    <add name="LogConnection"
         connectionString="Server=localhost;Database=cnc_log;Uid=root;Pwd=root;Charset=utf8mb4;SslMode=None;" />
  </connectionStrings>

  <appSettings>
    <!-- 服务标识 -->
    <add key="ServiceId" value="CncCollector" />

    <!-- 管理API -->
    <add key="ManagementApiPort" value="5800" />
    <add key="CollectorApiKey" value="与cnc_sys_config.collector_api_key一致" />

    <!-- 采集参数首次启动默认值后续从DB读取 -->
    <add key="PingInterval" value="60" />
    <add key="CollectRetryCount" value="3" />
    <add key="CollectRetryInterval" value="30" />
    <add key="CollectFailAlertThreshold" value="5" />
    <add key="HeartbeatInterval" value="10" />
    <add key="ConfigPollInterval" value="30" />

    <!-- 日终汇总 -->
    <add key="DailySummaryTime" value="01:00" />

    <!-- 日志 -->
    <add key="LogLevel" value="INFO" />
  </appSettings>
</configuration>

十三、数据库交互清单

13.1 采集服务读取的表

用途 频率
cnc_collect_address 业务 加载启用的采集地址列表 每30秒/重载时
cnc_brand_field_mapping 业务 加载品牌字段映射规则 每30秒/重载时
cnc_brand 业务 获取品牌device_field/tags_path 每30秒/重载时
cnc_machine 业务 按device_code匹配机床 每次采集
cnc_production_segment 业务 查活跃段 每次采集每台机床
cnc_worker_machine 业务 查机床绑定的工人(日汇总用) 日汇总时
cnc_sys_config 业务 读取系统配置 每30秒/重载时
log_collect_raw 日志 读取原始记录(重解析用) 按需

13.2 采集服务写入的表

用途 频率
log_collect_raw 日志 存原始JSON 每次采集
log_collector_heartbeat 日志 心跳 每10秒
log_system 日志 关键事件日志 关键事件时
cnc_collect_record 业务 结构化采集记录 每次采集每台机床
cnc_production_segment 业务 开段/结段 程序切换/清零时
cnc_daily_production 业务 日汇总产量 日汇总时
cnc_worker_daily_summary 业务 工人日汇总 日汇总时
cnc_machine_daily_status 业务 机床日状态 日汇总时
cnc_machine 业务 更新实时状态字段 每次采集每台机床
cnc_collect_address 业务 更新last_collect_time/fail_count 每次采集
cnc_alert 业务 告警记录 异常时

13.3 需要新增的Repository方法

以下方法在现有接口中不存在,需要新增:

// ICollectRecordRepository新建
long Create(CollectRecord entity);

// IMachineRepository补充
void UpdateRealtimeStatus(int machineId, Machine realtimeFields);

// IDailyProductionRepository补充
int Upsert(DailyProduction entity);  // INSERT ON DUPLICATE KEY UPDATE
void DeleteByMachineAndDate(int machineId, DateTime date);

// IWorkerDailySummaryRepository补充
int Upsert(WorkerDailySummary entity);

十四、部署与运维

14.1 安装为Windows Service

# 安装
installutil CncCollector.exe

# 卸载
installutil /u CncCollector.exe

# 或使用 sc 命令
sc create CncCollector binPath= "E:\opencode\haoliang\src\CncCollector\bin\Debug\CncCollector.exe"
sc start CncCollector
sc stop CncCollector

14.2 文件部署位置

E:\opencode\haoliang\src\CncCollector\bin\Debug\
├── CncCollector.exe
├── CncCollector.exe.config    ← App.config 编译输出
├── CncModels.dll              ← 引用的模型层
├── CncRepository.dll          ← 引用的仓储层
├── Dapper.dll
├── MySqlConnector.dll
├── Newtonsoft.Json.dll
├── log4net.dll
└── logs\                      ← 日志目录(自动创建)

14.3 与Web API共享数据库

  • 采集服务和Web API同时读写同一个MariaDB
  • 采集服务只写自己的表采集记录、产量段、日汇总等不会覆盖Web API管理的配置数据
  • 配置变更通过DB传递双保险保证一致

14.4 防火墙

  • 开放5800端口供管理API通信仅本机/局域网访问)
  • 采集服务需要出站HTTP访问采集地址URL
  • Ping需要ICMP出站权限

十五、开发顺序建议

步骤 内容 依赖
1 新建CncCollector项目 + 引用CncModels/CncRepository + App.config
2 FieldMapper + ValueConverter纯函数可独立测试 步骤1
3 CollectorTask 单地址采集循环HTTP拉取+解析+写库) 步骤2
4 ProductionTracker 产量分段追踪 步骤1
5 PingTask Ping循环 步骤1
6 CollectorEngine 引擎启停所有Task 步骤3+4+5
7 HeartbeatReporter 心跳 步骤1
8 ConfigProvider 配置热更新 步骤6
9 DailySummaryJob 日终汇总 步骤4
10 ManagementServer + ManagementController 管理API 步骤6+8+9
11 DualLogger 双写日志 步骤1
12 ServiceEntry + ServiceInstaller Windows Service包装 步骤6+10
13 集成测试(完整链路) 全部

建议先跑通 步骤1→2→3 这条最小链路(一个地址→拉取→解析→写库),再逐步叠加其他功能。