import type { StdoutMessage } from 'src/entrypoints/sdk/controlTypes.js'
import { CCRClient } from '../cli/transports/ccrClient.js'
import type { HybridTransport } from '../cli/transports/HybridTransport.js'
import { SSETransport } from '../cli/transports/SSETransport.js'
import { logForDebugging } from '../utils/debug.js'
import { errorMessage } from '../utils/errors.js'
import { updateSessionIngressAuthToken } from '../utils/sessionIngressAuth.js'
import type { SessionState } from '../utils/sessionState.js'
import { registerWorker } from './workSecret.js'

/**
 * Transport abstraction for replBridge. Covers exactly the surface that
 * replBridge.ts uses against HybridTransport so the v1/v2 choice is
 * confined to the construction site.
 *
 * - v1: HybridTransport (WS reads + POST writes to Session-Ingress)
 * - v2: SSETransport (reads) + CCRClient (writes to CCR v2 /worker/*)
 *
 * The v2 write path goes through CCRClient.writeEvent → SerialBatchEventUploader,
 * NOT through SSETransport.write() — SSETransport.write() targets the
 * Session-Ingress POST URL shape, which is wrong for CCR v2.
 */
export type ReplBridgeTransport = {
  write(message: StdoutMessage): Promise<void>
  writeBatch(messages: StdoutMessage[]): Promise<void>
  close(): void
  isConnectedStatus(): boolean
  getStateLabel(): string
  setOnData(callback: (data: string) => void): void
  setOnClose(callback: (closeCode?: number) => void): void
  setOnConnect(callback: () => void): void
  connect(): void
  /**
   * High-water mark of the underlying read stream's event sequence numbers.
   * replBridge reads this before swapping transports so the new one can
   * resume from where the old one left off (otherwise the server replays
   * the entire session history from seq 0).
   *
   * v1 returns 0 — Session-Ingress WS doesn't use SSE sequence numbers;
   * replay-on-reconnect is handled by the server-side message cursor.
   */
  getLastSequenceNum(): number
  /**
   * Monotonic count of batches dropped via maxConsecutiveFailures.
   * Snapshot before writeBatch() and compare after to detect silent drops
   * (writeBatch() resolves normally even when batches were dropped).
   * v2 returns 0 — the v2 write path doesn't set maxConsecutiveFailures.
   */
  readonly droppedBatchCount: number
  /**
   * PUT /worker state (v2 only; v1 is a no-op). `requires_action` tells
   * the backend a permission prompt is pending — claude.ai shows the
   * "waiting for input" indicator. REPL/daemon callers don't need this
   * (user watches the REPL locally); multi-session worker callers do.
   */
  reportState(state: SessionState): void
  /** PUT /worker external_metadata (v2 only; v1 is a no-op). */
  reportMetadata(metadata: Record<string, unknown>): void
  /**
   * POST /worker/events/{id}/delivery (v2 only; v1 is a no-op). Populates
   * CCR's processing_at/processed_at columns. `received` is auto-fired by
   * CCRClient on every SSE frame and is not exposed here.
   */
  reportDelivery(eventId: string, status: 'processing' | 'processed'): void
  /**
   * Drain the write queue before close() (v2 only; v1 resolves
   * immediately — HybridTransport POSTs are already awaited per-write).
   */
  flush(): Promise<void>
}

/**
 * v1 adapter: HybridTransport already has the full surface (it extends
 * WebSocketTransport which has setOnConnect + getStateLabel). This is a
 * no-op wrapper that exists only so replBridge's `transport` variable
 * has a single type.
 *
 * @param hybrid - The already-constructed v1 transport to delegate to.
 * @returns A ReplBridgeTransport whose v2-only methods are no-ops.
 */
