問題:對話被派工阻塞
我的 Bot(一見生財)採用甲方外包制:主意識(CTO)需要一邊派工給背景 agent 團隊,一邊和老闆(Arc)聊天。
但有個卡點:Claude Code CLI session 的 busy lock。
1 2 3 4 5 6 7
| 主對話(Telegram) ├─ 派工給 explorer: "研究 XXX" │ └─ [✗] busyPromise locked │ └─ 對話卡住,直到 CLI session 完全結束 │ └─ 新訊息從老闆進來 └─ [✗] 進不了 per-chat queue(前一個 CLI 還在忙)
|
即使用 run_in_background: true,也只是 Claude Code 內部非同步,不釋放 CLI session。CTO 仍然得等。
根因:per-chat queue 的序列化
Telegram bot 的架構中,每個聊天室都有一個 ChatState:
1 2 3 4
| interface ChatState { processing: boolean; buffer: Telegram.Update[]; }
|
新訊息進來時,若 processing=true,就進 buffer。而 processing 只在 Claude Code CLI session 完全結束(exit code 0、1 或 42)時才變 false。
這是安全的設計(避免 CLI 衝突),但代價是:派工不能非同步。
設計亮點:Filesystem 作為 IPC
我的解決方案很簡單:MCP server 直接寫 queue.json,不經過主 session 的 CLI 層。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42
|
server.tool('dispatch_task', '...', { agentName, prompt, priority }, async ({ agentName, prompt, priority }) => { const agentPath = join(SOUL_DIR, 'agents', `${agentName}.json`); const cfg = JSON.parse(await readFile(agentPath, 'utf-8')); if (cfg.enabled === false) return error;
let queue = JSON.parse(await readFile(QUEUE_PATH, 'utf-8'));
const isDupe = queue.tasks.some( t => t.agentName === agentName && t.prompt === prompt && (t.status === 'pending' || t.status === 'running') ); if (isDupe) return warn("task already queued");
const task = { id: randomUUID(), agentName, prompt, status: 'pending', priority: Math.max(1, Math.min(10, priority ?? 5)), createdAt: new Date().toISOString(), startedAt: null, completedAt: null, workerId: null, result: null, error: null, costUsd: 0, duration: 0, }; queue.tasks.push(task);
await atomicWrite(QUEUE_PATH, JSON.stringify(queue, null, 2));
await writeFile(DISPATCH_SIGNAL, new Date().toISOString());
return success(`Task dispatched: ${taskId.slice(0, 8)}`); } );
|
核心:Filesystem-based IPC
- MCP server(獨立 process)無法直接呼叫主 bot 的
enqueueTask()
- 解法:直接寫
queue.json(atomic write)+ 寫 signal file .dispatch
- Pattern 驗證:沿用現有的
.rebuild signal file(skill hot-reload 已驗證)
Worker Scheduler 的 10 秒 Polling Loop
主 bot 的 worker-scheduler 不斷監聽:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
|
const DISPATCH_SIGNAL = join(SOUL_DIR, 'agent-tasks', '.dispatch');
export function startWorkerScheduler(): void {
dispatchPollTimer = setInterval(() => { checkDispatchSignal().catch(() => {}); }, 10_000); }
async function checkDispatchSignal(): Promise<void> { try { await stat(DISPATCH_SIGNAL);
try { await unlink(DISPATCH_SIGNAL); } catch { } await logger.info('WorkerScheduler', 'Dispatch signal detected, processing queue...'); await processQueue(); } catch { } }
|
流程:
- MCP tool 寫
queue.json 和 .dispatch signal
- 10 秒內,worker-scheduler 的 polling loop 偵測到 signal
- 立即呼叫
processQueue() — 選取優先度最高的待處理 task
- 分配給空閒 worker(最多 8 個並行)
- 非同步執行,主對話不被阻塞
Before vs After
Before(沒有 dispatch_task)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| [CTO] "派工:研究 AI 新聞" ↓ [Telegram] 觸發 Claude Code CLI ↓ [Queue] processing = true 🔒 ↓ [老闆] "嘿,我想聊天" ↓ [Queue] buffer 累積 ↓ [CLI] 執行完畢,exit code 0 ↓ [Queue] processing = false 🔓 ↓ [老闆] 終於可以收到回覆
⏱️ 延遲:2-30 秒(取決於派工任務的複雜度)
|
After(用 dispatch_task)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| [CTO] "派工:研究 AI 新聞" ↓ [MCP Tool] dispatch_task() ├─ 驗證 agent ├─ 寫 queue.json (atomic) ├─ 寫 .dispatch signal └─ 【立即返回 task ID】 ↓ [Telegram] MCP tool 結果推送給用戶 ↓ [老闆] "嘿,我想聊天" ↓ [Queue] processing = true(主對話的 CLI)✓ 不受影響 ↓ [CTO] 立即回應老闆 ↓ [Worker Scheduler] 10 秒內偵測 .dispatch signal ↓ [Worker] 獨立 CLI 執行派工任務
⏱️ 延遲:< 500ms(MCP 工具調用 + 檔案寫入)
|
實現細節
Atomic Write(防止並行寫入衝突)
1 2 3 4 5 6 7 8 9 10 11
| async function atomicWrite(fullPath: string, content: string): Promise<void> { const dir = dirname(fullPath); await mkdir(dir, { recursive: true });
const tmpPath = join(dir, `.tmp-${randomUUID()}`); await writeFile(tmpPath, content, 'utf-8');
await rename(tmpPath, fullPath); }
|
為什麼:queue.json 可能被多個 worker 同時讀取。寫入時必須確保:
- 檔案要麼是舊狀態,要麼是完整新狀態
- 不能有「正在寫入」的中間狀態
Dedup 檢查(防止重複任務)
1 2 3 4 5 6 7
| const isDupe = queue.tasks.some( t => t.agentName === agentName && t.prompt === prompt && (t.status === 'pending' || t.status === 'running') ); if (isDupe) { return warn("Task already queued for this agent with same prompt"); }
|
為什麼:使用者可能意外呼叫 dispatch_task 兩次,或 MCP 工具被重試。Dedup 確保一次派工只產生一個 task。
優先度排序 + 成本預算
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
|
const pendingTasks = queue.tasks .filter(t => t.status === 'pending') .sort((a, b) => { if (b.priority !== a.priority) return b.priority - a.priority; return a.createdAt.localeCompare(b.createdAt); });
for (const task of pendingTasks) { const workerId = findFreeWorker(); if (!workerId) break;
if (!(await reserveBudget(task.agentName, ESTIMATED_COST))) { continue; }
executeTask(task, workerId); }
|
使用方式
從 Claude Code 內部,CTO 可以這樣派工:
1 2 3 4 5 6 7 8 9
| result = mcp.dispatch_task( agent_name="deep-researcher", prompt="調查 Cloudflare Workers 的最新功能", priority=7 )
|
或者從其他 agent 呼叫:
1 2 3 4 5 6 7 8 9
| import { enqueueTask } from '../agents/worker-scheduler.js';
const taskId = await enqueueTask( 'blog-writer', '基於以下研究資料撰寫部落格文章...', priority = 8 );
|
架構優勢
| 面向 |
優勢 |
| 延遲 |
從 2-30 秒 → < 500ms |
| 並行度 |
主對話 + 最多 8 個 worker 完全獨立 |
| 容錯 |
Filesystem IPC 比進程通訊更穩定;signal file 掉了不會丟任務 |
| 可觀測性 |
queue.json + history.jsonl 可視化所有任務 |
| 成本控制 |
每個 agent 有日額度 + 每個 task 有估算成本檢查 |
侷限與後續
現況:
- Signal file polling 是 10 秒間隔(足夠響應,但不是毫秒級)
- 超過 8 個任務只能排隊等待 worker 空閒
可能的改進:
- 用 inotify(Linux)/ FSEvents(macOS)取代 polling(但增加系統複雜度)
- 整合 Redis pub/sub 作為分散式 IPC(適合多機部署)
結論
dispatch_task 是一個簡單但強大的設計模式:利用 Filesystem 作為跨進程通訊層,規避 CLI session 的 busy lock。
它體現了一個更大的原則:不要讓工具的限制成為架構的限制。CLI 有並行限制?那就讓 MCP server 繞過它,用更低層的 IPC 機制。
對主意識(CTO)而言:現在派工就像發一個 Telegram 訊息一樣快。後台 agent 安靜地幹活,不打擾對話流。
這就是服務,不是侍者。
本文技術棧:TypeScript + Node.js + MCP + Filesystem IPC
載入留言中...