research

pi SDK — RPC 모드 상세

pi SDK — RPC 모드 상세

pi RPC 모드의 전체 프로토콜 분석. 목적: Python orchestrator에서 pi를 subprocess로 호출할 때 구현 레퍼런스.


RPC 모드란

다른 언어나 프로세스 격리가 필요한 환경에서 pi coding agent를 임베드하기 위한 JSON 기반 stdin/stdout 프로토콜.

Python orchestrator
  → subprocess.Popen(["pi", "--mode", "rpc"])
  → stdin에 JSON 명령 전송 (LF 구분)
  → stdout에서 이벤트/응답 수신

언어 무관 — Python, Ruby, Go, Rust 어디서든 호출 가능.


시작 방법

# 기본 (세션 영속화)
pi --mode rpc

# 무상태 (세션 저장 안 함)
pi --mode rpc --no-session

# 특정 디렉토리에서
pi --mode rpc --cwd /path/to/project

SDK에서 직접 실행:

import { createAgentSession, runRpcMode } from "@mariozechner/pi-coding-agent";
const { session } = await createAgentSession({ /* ... */ });
await runRpcMode(session);  // stdin 읽기, stdout 쓰기

프로토콜 규칙

JSONL 포맷

- 한 줄 = 하나의 JSON 객체
- 구분자: LF(\n) 만
- CR(\r) 사용 금지
- Node readline 사용 금지 (U+2028, U+2029도 분리해서 버그 발생)

Python에서 올바른 파싱

import subprocess, json

proc = subprocess.Popen(
    ["pi", "--mode", "rpc"],
    stdin=subprocess.PIPE,
    stdout=subprocess.PIPE,
    stderr=subprocess.PIPE
)

def send(cmd: dict):
    line = json.dumps(cmd) + "\n"
    proc.stdin.write(line.encode())
    proc.stdin.flush()

def recv():
    line = proc.stdout.readline()  # LF로 구분, 안전
    return json.loads(line.decode())

요청/응답 상관관계

// 요청 (id 포함)
{"id": "req-1", "type": "prompt", "message": "안녕"}

// 응답 (같은 id 반환)
{"type": "response", "command": "prompt", "id": "req-1", "success": true}

// 이벤트 (id 없음, 비동기 스트리밍)
{"type": "message_update", "delta": "안녕"}

명령 목록 (stdin → pi)

핵심 명령

// 프롬프트 전송 (비동기, 이벤트로 스트리밍)
{"id": "r1", "type": "prompt", "message": "작업 내용"}

// 이미지 포함
{
  "type": "prompt",
  "message": "이 이미지 분석해줘",
  "images": [{"type": "image", "data": "<base64>", "mimeType": "image/png"}]
}

// 스트리밍 중 추가 메시지
{
  "type": "prompt",
  "message": "방향 수정해줘",
  "streamingBehavior": "steer"    // 현재 turn 완료 후 전달
  // "streamingBehavior": "followUp"  // 에이전트 완전 종료 후 전달
}

// 실행 중단
{"type": "abort"}

세션 관리

// 현재 상태 조회
{"id": "r2", "type": "get_state"}
// 응답:
// {
//   "model": {...},
//   "thinkingLevel": "medium",
//   "isStreaming": false,
//   "isCompacting": false,
//   "sessionFile": "/path/to/session.jsonl",
//   "sessionId": "abc123",
//   "sessionName": "my-feature",
//   "autoCompactionEnabled": true,
//   "messageCount": 5
// }

// 새 세션 시작
{"type": "new_session"}

// 기존 세션 재개
{"type": "resume_session", "sessionFile": "/path/to/session.jsonl"}

// 메시지 목록 조회
{"type": "get_messages"}

// 세션 통계
{"type": "get_session_stats"}
// 응답: userMessages, assistantMessages, toolCalls, tokens, cost 등

모델/설정 제어

// 모델 전환 (mid-session 가능)
{"type": "set_model", "provider": "anthropic", "modelId": "claude-sonnet-4-20250514"}

// 로컬 모델로 전환
{"type": "set_model", "provider": "local", "modelId": "qwen3-7b"}

// 자동 컴팩션 토글
{"type": "set_auto_compaction", "enabled": true}

// 자동 재시도 토글
{"type": "set_auto_retry", "enabled": true}

// 재시도 중단
{"type": "abort_retry"}

bash 직접 실행

// bash 실행 (즉시 실행, 에이전트 루프 밖)
{"type": "bash", "command": "ls -la", "timeoutMs": 5000}
// 응답: { "output": "...", "exitCode": 0, "cancelled": false }

// bash 중단
{"type": "abort_bash"}

Extension UI 응답

// Extension이 confirm 요청 시
// 수신: {"type": "extension_ui_request", "id": "uuid-1", "method": "confirm", ...}
// 응답:
{"type": "extension_ui_response", "id": "uuid-1", "confirmed": true}

// select 응답
{"type": "extension_ui_response", "id": "uuid-2", "value": "Allow"}

// input 응답
{"type": "extension_ui_response", "id": "uuid-3", "value": "사용자 입력값"}

// 취소
{"type": "extension_ui_response", "id": "uuid-4", "cancelled": true}

이벤트 목록 (pi → stdout)

메시지 스트리밍

{"type": "message_start"}
{"type": "message_update", "assistantMessageEvent": {"type": "text_delta", "delta": "안"}}
{"type": "message_update", "assistantMessageEvent": {"type": "text_delta", "delta": "녕"}}
{"type": "message_end"}

Tool 실행