export function createV1ReplTransport(
  hybrid: HybridTransport,
): ReplBridgeTransport {
  return {
    write: msg => hybrid.write(msg),
    writeBatch: msgs => hybrid.writeBatch(msgs),
    close: () => hybrid.close(),
    isConnectedStatus: () => hybrid.isConnectedStatus(),
    getStateLabel: () => hybrid.getStateLabel(),
    setOnData: cb => hybrid.setOnData(cb),
    setOnClose: cb => hybrid.setOnClose(cb),
    setOnConnect: cb => hybrid.setOnConnect(cb),
    connect: () => void hybrid.connect(),
    // v1 Session-Ingress WS doesn't use SSE sequence numbers; replay
    // semantics are different. Always return 0 so the seq-num carryover
    // logic in replBridge is a no-op for v1.
    getLastSequenceNum: () => 0,
    // Getter (not a snapshot) so callers always observe the live counter.
    get droppedBatchCount() {
      return hybrid.droppedBatchCount
    },
    reportState: () => {},
    reportMetadata: () => {},
    reportDelivery: () => {},
    flush: () => Promise.resolve(),
  }
}

/**
 * v2 adapter: wrap SSETransport (reads) + CCRClient (writes, heartbeat,
 * state, delivery tracking).
 *
 * Auth: v2 endpoints validate the JWT's session_id claim (register_worker.go:32)
 * and worker role (environment_auth.py:856). OAuth tokens have neither.
 * This is the inverse of the v1 replBridge path, which deliberately uses OAuth.
 * The JWT is refreshed when the poll loop re-dispatches work — the caller
 * invokes createV2ReplTransport again with the fresh token.
 *
 * Registration happens here (not in the caller) so the entire v2 handshake
 * is one async step. registerWorker failure propagates — replBridge will
 * catch it and stay on the poll loop.
 *
 * Close codes passed to the onClose callback by this adapter:
 * - 4090: epoch superseded (HTTP 409 from the worker API)
 * - 4091: CCRClient.initialize() failed
 * - 4092: SSE closed without a code (reconnect budget exhausted)
 *
 * @returns A transport that is constructed but not yet connected; call
 *   connect() after wiring the setOn* callbacks.
 * @throws Whatever registerWorker rejects with when `opts.epoch` is omitted.
 */
