SimpleAgvInterface Queue机制

来自MDCS wiki2
Artheru讨论 | 贡献2026年5月16日 (六) 22:00的版本 (Initial bilingual draft (auto-published))
(差异) ←上一版本 | 最后版本 (差异) | 下一版本→ (差异)
跳到导航 跳到搜索


概述 / Overview

`SimpleAgvInterface.Queue(params Func<Task>[] actions)` 是 Clumsy 车载侧的 任务流水线机制。它把多个 异步 action组织成可串可并的执行图,使任务作者能以声明式风格写出 "巡线 → 取货 → 升叉 → 倒车"这样的业务序列。

`SimpleAgvInterface.Queue(params Func<Task>[] actions)` is the task pipeline on the Clumsy on-vehicle side. It organises multiple async actions into a serial/parallel execution graph so mission authors can write business sequences declaratively.

实现:`D:\src\Clumsy\ClumsyCore\Interfaces\SimpleAgvInterface.cs:410`。 Implementation: `D:\src\Clumsy\ClumsyCore\Interfaces\SimpleAgvInterface.cs:410`.

基本用法 / Basic usage

public void Fetch(double sx, double sy, double dx, double dy)
{
    Queue(
      async () =>
      {
          DriveTask.WaitDriveTask(new SteeringLineFollowing
          { srcX = currentX, srcY = currentY, dstX = sx, dstY = sy, basespeed = 600 }.Follow());
      },
      async () =>
      {
          DriveTask.WaitDriveTask(new AutoFetchGood
          { aX = sx, aY = sy, bX = dx, bY = dy, initSpeed = 500, stopDist = 80 }.Get());
      },
      async () =>
      {
          SetUpperIO("forkHeightTgt", 50);
          await WaitForLowerIO("forkAtTarget", 4000);
      },
      async () =>
      {
          DriveTask.WaitDriveTask(new SteeringLineFollowingReverse
          { srcX = dx, srcY = dy, dstX = sx, dstY = sy, basespeed = 200 }.ReverseFollow());
      });
}

按提交顺序,4 个 action 形成 4 (rows),框架依次执行。 The 4 actions form 4 rows; the framework executes them.

数据结构 / Data structure

内部维护两个队列(`SimpleAgvInterface.cs:308-352`): Internally there are two queues:

  • `staticTaskLs : List<List<Task>>` — 静态共享队列;每行是 `List<Task>`。
  • `taskLs : List<List<Task>>` — 当前实例的队列;可与 static 共享或独立。
  • `queueInfo : List<SaiInfo>` — 每行的状态字符串、提交线程 ID、异常上下文。

每次 `Queue(actions)` 调用 : Each `Queue(actions)` call:

  1. 把 `actions[i]` ContinueWith 到 `taskLs[i]` 的最后一个 Task 后面。
  2. `actions[i]` 内部 `await taskLs[i-1].Last()` —— 即等上一 最新提交的 action 完成。
  3. 注册到 `queueInfo[i]`。
  1. Chain `actions[i]` after `taskLs[i].Last()` (the previous action in the same row).
  2. Inside `actions[i]`, `await taskLs[i-1].Last()` (the previous row's latest action).
  3. Register in `queueInfo[i]`.

"超标量"实际行为 / The "super-scalar" reality

【发现】 当前实现 没有真正的跨行并发。第 i 行的 action 必须等第 i-1 行的 最新提交完成,所以多行 Queue 调用实际上是 串行执行。"超标量流水线"是设计目标,不是当前实现。 【发现】 The current implementation does NOT actually run rows in parallel. Row i awaits row i-1's most recent Task, so multi-row Queue calls execute sequentially. The "super-scalar pipeline" is design aspiration, not current behaviour.

也就是说 : In practice:

  • 单次 `Queue(a, b, c)` → a, b, c 串行(b 等 a 完成,c 等 b 完成)。
  • 连续 `Queue(a, b)`; `Queue(c, d)` → a → b → c → d 串行;c 等 b 完成(同行);d 等 c 完成(同行)。
  • 并发要靠 action 内 `Task.WhenAll` —— 若你想 a + b 同时开始,自己在 a 内 `Task.WhenAll(taskA, taskB)`。
  • `Queue(a, b, c)` runs a → b → c sequentially.
  • Repeated `Queue` calls also serialise via row chaining.
  • For concurrency within an action, use `Task.WhenAll` explicitly.

`WaitAsync` / `Wait`

等待指定行完成: Wait for a specific row to finish:

await agv.WaitAsync(pipe: 2);  // wait for row 2's latest task
// or sync
agv.Wait(pipe: 0);

`TryLock` / `Leave`

交管 的两种执行路径,由 `useSimpleCallTraffic` 开关决定(`SimpleAgvInterface.cs:40`):

Traffic locks have two paths controlled by `useSimpleCallTraffic`:

同步内置 / In-process synchronous

当 `useSimpleCallTraffic = true`(同一进程跑调度内核 + 车载):

  • `TryLock(siteID, route_id)` 轮询 `PilotBase.latestLock == siteID`(`SimpleAgvInterface.cs:124`)
  • `Leave(siteID)` 直接更新本地 `PilotBase.leaves`

HTTP RPC

跨进程时(车载与调度内核不在同一进程):

  • `TryLock` → POST `http://{host}:{port}/trylock` body `{carid, siteid}`(line 170)
  • `Leave` → GET `/leave?carid={id}&siteid={siteID}`(line 260)

异常处理 / Error handling

  • Action 内异常 → 写到 `queueInfo[i].exception`;该行后续 action 跳过;其它行继续。
  • 调度可通过 `Flush()` 显式终结队列;之后 `Queue(...)` 抛 `_pipelineError`(line 112)。
  • 任务级失败上报靠 任务脚本编译时插入的 try/catch + Hedingben toast。
  • Exceptions in actions → recorded in `queueInfo[i].exception`; subsequent actions in that row skipped; other rows continue.
  • `Flush()` ends the queue; later `Queue(...)` throws `_pipelineError`.

实战模式 / Patterns

顺序业务动作 / Sequential business action

最常见。每个 action 是一段独立的 检查 → 行驶 → 等硬件微操。 Most common. Each action is a self-contained "check → drive → wait hw" micro-op.

锁先于动作 / Lock-then-act

Queue(
  async () => { if (!TryLock(targetSite, routeId)) return; /* signal failure */ },
  async () => { /* the actual drive */ },
  async () => Leave(prevSite)
);

设备 IO 等待 / Wait-for-IO

Queue(
  async () => { SetUpperIO("jackTarget", 80); },
  async () => { await WaitForLowerIO("jackAtTarget", timeoutMs: 4000); }
);

错误恢复 / Error recovery

Action 内部 try/catch;失败时把 cart 切到 "故障"状态并 toast:

Queue(
  async () =>
  {
      try { await DoFragileThing(); }
      catch (Exception e)
      {
          Hedingben.ToastText($"Action failed: {e.Message}", "AGV");
          DriveStop();
          throw;
      }
  }
);

调度侧 AGVInterface / Fleet-side AGVInterface

SimpleCore 端的 `AGVInterface` (`D:\src\Simple\SimpleCore\BasicProps\AGVInterface.cs`) 提供 同款 Queue 机制,用于自评估车 :调度直接在自己进程里跑 Queue 来翻译 TopazScript。详见 如何基于SimpleCore核心库进行调度系统开发

The fleet-side `AGVInterface` provides the same Queue mechanism for self-evaluating cars (scheduler runs Queue locally to translate TopazScript).

相关页面 / See also