DObject共享内存协议
概述 / Overview
DObject 是 MDCS 的跨进程 命名共享内存 pub/sub 通讯原语。Medulla 把传感器数据、上下位 IO、调度状态等都通过 DObject 发布;Detour / Clumsy / SimpleComposer / 监控工具按名字订阅。整个 MDCS 的进程间通讯只有这一条总线。
DObject is MDCS's cross-process named-shared-memory pub/sub primitive. Medulla publishes sensor data, upper/lower IO, scheduler state via DObject; Detour / Clumsy / SimpleComposer / monitoring tools subscribe by name. It's the sole IPC bus in MDCS.
实现:`D:\src\Fundamentals\FundamentalLib\DObject.cs:18-344`。 Implementation: `D:\src\Fundamentals\FundamentalLib\DObject.cs:18-344`.
存储 / Storage
| 平台 / Platform | 后端 / Backing |
|---|---|
| Windows | `MemoryMappedFile("io_shared", 256 MB)` + meta `MemoryMappedFile("io_shared_meta", 1024 B)` |
| Linux | `$TMPDIR/io_shared` 文件 / file-backed fallback |
容量 256 MB 是预分配上限;具体 DObject 槽按需扩展但不能超过总容量。
The 256 MB pool is preallocated; per-DObject slots grow on demand but the pool cap is hard.
二进制布局 / Binary layout
[meta 1024 B] [magic: 4 B 0x90 0x97 0x92 0x96] [FundamentalLib version hash: 4 B] [process count: 1 B] [PID list: 32 B (8 × int32)] [reserved]
[payload area ≤ 256 MB]
slot 0:
[next_offset: int32]
[name_len: int32]
[name: ASCII (≤ 32 B)]
[size: int32]
[reserved: int32]
[payload: N B]
slot 1: same
...
EOF: next_offset = 0
API
构造 / Constructor
var dobj = new DObject(string name, int defSize = 1024);
// name ≤ 32 chars; creates new slot or attaches to existing
写入 / Post
dobj.Post(byte[] payload, bool recording = false);
// 1. acquires per-DObject named Mutex ("io_mutex_<name>")
// 2. Marshal.Copy(payload) into the slot
// 3. signals per-DObject EventWaitHandle (wakes all Wait())
// 4. if recording == true, also append to .dorec file
读取 / Read
var reader = dobj.Reader(int offset, int len);
// reader is a delegate; on each invoke it:
// 1. acquires Mutex
// 2. copies [offset, offset+len) into a fresh byte[]
// 3. releases Mutex, returns the bytes
等待 / Wait
bool gotNewFrame = dobj.Wait(int timeoutMs);
// blocks on the EventWaitHandle; returns true on signal, false on timeout
并发模型 / Concurrency
- 每 DObject 一把命名 Mutex (`io_mutex_<name>`) 保护 Post/Read 互斥。
- 每 DObject 一个 EventWaitHandle 用于 `Wait()` 唤醒。
- Mutex + EventWaitHandle 都是系统级原语(Windows: 命名 Mutex + AutoReset Event;Linux: FSWEventWaitHandle)。
- 读者使用 `Wait` 阻塞到下次写入;轮询读取也 OK 但浪费 CPU。
- Per-DObject named Mutex protects Post/Read mutual exclusion.
- Per-DObject EventWaitHandle wakes `Wait()`.
- Both are kernel-level primitives — cross-process safe.
版本校验 / Version safety
Meta 块的第二个 4 字节存 FundamentalLib 编译哈希。读取时若 `X.conf.EnsureFlibSame = true` 且哈希不匹配 → 抛异常。这能立刻发现"同一系统里加载了两个不同版本的 FundamentalLib"的灾难性 bug。
The meta block stores the compiling FundamentalLib's version hash. On read, if `X.conf.EnsureFlibSame = true` and the hash differs → throw. Catches the "two different FundamentalLib.dll versions loaded in one system" disaster bug.
【发现】 这是 MDCS 调试中最有用的护栏 —— 重新构建 FundamentalLib 但忘了重建消费者时,立刻抛错。 The single most useful guardrail in MDCS debugging.
录制 / Recording
`dobj.Recording = true` 时每次 `Post()` 同时把数据追加到磁盘 `.dorec` 流。 With `dobj.Recording = true` every `Post()` also appends to a disk `.dorec` stream.
格式 / Format: `[name] [DateTime.ToBinary] [RND id] [length] [payload]`。
清理 / Cleanup: 按 年龄(`RecordingKeepSeconds`)和 总磁盘大小(`RecordingDiskSizeLimitMB`)双重边界,自动滚动。 Auto-cleanup bounded by both age (`RecordingKeepSeconds`) and total disk size (`RecordingDiskSizeLimitMB`).
回放工具见 使用手册 - 数据录制与回放手册。
常见 DObject 槽 / Common slot names
| 名字 / Name | 发布者 / Producer | 订阅者 / Subscriber | 内容 / Payload |
|---|---|---|---|
| `Lidar2D`(或厂商后缀) | Medulla 雷达插件 | Detour | 打包的 `LidarOutput`(点云 + tick) |
| `pose` | Detour | Clumsy, SimpleComposer | 6-DoF 位姿 + 协方差 + tick |
| `MedullaCartLowerIO<tag>` | Medulla CartActivator | SimpleComposer | `[AsLowerIO]` 字段 JSON |
| `MedullaCartUpperIO<tag>` | SimpleComposer / Clumsy | Medulla CartActivator | `[AsUpperIO]` 字段 JSON |
| 自定义 | 任意 | 任意 | 自定义二进制 |
使用模式 / Patterns
写者侧 / Publisher pattern
public class MyLidar : Lidar2DIOObject
{
void Loop()
{
while (true)
{
var cloud = ReadFromDevice();
cachedLidar = cloud;
frame += 1;
tick = DateTime.Now.Ticks;
output(); // 内部调用 dobj.Post(serializedCloud)
}
}
}
读者侧 / Subscriber pattern
var dobj = new DObject("Lidar2D");
var reader = dobj.Reader(0, 8 * 1024); // expected payload size
while (true)
{
if (dobj.Wait(5000))
{
var bytes = reader();
var output = LidarOutput.Deserialize(bytes);
ProcessFrame(output.points);
}
else
{
Console.WriteLine("Lidar timeout!");
}
}
注意事项 / Caveats
- DObject 名 ≤ 32 字符 —— 超长会截断 / silently truncated.
- Post 是阻塞的 —— 所有订阅者都未释放 Mutex 之前你写不进 / blocks while any subscriber holds the mutex.
- Reader 不重试 —— 槽消失或重建时 reader 失效,需要重新 `new Reader`.
- 跨主机不工作 —— 同主机进程间专用 / Single-host only; not designed for cross-machine.
- 256 MB 是硬限 —— 大量大尺寸槽要监控用量 / monitor usage if many large slots.