export async function createV2ReplTransport(opts: {
  sessionUrl: string
  ingressToken: string
  sessionId: string
  /**
   * SSE sequence-number high-water mark from the previous transport.
   * Passed to the new SSETransport so its first connect() sends
   * from_sequence_num / Last-Event-ID and the server resumes from where
   * the old stream left off. Without this, every transport swap asks the
   * server to replay the entire session history from seq 0.
   */
  initialSequenceNum?: number
  /**
   * Worker epoch from POST /bridge response. When provided, the server
   * already bumped epoch (the /bridge call IS the register — see server
   * PR #293280). When omitted (v1 CCR-v2 path via replBridge.ts poll loop),
   * call registerWorker as before.
   */
  epoch?: number
  /** CCRClient heartbeat interval. Defaults to 20s when omitted. */
  heartbeatIntervalMs?: number
  /** ±fraction per-beat jitter. Defaults to 0 (no jitter) when omitted. */
  heartbeatJitterFraction?: number
  /**
   * When true, skip opening the SSE read stream — only the CCRClient write
   * path is activated. Use for mirror-mode attachments that forward events
   * but never receive inbound prompts or control requests.
   */
  outboundOnly?: boolean
  /**
   * Per-instance auth header source. When provided, CCRClient + SSETransport
   * read auth from this closure instead of the process-wide
   * CLAUDE_CODE_SESSION_ACCESS_TOKEN env var. Required for callers managing
   * multiple concurrent sessions — the env-var path stomps across sessions.
   * When omitted, falls back to the env var (single-session callers).
   */
  getAuthToken?: () => string | undefined
}): Promise<ReplBridgeTransport> {
  const {
    sessionUrl,
    ingressToken,
    sessionId,
    initialSequenceNum,
    getAuthToken,
  } = opts

  // Auth header builder. If getAuthToken is provided, read from it
  // (per-instance, multi-session safe). Otherwise write ingressToken to
  // the process-wide env var (legacy single-session path — CCRClient's
  // default getAuthHeaders reads it via getSessionIngressAuthHeaders).
  let getAuthHeaders: (() => Record<string, string>) | undefined
  if (getAuthToken) {
    getAuthHeaders = (): Record<string, string> => {
      const token = getAuthToken()
      if (!token) return {}
      return { Authorization: `Bearer ${token}` }
    }
  } else {
    // CCRClient.request() and SSETransport.connect() both read auth via
    // getSessionIngressAuthHeaders() → this env var. Set it before either
    // touches the network.
    updateSessionIngressAuthToken(ingressToken)
  }

  const epoch = opts.epoch ?? (await registerWorker(sessionUrl, ingressToken))
  logForDebugging(
    `[bridge:repl] CCR v2: worker sessionId=${sessionId} epoch=${epoch}${opts.epoch !== undefined ? ' (from /bridge)' : ' (via registerWorker)'}`,
  )

  // Derive SSE stream URL. Same logic as transportUtils.ts:26-33 but
  // starting from an http(s) base instead of a --sdk-url that might be ws://.
  const sseUrl = new URL(sessionUrl)
  sseUrl.pathname = sseUrl.pathname.replace(/\/$/, '') + '/worker/events/stream'

  const sse = new SSETransport(
    sseUrl,
    {},
    sessionId,
    undefined,
    initialSequenceNum,
    getAuthHeaders,
  )

  let onCloseCb: ((closeCode?: number) => void) | undefined
  // Set on EVERY teardown path (explicit close(), epoch mismatch, SSE close,
  // initialize failure) so writeBatch() stops enqueueing and
  // isConnectedStatus() stops reporting write-readiness once the transport
  // is dead. Declared before the CCRClient so its callbacks can never
  // observe it uninitialized.
  let closed = false

  const ccr = new CCRClient(sse, new URL(sessionUrl), {
    getAuthHeaders,
    heartbeatIntervalMs: opts.heartbeatIntervalMs,
    heartbeatJitterFraction: opts.heartbeatJitterFraction,
    // Default is process.exit(1) — correct for spawn-mode children. In-process,
    // that kills the REPL. Close instead: replBridge's onClose wakes the poll
    // loop, which picks up the server's re-dispatch (with fresh epoch).
    onEpochMismatch: () => {
      logForDebugging(
        '[bridge:repl] CCR v2: epoch superseded (409) — closing for poll-loop recovery',
      )
      // Mark closed before touching resources so the flag is set even if
      // one of the close() calls below throws.
      closed = true
      // Close resources in a try block so the throw always executes.
      // If ccr.close() or sse.close() throw, we still need to unwind
      // the caller (request()) — otherwise handleEpochMismatch's `never`
      // return type is violated at runtime and control falls through.
      try {
        ccr.close()
        sse.close()
        onCloseCb?.(4090)
      } catch (closeErr: unknown) {
        logForDebugging(
          `[bridge:repl] CCR v2: error during epoch-mismatch cleanup: ${errorMessage(closeErr)}`,
          { level: 'error' },
        )
      }
      // Don't return — the calling request() code continues after the 409
      // branch, so callers see the logged warning and a false return. We
      // throw to unwind; the uploaders catch it as a send failure.
      throw new Error('epoch superseded')
    },
  })

  // CCRClient's constructor wired sse.setOnEvent → reportDelivery('received').
  // remoteIO.ts additionally sends 'processing'/'processed' via
  // setCommandLifecycleListener, which the in-process query loop fires. This
  // transport's only caller (replBridge/daemonBridge) has no such wiring —
  // the daemon's agent child is a separate process (ProcessTransport), and its
  // notifyCommandLifecycle calls fire with listener=null in its own module
  // scope. So events stay at 'received' forever, and reconnectSession re-queues
  // them on every daemon restart (observed: 21→24→25 phantom prompts as
  // "user sent a new message while you were working" system-reminders).
  //
  // Fix: ACK 'processed' immediately alongside 'received'. The window between
  // SSE receipt and transcript-write is narrow (queue → SDK → child stdin →
  // model); a crash there loses one prompt vs. the observed N-prompt flood on
  // every restart. Overwrite the constructor's wiring to do both — setOnEvent
  // replaces, not appends (SSETransport.ts:658).
  sse.setOnEvent(event => {
    ccr.reportDelivery(event.event_id, 'received')
    ccr.reportDelivery(event.event_id, 'processed')
  })

  // Both sse.connect() and ccr.initialize() are deferred to connect() below.
  // replBridge's calling order is newTransport → setOnConnect → setOnData →
  // setOnClose → connect(), and both calls need those callbacks wired first:
  // sse.connect() opens the stream (events flow to onData/onClose immediately),
  // and ccr.initialize().then() fires onConnectCb.
  //
  // onConnect fires once ccr.initialize() resolves. Writes go via
  // CCRClient HTTP POST (SerialBatchEventUploader), not SSE, so the
  // write path is ready the moment workerEpoch is set. SSE.connect()
  // awaits its read loop and never resolves — don't gate on it.
  // The SSE stream opens in parallel (~30ms) and starts delivering
  // inbound events via setOnData; outbound doesn't need to wait for it.
  let onConnectCb: (() => void) | undefined
  let ccrInitialized = false

  return {
    write(msg) {
      return ccr.writeEvent(msg)
    },
    async writeBatch(msgs) {
      // SerialBatchEventUploader already batches internally (maxBatchSize=100);
      // sequential enqueue preserves order and the uploader coalesces.
      // Check closed between writes to avoid sending partial batches after
      // transport teardown (epoch mismatch, SSE drop).
      for (const m of msgs) {
        if (closed) break
        await ccr.writeEvent(m)
      }
    },
    close() {
      closed = true
      ccr.close()
      sse.close()
    },
    isConnectedStatus() {
      // Write-readiness, not read-readiness — replBridge checks this
      // before calling writeBatch. SSE open state is orthogonal. A torn-down
      // transport is never write-ready, even if initialize() succeeded.
      return ccrInitialized && !closed
    },
    getStateLabel() {
      // SSETransport doesn't expose its state string; synthesize from
      // what we can observe. replBridge only uses this for debug logging.
      if (sse.isClosedStatus()) return 'closed'
      if (sse.isConnectedStatus()) return ccrInitialized ? 'connected' : 'init'
      return 'connecting'
    },
    setOnData(cb) {
      sse.setOnData(cb)
    },
    setOnClose(cb) {
      onCloseCb = cb
      // SSE reconnect-budget exhaustion fires onClose(undefined) — map to
      // 4092 so ws_closed telemetry can distinguish it from HTTP-status
      // closes (SSETransport:280 passes response.status). Stop CCRClient's
      // heartbeat timer before notifying replBridge. (sse.close() doesn't
      // invoke this, so the epoch-mismatch path above isn't double-firing.)
      sse.setOnClose(code => {
        closed = true
        ccr.close()
        cb(code ?? 4092)
      })
    },
    setOnConnect(cb) {
      onConnectCb = cb
    },
    getLastSequenceNum() {
      return sse.getLastSequenceNum()
    },
    // v2 write path (CCRClient) doesn't set maxConsecutiveFailures — no drops.
    droppedBatchCount: 0,
    reportState(state) {
      ccr.reportState(state)
    },
    reportMetadata(metadata) {
      ccr.reportMetadata(metadata)
    },
    reportDelivery(eventId, status) {
      ccr.reportDelivery(eventId, status)
    },
    flush() {
      return ccr.flush()
    },
    connect() {
      // Outbound-only: skip the SSE read stream entirely — no inbound
      // events to receive, no delivery ACKs to send. Only the CCRClient
      // write path (POST /worker/events) and heartbeat are needed.
      if (!opts.outboundOnly) {
        // Fire-and-forget — SSETransport.connect() awaits readStream()
        // (the read loop) and only resolves on stream close/error. The
        // spawn-mode path in remoteIO.ts does the same void discard.
        void sse.connect()
      }
      void ccr.initialize(epoch).then(
        () => {
          ccrInitialized = true
          logForDebugging(
            `[bridge:repl] v2 transport ready for writes (epoch=${epoch}, sse=${sse.isConnectedStatus() ? 'open' : 'opening'})`,
          )
          onConnectCb?.()
        },
        (err: unknown) => {
          logForDebugging(
            `[bridge:repl] CCR v2 initialize failed: ${errorMessage(err)}`,
            { level: 'error' },
          )
          // Close transport resources and notify replBridge via onClose
          // so the poll loop can retry on the next work dispatch.
          // Without this callback, replBridge never learns the transport
          // failed to initialize and sits with transport === null forever.
          closed = true
          ccr.close()
          sse.close()
          onCloseCb?.(4091) // 4091 = init failure, distinguishable from 4090 epoch mismatch
        },
      )
    },
  }
}