{"type": "tool_execution_start", "toolCallId": "tc-1", "toolName": "bash"}
{"type": "tool_execution_update", "toolCallId": "tc-1", "partialResult": "처리 중..."}
{"type": "tool_execution_end", "toolCallId": "tc-1", "result": "완료"}

Agent lifecycle

{"type": "agent_start"}
{"type": "agent_end"}
{"type": "turn_start"}
{"type": "turn_end"}

컴팩션

{"type": "auto_compaction_start", "reason": "threshold"}  // or "overflow"
{
  "type": "auto_compaction_end",
  "result": {
    "summary": "대화 요약...",
    "firstKeptEntryId": "abc123",
    "tokensBefore": 150000
  },
  "aborted": false,
  "willRetry": false  // overflow면 true → 자동 재시도
}

자동 재시도

{"type": "auto_retry_start", "attempt": 1, "maxAttempts": 3, "delayMs": 1000}
{"type": "auto_retry_end", "success": true}

Extension UI 요청

// confirm 요청 (timeout 있으면 자동 resolve)
{"type": "extension_ui_request", "id": "uuid-1", "method": "confirm",
 "title": "위험!", "message": "rm -rf 허용?", "timeout": 10000}

// select 요청
{"type": "extension_ui_request", "id": "uuid-2", "method": "select",
 "title": "모델 선택", "options": ["Claude", "Local"], "timeout": 30000}

// input 요청
{"type": "extension_ui_request", "id": "uuid-3", "method": "input",
 "title": "입력", "placeholder": "값 입력..."}

// 알림 (응답 불필요)
{"type": "extension_ui_request", "method": "notify",
 "message": "처리 완료", "level": "success"}

Python 클라이언트 구현 패턴

import subprocess, json, threading, asyncio
from typing import Callable

class PiRpcClient:
    def __init__(self, cwd: str = "."):
        self.proc = subprocess.Popen(
            ["pi", "--mode", "rpc", "--cwd", cwd],
            stdin=subprocess.PIPE,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE
        )
        self._callbacks: dict[str, asyncio.Future] = {}
        self._event_handlers: dict[str, list[Callable]] = {}
        self._reader_thread = threading.Thread(target=self._read_loop, daemon=True)
        self._reader_thread.start()

    def _read_loop(self):
        for line in self.proc.stdout:
            event = json.loads(line.decode())
            self._dispatch(event)

    def _dispatch(self, event: dict):
        # 응답: id 기반 Future resolve
        if event.get("type") == "response" and "id" in event:
            fut = self._callbacks.pop(event["id"], None)
            if fut:
                fut.get_event_loop().call_soon_threadsafe(
                    fut.set_result, event
                )
        # 이벤트: 핸들러 호출
        for handler in self._event_handlers.get(event["type"], []):
            handler(event)

    def on(self, event_type: str, handler: Callable):
        self._event_handlers.setdefault(event_type, []).append(handler)

    async def send(self, cmd: dict) -> dict:
        import uuid
        cmd_id = str(uuid.uuid4())
        cmd["id"] = cmd_id
        fut = asyncio.get_event_loop().create_future()
        self._callbacks[cmd_id] = fut
        line = json.dumps(cmd) + "\n"
        self.proc.stdin.write(line.encode())
        self.proc.stdin.flush()
        return await fut

    async def prompt(self, message: str) -> None:
        """프롬프트 전송 (비동기, 이벤트로 결과 수신)"""
        await self.send({"type": "prompt", "message": message})

    async def get_state(self) -> dict:
        resp = await self.send({"type": "get_state"})
        return resp["data"]

    async def set_model(self, provider: str, model_id: str):
        await self.send({"type": "set_model", "provider": provider, "modelId": model_id})

# 사용 예시
async def main():
    client = PiRpcClient(cwd="/path/to/project")

    # 이벤트 핸들러 등록
    client.on("message_update", lambda e: print(e.get("delta", ""), end=""))
    client.on("agent_end", lambda e: print("\n[완료]"))
    client.on("extension_ui_request", handle_ui_request)

    # 모델 전환
    await client.set_model("anthropic", "claude-sonnet-4-20250514")

    # 프롬프트
    await client.prompt("오늘 대기 중인 과제 목록 정리해줘")

중요 제약사항

| 제약 | 내용 | |------|------| | @file 문법 | RPC 모드에서 미지원. 파일 내용을 직접 메시지에 포함 | | 이미지 | images 필드로 base64 전달 (일부 모델만 지원) | | Extension UI | timeout 있으면 자동 resolve (클라이언트가 응답 안 해도 됨) | | 스트리밍 중 명령 | streamingBehavior 없으면 에러 |


SDK vs RPC — 선택 기준

| 상황 | 추천 | |------|------| | TypeScript/Node.js 프로젝트 | SDK 직접 사용 | | Python/Rust/Go 등 타 언어 | RPC 모드 | | 프로세스 격리 필요 | RPC 모드 | | 가벼운 단일 호출 | print 모드 (pi -p "query") | | Extension UI 상호작용 필요 | RPC 모드 (JSON request/response 서브 프로토콜 지원) |

내 일정 프로그램: Python orchestrator → RPC 모드 사용이 맞아.


Python으로 직접 구현 시 RPC 모드에서 빌릴 개념

1. JSONL 스트리밍 프로토콜 (stdin/stdout)
   → asyncio StreamReader/StreamWriter

2. id 기반 request/response 상관관계
   → dict[str, Future] 패턴

3. 비동기 이벤트 스트리밍
   → asyncio Queue + 이벤트 핸들러

4. Extension UI 서브 프로토콜 (confirm/select/input)
   → plan 승인 흐름에 그대로 적용 가능

5. streamingBehavior (steer/followUp)
   → 작업 중 지시 수정 패턴

관련