DObject共享内存协议

来自MDCS wiki2
跳到导航 跳到搜索


概述 / Overview

DObject 是 MDCS 的跨进程 命名共享内存 pub/sub 通讯原语。Medulla 把传感器数据、上下位 IO、调度状态等都通过 DObject 发布;Detour / Clumsy / SimpleComposer / 监控工具按名字订阅。整个 MDCS 的进程间通讯只有这一条总线。

DObject is MDCS's cross-process named-shared-memory pub/sub primitive. Medulla publishes sensor data, upper/lower IO, scheduler state via DObject; Detour / Clumsy / SimpleComposer / monitoring tools subscribe by name. It's the sole IPC bus in MDCS.

实现:`D:\src\Fundamentals\FundamentalLib\DObject.cs:18-344`。 Implementation: `D:\src\Fundamentals\FundamentalLib\DObject.cs:18-344`.

存储 / Storage

平台 / Platform 后端 / Backing
Windows `MemoryMappedFile("io_shared", 256 MB)` + meta `MemoryMappedFile("io_shared_meta", 1024 B)`
Linux `$TMPDIR/io_shared` 文件 / file-backed fallback

容量 256 MB 是预分配上限;具体 DObject 槽按需扩展但不能超过总容量。

The 256 MB pool is preallocated; per-DObject slots grow on demand but the pool cap is hard.

二进制布局 / Binary layout

 [meta 1024 B]
   [magic: 4 B  0x90 0x97 0x92 0x96]
   [FundamentalLib version hash: 4 B]
   [process count: 1 B]
   [PID list: 32 B (8 × int32)]
   [reserved]
 [payload area ≤ 256 MB]
   slot 0:
     [next_offset: int32]
     [name_len:    int32]
     [name:        ASCII (≤ 32 B)]
     [size:        int32]
     [reserved:    int32]
     [payload:     N B]
   slot 1: same
   ...
   EOF: next_offset = 0

API

构造 / Constructor

var dobj = new DObject(string name, int defSize = 1024);
// name ≤ 32 chars; creates new slot or attaches to existing

写入 / Post

dobj.Post(byte[] payload, bool recording = false);
// 1. acquires per-DObject named Mutex ("io_mutex_<name>")
// 2. Marshal.Copy(payload) into the slot
// 3. signals per-DObject EventWaitHandle (wakes all Wait())
// 4. if recording == true, also append to .dorec file

读取 / Read

var reader = dobj.Reader(int offset, int len);
// reader is a delegate; on each invoke it:
// 1. acquires Mutex
// 2. copies [offset, offset+len) into a fresh byte[]
// 3. releases Mutex, returns the bytes

等待 / Wait

bool gotNewFrame = dobj.Wait(int timeoutMs);
// blocks on the EventWaitHandle; returns true on signal, false on timeout

并发模型 / Concurrency

  • 每 DObject 一把命名 Mutex (`io_mutex_<name>`) 保护 Post/Read 互斥。
  • 每 DObject 一个 EventWaitHandle 用于 `Wait()` 唤醒。
  • Mutex + EventWaitHandle 都是系统级原语(Windows: 命名 Mutex + AutoReset Event;Linux: FSWEventWaitHandle)。
  • 读者使用 `Wait` 阻塞到下次写入;轮询读取也 OK 但浪费 CPU。
  • Per-DObject named Mutex protects Post/Read mutual exclusion.
  • Per-DObject EventWaitHandle wakes `Wait()`.
  • Both are kernel-level primitives — cross-process safe.

版本校验 / Version safety

Meta 块的第二个 4 字节存 FundamentalLib 编译哈希。读取时若 `X.conf.EnsureFlibSame = true` 且哈希不匹配 → 抛异常。这能立刻发现"同一系统里加载了两个不同版本的 FundamentalLib"的灾难性 bug。

The meta block stores the compiling FundamentalLib's version hash. On read, if `X.conf.EnsureFlibSame = true` and the hash differs → throw. Catches the "two different FundamentalLib.dll versions loaded in one system" disaster bug.

【发现】 这是 MDCS 调试中最有用的护栏 —— 重新构建 FundamentalLib 但忘了重建消费者时,立刻抛错。 The single most useful guardrail in MDCS debugging.

录制 / Recording

`dobj.Recording = true` 时每次 `Post()` 同时把数据追加到磁盘 `.dorec` 流。 With `dobj.Recording = true` every `Post()` also appends to a disk `.dorec` stream.

格式 / Format: `[name] [DateTime.ToBinary] [RND id] [length] [payload]`。

清理 / Cleanup: 按 年龄(`RecordingKeepSeconds`)和 总磁盘大小(`RecordingDiskSizeLimitMB`)双重边界,自动滚动。 Auto-cleanup bounded by both age (`RecordingKeepSeconds`) and total disk size (`RecordingDiskSizeLimitMB`).

回放工具见 使用手册 - 数据录制与回放手册

常见 DObject 槽 / Common slot names

名字 / Name 发布者 / Producer 订阅者 / Subscriber 内容 / Payload
`Lidar2D`(或厂商后缀) Medulla 雷达插件 Detour 打包的 `LidarOutput`(点云 + tick)
`pose` Detour Clumsy, SimpleComposer 6-DoF 位姿 + 协方差 + tick
`MedullaCartLowerIO<tag>` Medulla CartActivator SimpleComposer `[AsLowerIO]` 字段 JSON
`MedullaCartUpperIO<tag>` SimpleComposer / Clumsy Medulla CartActivator `[AsUpperIO]` 字段 JSON
自定义 任意 任意 自定义二进制

使用模式 / Patterns

写者侧 / Publisher pattern

public class MyLidar : Lidar2DIOObject
{
    void Loop()
    {
        while (true)
        {
            var cloud = ReadFromDevice();
            cachedLidar = cloud;
            frame += 1;
            tick = DateTime.Now.Ticks;
            output();  // 内部调用 dobj.Post(serializedCloud)
        }
    }
}

读者侧 / Subscriber pattern

var dobj = new DObject("Lidar2D");
var reader = dobj.Reader(0, 8 * 1024);  // expected payload size
while (true)
{
    if (dobj.Wait(5000))
    {
        var bytes = reader();
        var output = LidarOutput.Deserialize(bytes);
        ProcessFrame(output.points);
    }
    else
    {
        Console.WriteLine("Lidar timeout!");
    }
}

注意事项 / Caveats

  • DObject 名 ≤ 32 字符 —— 超长会截断 / silently truncated.
  • Post 是阻塞的 —— 所有订阅者都未释放 Mutex 之前你写不进 / blocks while any subscriber holds the mutex.
  • Reader 不重试 —— 槽消失或重建时 reader 失效,需要重新 `new Reader`.
  • 跨主机不工作 —— 同主机进程间专用 / Single-host only; not designed for cross-machine.
  • 256 MB 是硬限 —— 大量大尺寸槽要监控用量 / monitor usage if many large slots.

相关页面 / See also