ZeroClaw — SOP 엔진
Layer 3 학습. 실제 소스 코드 기반. 파일:
src/sop/types.rs,src/sop/mod.rs,src/sop/dispatch.rs,src/sop/engine.rs
한 줄 정의
SOP(Standard Operating Procedure) 엔진 = 이벤트가 도착하면 정의된 절차(단계 목록)를 자동으로 실행하는 워크플로 런타임.
MQTT 센서 신호, Webhook, Cron, 하드웨어 GPIO 같은 이벤트가 오면 매칭되는 SOP를 찾아 단계별로 LLM 에이전트를 통해 실행한다.
전체 구조
workspace/sops/
pump-overpressure/
SOP.toml ← 메타데이터 + 트리거 정의
SOP.md ← 절차 단계 마크다운
로드 시:
load_sops() → Vec<Sop>
SopEngine::new() → SOPs 등록
이벤트 도착:
SopEvent { source, topic, payload, timestamp }
│
▼
dispatch_sop_event()
│
├─ Phase 1: match_trigger() → 매칭 SOP 이름 목록
├─ Phase 2: start_run() → SopRun 생성 + SopRunAction 반환
└─ Phase 3: audit 로그 (async, lock 없이)
SopRunAction:
ExecuteStep → 에이전트 루프에서 step 실행
WaitApproval → 승인 대기
Completed / Failed
SOP 파일 포맷
SOP.toml
[sop]
name = "pump-overpressure"
description = "High pressure emergency response"
version = "1.0.0"
priority = "critical" # low | normal | high | critical
execution_mode = "auto" # auto | supervised | step_by_step | priority_based
cooldown_secs = 300 # 마지막 실행 후 재실행 대기 (기본 0)
max_concurrent = 1 # 동시 실행 최대 수 (기본 1)
[[triggers]]
type = "mqtt"
topic = "facility/pump/pressure"
condition = "$.value > 85" # JSONPath 조건 (선택)
[[triggers]]
type = "webhook"
path = "/sop/emergency"
[[triggers]]
type = "cron"
expression = "0 */6 * * *"
[[triggers]]
type = "peripheral"
board = "nucleo-f401re-0"
signal = "pin_3"
condition = "> 0"
[[triggers]]
type = "manual"
SOP.md
# Pump Overpressure Response
## Steps
1. **Check pressure readings** — Read sensor data and log.
- tools: gpio_read, memory_store
2. **Close relief valve** — Set GPIO pin 5 LOW.
- tools: gpio_write
- requires_confirmation: true
3. **Notify operator** — Send Pushover alert.
- tools: pushover
단계 파싱 규칙:
## Steps섹션만 파싱 (다른##섹션 무시)N. **제목** — 본문형식- tools: tool1, tool2→ suggested_tools- requires_confirmation: true→ 이 단계 전 승인 필요
핵심 타입
Schedule (트리거 방식)
pub enum SopTrigger {
Mqtt { topic, condition } // MQTT 토픽 + JSONPath 조건
Webhook { path } // HTTP 경로
Cron { expression } // Cron 표현식
Peripheral { board, signal, condition } // 하드웨어 GPIO
Manual // 수동 실행
}
SopExecutionMode (자율성 수준)
Auto → 모든 단계 자동 실행 (승인 없음)
Supervised → 첫 단계 전 1회 승인, 이후 자동 (기본값)
StepByStep → 각 단계마다 승인
PriorityBased → Critical/High → Auto, Normal/Low → Supervised
SopRunStatus (실행 상태)
Pending → Running → Completed
→ Failed
→ Cancelled
WaitingApproval → Running (승인 시)
SopRunAction (엔진 → 호출자 지시)
pub enum SopRunAction {
ExecuteStep { run_id, step, context }, // 이 step을 에이전트에게 실행 요청
WaitApproval { run_id, step, context }, // 승인 대기 후 재개
Completed { run_id, sop_name },
Failed { run_id, sop_name, reason },
}
dispatch_sop_event() — 핵심 디스패처
Lock 획득 횟수: 정확히 2번 (배치 락 패턴)
Phase 1 (Lock 1):
engine.match_trigger(&event)
→ 매칭된 SOP 이름 목록 수집
→ Lock 해제
Phase 2 (Lock 2):
for each sop_name:
engine.start_run(sop_name, event) → SopRunAction
run 스냅샷 저장
→ Lock 해제
Phase 3 (Lock 없음, async):
audit.log_run_start(run) 각각 호출
이유: Lock을 길게 잡으면 audit(async 네트워크 I/O)이 블록됨. 분리해서 최소화.
트리거 매칭 — match_trigger()
SopEvent.source 기준으로 필터 후:
Mqtt: topic 정확히 일치 + condition(JSONPath) 평가
Webhook: path 정확히 일치
Cron: SopCronCache에서 윈도우 기반 체크 (last_check, now]
Peripheral: "{board}/{signal}" 토픽 일치 + condition 평가
Manual: 항상 매칭
Cron 트리거 — 윈도우 기반
// 스케줄러 재시작 후 놓친 tick도 캐치
if let Some(next) = schedule.after(&last_check).next() {
if next <= now { /* 이 표현식 이 윈도우에서 발화 */ }
}
// 한 윈도우에 여러 tick이 있어도 1번만 발화 (at-most-once)
SopCronCache: 데몬 시작 시 한 번 파싱 → 매 poll tick마다 재파싱 없음.
실행 흐름 — start_run() 후
SopRunAction::ExecuteStep { step, context } 반환 시:
→ 에이전트 루프로 step 주입
→ "[SOP: pump-overpressure | Step 1] Check pressure..." 형식의 프롬프트
→ agent::run() 실행
→ 결과로 engine.advance_step() 호출
→ 다음 SopRunAction 반환 (다음 단계 or Completed/Failed)
SopRunAction::WaitApproval { step, context } 반환 시:
→ 채널(Telegram 등)에 승인 요청 메시지 전송
→ run.status = WaitingApproval
→ run.waiting_since 기록 (타임아웃 추적용)
→ 승인 수신 시 engine.approve_step() → 실행 재개
→ 타임아웃 시 scheduler가 polling으로 감지 → 취소
동시성 & Cooldown 제어
cooldown_secs = 300:
마지막 완료 후 300초 내 재트리거 → Skipped
max_concurrent = 1:
현재 활성 run이 1개 이상이면 → Skipped
DispatchResult::Skipped { sop_name, reason }
→ "cooldown active" 또는 "max concurrent reached"
헤드리스 실행 (채널 없을 때)
MQTT, Webhook, Cron 같은 팬인 소스는 에이전트 루프가 없을 수 있음:
pub async fn process_headless_results(results: &[DispatchResult]) {
// ExecuteStep → 경고 로그 (step 실행 불가)
// WaitApproval → 정보 로그 (타임아웃 폴링이 처리)
// Completed → 정보 로그
// Failed → 경고 로그
}
SopAuditLogger
모든 run 시작/완료/실패를 Memory 백엔드에 기록.
log_run_start(run) → memory.store("sop_audit_start_{run_id}", ...)
log_run_complete(run)
log_run_fail(run, reason)
SopMetricsCollector
per-SOP 메트릭:
total_runs, completed, failed, cancelled
avg_duration_ms, min/max_duration_ms
last_run_at, last_status
step별 성공/실패 카운트
config.toml 설정
[sop]
enabled = true
sops_dir = "" # 기본: {workspace}/sops
default_execution_mode = "supervised"
approval_timeout_secs = 3600 # 승인 대기 최대 시간 (1시간)
max_active_runs = 10 # 전체 활성 run 최대 수
디렉토리 구조 예시
workspace/sops/
pump-overpressure/
SOP.toml
SOP.md
nightly-backup/
SOP.toml
SOP.md
door-sensor-alert/
SOP.toml
SOP.md
# CLI 명령
zeroclaw sop list # 등록된 SOP 목록
zeroclaw sop show <name> # 상세 정보 (트리거, 단계)
zeroclaw sop validate # 모든 SOP 검증
zeroclaw sop validate <name>
Lifecycle Hooks와의 연결
SOP 엔진 자체는 Hook을 직접 노출하지 않지만,
에이전트 루프를 통해 step을 실행하므로
before_tool_call, on_after_tool_call 등 기존 Hook이 SOP step 실행에도 적용됨.
관련
- daemon — SOP 스케줄러 / 폴러가 daemon 컴포넌트로 실행
- cron-scheduler — SopTrigger::Cron이 cron 표현식 파싱 재사용
- agent-loop — SopRunAction::ExecuteStep → agent::run() 호출
- lifecycle-hooks — step 실행 중 hook 적용
- channel-system — WaitApproval 메시지 채널 전달
- overview — ZeroClaw 학습 지도