pi SDK — RPC 모드 상세
pi RPC 모드의 전체 프로토콜 분석. 목적: Python orchestrator에서 pi를 subprocess로 호출할 때 구현 레퍼런스.
RPC 모드란
다른 언어나 프로세스 격리가 필요한 환경에서 pi coding agent를 임베드하기 위한 JSON 기반 stdin/stdout 프로토콜.
Python orchestrator
→ subprocess.Popen(["pi", "--mode", "rpc"])
→ stdin에 JSON 명령 전송 (LF 구분)
→ stdout에서 이벤트/응답 수신
언어 무관 — Python, Ruby, Go, Rust 어디서든 호출 가능.
시작 방법
# 기본 (세션 영속화)
pi --mode rpc
# 무상태 (세션 저장 안 함)
pi --mode rpc --no-session
# 특정 디렉토리에서
pi --mode rpc --cwd /path/to/project
SDK에서 직접 실행:
import { createAgentSession, runRpcMode } from "@mariozechner/pi-coding-agent";
const { session } = await createAgentSession({ /* ... */ });
await runRpcMode(session); // stdin 읽기, stdout 쓰기
프로토콜 규칙
JSONL 포맷
- 한 줄 = 하나의 JSON 객체
- 구분자: LF(\n) 만
- CR(\r) 사용 금지
- Node readline 사용 금지 (U+2028, U+2029도 분리해서 버그 발생)
Python에서 올바른 파싱
import subprocess, json
proc = subprocess.Popen(
["pi", "--mode", "rpc"],
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE
)
def send(cmd: dict):
line = json.dumps(cmd) + "\n"
proc.stdin.write(line.encode())
proc.stdin.flush()
def recv():
line = proc.stdout.readline() # LF로 구분, 안전
return json.loads(line.decode())
요청/응답 상관관계
// 요청 (id 포함)
{"id": "req-1", "type": "prompt", "message": "안녕"}
// 응답 (같은 id 반환)
{"type": "response", "command": "prompt", "id": "req-1", "success": true}
// 이벤트 (id 없음, 비동기 스트리밍)
{"type": "message_update", "delta": "안녕"}
명령 목록 (stdin → pi)
핵심 명령
// 프롬프트 전송 (비동기, 이벤트로 스트리밍)
{"id": "r1", "type": "prompt", "message": "작업 내용"}
// 이미지 포함
{
"type": "prompt",
"message": "이 이미지 분석해줘",
"images": [{"type": "image", "data": "<base64>", "mimeType": "image/png"}]
}
// 스트리밍 중 추가 메시지
{
"type": "prompt",
"message": "방향 수정해줘",
"streamingBehavior": "steer" // 현재 turn 완료 후 전달
// "streamingBehavior": "followUp" // 에이전트 완전 종료 후 전달
}
// 실행 중단
{"type": "abort"}
세션 관리
// 현재 상태 조회
{"id": "r2", "type": "get_state"}
// 응답:
// {
// "model": {...},
// "thinkingLevel": "medium",
// "isStreaming": false,
// "isCompacting": false,
// "sessionFile": "/path/to/session.jsonl",
// "sessionId": "abc123",
// "sessionName": "my-feature",
// "autoCompactionEnabled": true,
// "messageCount": 5
// }
// 새 세션 시작
{"type": "new_session"}
// 기존 세션 재개
{"type": "resume_session", "sessionFile": "/path/to/session.jsonl"}
// 메시지 목록 조회
{"type": "get_messages"}
// 세션 통계
{"type": "get_session_stats"}
// 응답: userMessages, assistantMessages, toolCalls, tokens, cost 등
모델/설정 제어
// 모델 전환 (mid-session 가능)
{"type": "set_model", "provider": "anthropic", "modelId": "claude-sonnet-4-20250514"}
// 로컬 모델로 전환
{"type": "set_model", "provider": "local", "modelId": "qwen3-7b"}
// 자동 컴팩션 토글
{"type": "set_auto_compaction", "enabled": true}
// 자동 재시도 토글
{"type": "set_auto_retry", "enabled": true}
// 재시도 중단
{"type": "abort_retry"}
bash 직접 실행
// bash 실행 (즉시 실행, 에이전트 루프 밖)
{"type": "bash", "command": "ls -la", "timeoutMs": 5000}
// 응답: { "output": "...", "exitCode": 0, "cancelled": false }
// bash 중단
{"type": "abort_bash"}
Extension UI 응답
// Extension이 confirm 요청 시
// 수신: {"type": "extension_ui_request", "id": "uuid-1", "method": "confirm", ...}
// 응답:
{"type": "extension_ui_response", "id": "uuid-1", "confirmed": true}
// select 응답
{"type": "extension_ui_response", "id": "uuid-2", "value": "Allow"}
// input 응답
{"type": "extension_ui_response", "id": "uuid-3", "value": "사용자 입력값"}
// 취소
{"type": "extension_ui_response", "id": "uuid-4", "cancelled": true}
이벤트 목록 (pi → stdout)
메시지 스트리밍
{"type": "message_start"}
{"type": "message_update", "assistantMessageEvent": {"type": "text_delta", "delta": "안"}}
{"type": "message_update", "assistantMessageEvent": {"type": "text_delta", "delta": "녕"}}
{"type": "message_end"}
Tool 실행
{"type": "tool_execution_start", "toolCallId": "tc-1", "toolName": "bash"}
{"type": "tool_execution_update", "toolCallId": "tc-1", "partialResult": "처리 중..."}
{"type": "tool_execution_end", "toolCallId": "tc-1", "result": "완료"}
Agent lifecycle
{"type": "agent_start"}
{"type": "agent_end"}
{"type": "turn_start"}
{"type": "turn_end"}
컴팩션
{"type": "auto_compaction_start", "reason": "threshold"} // or "overflow"
{
"type": "auto_compaction_end",
"result": {
"summary": "대화 요약...",
"firstKeptEntryId": "abc123",
"tokensBefore": 150000
},
"aborted": false,
"willRetry": false // overflow면 true → 자동 재시도
}
자동 재시도
{"type": "auto_retry_start", "attempt": 1, "maxAttempts": 3, "delayMs": 1000}
{"type": "auto_retry_end", "success": true}
Extension UI 요청
// confirm 요청 (timeout 있으면 자동 resolve)
{"type": "extension_ui_request", "id": "uuid-1", "method": "confirm",
"title": "위험!", "message": "rm -rf 허용?", "timeout": 10000}
// select 요청
{"type": "extension_ui_request", "id": "uuid-2", "method": "select",
"title": "모델 선택", "options": ["Claude", "Local"], "timeout": 30000}
// input 요청
{"type": "extension_ui_request", "id": "uuid-3", "method": "input",
"title": "입력", "placeholder": "값 입력..."}
// 알림 (응답 불필요)
{"type": "extension_ui_request", "method": "notify",
"message": "처리 완료", "level": "success"}
Python 클라이언트 구현 패턴
import subprocess, json, threading, asyncio
from typing import Callable
class PiRpcClient:
def __init__(self, cwd: str = "."):
self.proc = subprocess.Popen(
["pi", "--mode", "rpc", "--cwd", cwd],
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE
)
self._callbacks: dict[str, asyncio.Future] = {}
self._event_handlers: dict[str, list[Callable]] = {}
self._reader_thread = threading.Thread(target=self._read_loop, daemon=True)
self._reader_thread.start()
def _read_loop(self):
for line in self.proc.stdout:
event = json.loads(line.decode())
self._dispatch(event)
def _dispatch(self, event: dict):
# 응답: id 기반 Future resolve
if event.get("type") == "response" and "id" in event:
fut = self._callbacks.pop(event["id"], None)
if fut:
fut.get_event_loop().call_soon_threadsafe(
fut.set_result, event
)
# 이벤트: 핸들러 호출
for handler in self._event_handlers.get(event["type"], []):
handler(event)
def on(self, event_type: str, handler: Callable):
self._event_handlers.setdefault(event_type, []).append(handler)
async def send(self, cmd: dict) -> dict:
import uuid
cmd_id = str(uuid.uuid4())
cmd["id"] = cmd_id
fut = asyncio.get_event_loop().create_future()
self._callbacks[cmd_id] = fut
line = json.dumps(cmd) + "\n"
self.proc.stdin.write(line.encode())
self.proc.stdin.flush()
return await fut
async def prompt(self, message: str) -> None:
"""프롬프트 전송 (비동기, 이벤트로 결과 수신)"""
await self.send({"type": "prompt", "message": message})
async def get_state(self) -> dict:
resp = await self.send({"type": "get_state"})
return resp["data"]
async def set_model(self, provider: str, model_id: str):
await self.send({"type": "set_model", "provider": provider, "modelId": model_id})
# 사용 예시
async def main():
client = PiRpcClient(cwd="/path/to/project")
# 이벤트 핸들러 등록
client.on("message_update", lambda e: print(e.get("delta", ""), end=""))
client.on("agent_end", lambda e: print("\n[완료]"))
client.on("extension_ui_request", handle_ui_request)
# 모델 전환
await client.set_model("anthropic", "claude-sonnet-4-20250514")
# 프롬프트
await client.prompt("오늘 대기 중인 과제 목록 정리해줘")
중요 제약사항
| 제약 | 내용 |
|------|------|
| @file 문법 | RPC 모드에서 미지원. 파일 내용을 직접 메시지에 포함 |
| 이미지 | images 필드로 base64 전달 (일부 모델만 지원) |
| Extension UI | timeout 있으면 자동 resolve (클라이언트가 응답 안 해도 됨) |
| 스트리밍 중 명령 | streamingBehavior 없으면 에러 |
SDK vs RPC — 선택 기준
| 상황 | 추천 |
|------|------|
| TypeScript/Node.js 프로젝트 | SDK 직접 사용 |
| Python/Rust/Go 등 타 언어 | RPC 모드 |
| 프로세스 격리 필요 | RPC 모드 |
| 가벼운 단일 호출 | print 모드 (pi -p "query") |
| Extension UI 상호작용 필요 | RPC 모드 (JSON request/response 서브 프로토콜 지원) |
내 일정 프로그램: Python orchestrator → RPC 모드 사용이 맞아.
Python으로 직접 구현 시 RPC 모드에서 빌릴 개념
1. JSONL 스트리밍 프로토콜 (stdin/stdout)
→ asyncio StreamReader/StreamWriter
2. id 기반 request/response 상관관계
→ dict[str, Future] 패턴
3. 비동기 이벤트 스트리밍
→ asyncio Queue + 이벤트 핸들러
4. Extension UI 서브 프로토콜 (confirm/select/input)
→ plan 승인 흐름에 그대로 적용 가능
5. streamingBehavior (steer/followUp)
→ 작업 중 지시 수정 패턴
관련
- overview — pi SDK 전체 분석
- extension-api — Extension API 상세
- llm-vram-model-swap — VRAM 스왑 전략