SimpleAgvInterface Queue机制
概述 / Overview
`SimpleAgvInterface.Queue(params Func<Task>[] actions)` 是 Clumsy 车载侧的 任务流水线机制。它把多个 异步 action组织成可串可并的执行图,使任务作者能以声明式风格写出 "巡线 → 取货 → 升叉 → 倒车"这样的业务序列。
`SimpleAgvInterface.Queue(params Func<Task>[] actions)` is the task pipeline on the Clumsy on-vehicle side. It organises multiple async actions into a serial/parallel execution graph so mission authors can write business sequences declaratively.
实现:`D:\src\Clumsy\ClumsyCore\Interfaces\SimpleAgvInterface.cs:410`。 Implementation: `D:\src\Clumsy\ClumsyCore\Interfaces\SimpleAgvInterface.cs:410`.
基本用法 / Basic usage
public void Fetch(double sx, double sy, double dx, double dy)
{
Queue(
async () =>
{
DriveTask.WaitDriveTask(new SteeringLineFollowing
{ srcX = currentX, srcY = currentY, dstX = sx, dstY = sy, basespeed = 600 }.Follow());
},
async () =>
{
DriveTask.WaitDriveTask(new AutoFetchGood
{ aX = sx, aY = sy, bX = dx, bY = dy, initSpeed = 500, stopDist = 80 }.Get());
},
async () =>
{
SetUpperIO("forkHeightTgt", 50);
await WaitForLowerIO("forkAtTarget", 4000);
},
async () =>
{
DriveTask.WaitDriveTask(new SteeringLineFollowingReverse
{ srcX = dx, srcY = dy, dstX = sx, dstY = sy, basespeed = 200 }.ReverseFollow());
});
}
按提交顺序,4 个 action 形成 4 行(rows),框架依次执行。 The 4 actions form 4 rows; the framework executes them.
数据结构 / Data structure
内部维护两个队列(`SimpleAgvInterface.cs:308-352`): Internally there are two queues:
- `staticTaskLs : List<List<Task>>` — 静态共享队列;每行是 `List<Task>`。
- `taskLs : List<List<Task>>` — 当前实例的队列;可与 static 共享或独立。
- `queueInfo : List<SaiInfo>` — 每行的状态字符串、提交线程 ID、异常上下文。
每次 `Queue(actions)` 调用 : Each `Queue(actions)` call:
- 把 `actions[i]` ContinueWith 到 `taskLs[i]` 的最后一个 Task 后面。
- `actions[i]` 内部 `await taskLs[i-1].Last()` —— 即等上一 行最新提交的 action 完成。
- 注册到 `queueInfo[i]`。
- Chain `actions[i]` after `taskLs[i].Last()` (the previous action in the same row).
- Inside `actions[i]`, `await taskLs[i-1].Last()` (the previous row's latest action).
- Register in `queueInfo[i]`.
"超标量"实际行为 / The "super-scalar" reality
【发现】 当前实现 没有真正的跨行并发。第 i 行的 action 必须等第 i-1 行的 最新提交完成,所以多行 Queue 调用实际上是 串行执行。"超标量流水线"是设计目标,不是当前实现。 【发现】 The current implementation does NOT actually run rows in parallel. Row i awaits row i-1's most recent Task, so multi-row Queue calls execute sequentially. The "super-scalar pipeline" is design aspiration, not current behaviour.
也就是说 : In practice:
- 单次 `Queue(a, b, c)` → a, b, c 串行(b 等 a 完成,c 等 b 完成)。
- 连续 `Queue(a, b)`; `Queue(c, d)` → a → b → c → d 串行;c 等 b 完成(同行);d 等 c 完成(同行)。
- 并发要靠 action 内 `Task.WhenAll` —— 若你想 a + b 同时开始,自己在 a 内 `Task.WhenAll(taskA, taskB)`。
- `Queue(a, b, c)` runs a → b → c sequentially.
- Repeated `Queue` calls also serialise via row chaining.
- For concurrency within an action, use `Task.WhenAll` explicitly.
`WaitAsync` / `Wait`
等待指定行完成: Wait for a specific row to finish:
await agv.WaitAsync(pipe: 2); // wait for row 2's latest task
// or sync
agv.Wait(pipe: 0);
`TryLock` / `Leave`
交管 锁的两种执行路径,由 `useSimpleCallTraffic` 开关决定(`SimpleAgvInterface.cs:40`):
Traffic locks have two paths controlled by `useSimpleCallTraffic`:
同步内置 / In-process synchronous
当 `useSimpleCallTraffic = true`(同一进程跑调度内核 + 车载):
- `TryLock(siteID, route_id)` 轮询 `PilotBase.latestLock == siteID`(`SimpleAgvInterface.cs:124`)
- `Leave(siteID)` 直接更新本地 `PilotBase.leaves`
HTTP RPC
跨进程时(车载与调度内核不在同一进程):
- `TryLock` → POST `http://{host}:{port}/trylock` body `{carid, siteid}`(line 170)
- `Leave` → GET `/leave?carid={id}&siteid={siteID}`(line 260)
异常处理 / Error handling
- Action 内异常 → 写到 `queueInfo[i].exception`;该行后续 action 跳过;其它行继续。
- 调度可通过 `Flush()` 显式终结队列;之后 `Queue(...)` 抛 `_pipelineError`(line 112)。
- 任务级失败上报靠 任务脚本编译时插入的 try/catch + Hedingben toast。
- Exceptions in actions → recorded in `queueInfo[i].exception`; subsequent actions in that row skipped; other rows continue.
- `Flush()` ends the queue; later `Queue(...)` throws `_pipelineError`.
实战模式 / Patterns
顺序业务动作 / Sequential business action
最常见。每个 action 是一段独立的 检查 → 行驶 → 等硬件微操。 Most common. Each action is a self-contained "check → drive → wait hw" micro-op.
锁先于动作 / Lock-then-act
Queue(
async () => { if (!TryLock(targetSite, routeId)) return; /* signal failure */ },
async () => { /* the actual drive */ },
async () => Leave(prevSite)
);
设备 IO 等待 / Wait-for-IO
Queue(
async () => { SetUpperIO("jackTarget", 80); },
async () => { await WaitForLowerIO("jackAtTarget", timeoutMs: 4000); }
);
错误恢复 / Error recovery
Action 内部 try/catch;失败时把 cart 切到 "故障"状态并 toast:
Queue(
async () =>
{
try { await DoFragileThing(); }
catch (Exception e)
{
Hedingben.ToastText($"Action failed: {e.Message}", "AGV");
DriveStop();
throw;
}
}
);
调度侧 AGVInterface / Fleet-side AGVInterface
SimpleCore 端的 `AGVInterface` (`D:\src\Simple\SimpleCore\BasicProps\AGVInterface.cs`) 提供 同款 Queue 机制,用于自评估车 :调度直接在自己进程里跑 Queue 来翻译 TopazScript。详见 如何基于SimpleCore核心库进行调度系统开发。
The fleet-side `AGVInterface` provides the same Queue mechanism for self-evaluating cars (scheduler runs Queue locally to translate TopazScript).