ZeroClaw — Message 흐름 상세
Channel 시스템 내부. 실제 소스 코드 기반. 파일:
src/channels/mod.rs—process_channel_message()
전체 흐름 다이어그램
[Telegram / Discord / Slack / ...]
│
│ listen() — long-running loop
▼
ChannelMessage {
id, sender, reply_target,
content, channel,
thread_ts, interruption_scope_id
}
│
▼
mpsc::channel (capacity 100) ← 모든 채널이 하나의 버스로
│
▼
run_message_dispatch_loop()
Semaphore: 최대 64 in-flight
per-sender InFlightSenderTaskState 추적
│
├─ /stop 명령 → 진행 중 요청 CancellationToken.cancel()
│
└─ 일반 메시지
tokio::spawn → process_channel_message()
[interrupt_on_new_message = true이면]
이전 동일 sender+scope 요청 취소 후 완료 대기
process_channel_message() 단계별 상세
Phase 0: 전처리
로그 출력:
"💬 [telegram] from alice: hello"
Hook: on_message_received
→ HookResult::Cancel(reason) : 메시지 드롭 (return)
→ HookResult::Continue(msg) : 수정된 메시지로 계속
Phase 1: 라우팅 결정
target_channel 찾기:
channels_by_name.get(&msg.channel)
multi-room은 "matrix:!roomId" → "matrix"로 분리
config.toml 핫 리로드 확인:
maybe_apply_runtime_config_update()
→ config.toml 수정 시각/크기 변경 감지
→ 변경 있으면 provider 재생성 (데몬 재시작 없이)
runtime command 처리:
/models openrouter → provider 전환
/model claude-sonnet-4-6 → model 전환
/new → sender 히스토리 초기화
→ 처리 후 즉시 return (LLM 호출 없음)
query classification:
keywords 매칭 → hint 결정
model_routes에서 hint → provider/model 매핑
예: "code" 키워드 → code-provider + code-model
Phase 2: 히스토리 복원
conversation_history_key 생성:
"{channel}_{reply_target}_{sender}"
스레드 있으면: "{channel}_{reply_target}_{thread_ts}_{sender}"
히스토리 로드:
conversation_histories HashMap에서 sender_key로 조회
세션 퍼시스턴스: session_store에서 JSONL 파일 복원
normalize_cached_channel_turns():
연속 user 메시지 → 하나로 병합 ("\n\n" 구분)
연속 assistant 메시지 → 하나로 병합
이유: LLM API는 user/assistant 엄격히 교대 요구
후처리:
<tool_result> 블록 제거 (세션 간 stale 방지)
비전 미지원 provider이면 [IMAGE:] 마커 제거
proactive_trim_turns():
400,000자 초과 시 가장 오래된 turn부터 삭제
마지막 turn(현재 메시지)은 절대 삭제 안 함
Phase 3: 메모리 recall
auto_save = true이면:
user message → memory.store() (Conversation 카테고리)
key = "{channel}_{thread_ts}_{sender}_{msg.id}" (메시지마다 고유)
Dual-scope memory recall:
sender_memory = memory.recall(content, limit=5, session=sender)
group chat이면 (reply_target에 "@g.us" 또는 "group:" 포함):
group_memory = memory.recall(content, limit=5, session=history_key)
→ sender + group 기억 모두 주입 (중복 제거)
build_memory_context():
score < min_relevance_score → 스킵
[IMAGE:] 포함 → 스킵 (중복 방지)
<tool_result> 포함 → 스킵
_history 접미사 → 스킵
4,000자 초과 → 스킵
최대 4개 항목
Phase 4: System Prompt 구성
base_system_prompt:
새 세션이면: refreshed_new_session_system_prompt()
→ skills 섹션 최신화
이전 세션이면: 캐시된 ctx.system_prompt 사용
build_channel_system_prompt():
DateTimeSection 갱신 (매 메시지마다 현재 시각)
채널별 지침 추가:
"telegram" → Markdown 형식, 미디어 마커 규칙
"matrix" → 음성 응답 안내
"qq" → QQ 특화 마커
channel context 추가:
"Channel context: channel=telegram, reply_target=12345678
cron_add 시 delivery={"channel":"telegram","to":"12345678"}"
Memory context 주입:
system prompt 끝에 "\n\n[Memory context]\n- key: value\n" 추가
(user message에 넣지 않고 system prompt에 넣음 → 히스토리 오염 방지)
최종 history 구성:
[ChatMessage::system(system_prompt)]
+ prior_turns (이전 대화 히스토리)
Phase 5: LLM 호출 (tool call loop)
스트리밍 모드 결정:
channel.supports_draft_updates() = true
→ send_draft() → "..." 초안 메시지 전송
→ delta_tx/rx 채널 생성
→ draft_updater 태스크: rx로 받아서 update_draft() 호출
ack_reactions = true이면:
channel.add_reaction(reply_target, msg.id, "👀")
start_typing():
spawn_scoped_typing_task()
→ 4초마다 channel.start_typing() 반복 (Telegram 타이핑 표시)
ChannelNotifyObserver:
tool call 시작될 때마다 "🔧 `shell`: ls /home" 형식으로
별도 스레드 메시지 전송 (show_tool_calls = true이면)
tokio::select!:
timeout(timeout_budget_secs) vs run_tool_call_loop()
timeout_budget_secs 계산:
message_timeout_secs (기본 300초)
× min(max_tool_iterations, scale_cap)
scale_cap 기본값 4 → 최대 300 × 4 = 1200초
run_tool_call_loop():
= agent loop (LLM + tool 실행 반복)
max_tool_iterations 까지
취소 처리:
CancellationToken.cancelled() → LlmExecutionResult::Cancelled
tool loop 내부에서도 취소 체크
Phase 6: 응답 처리 및 전송
성공 (LlmExecutionResult::Completed(Ok(Ok(response)))):
Hook: on_message_sending
→ HookResult::Cancel → 응답 억제
→ HookResult::Continue(modified) → 내용 수정 (경로 변경 불가)
sanitize_channel_response():
strip_tool_call_tags()
→ <tool_call>, <toolcall>, <tool-call>, <invoke> 블록 제거
strip_isolated_tool_json_artifacts()
→ 도구 JSON 아티팩트 ({"name":"shell","arguments":{...}}) 제거
strip_tool_narration()
→ "Let me check...", "I'll fetch...", "Searching..." 제거
tool_summary 추출:
history에서 run_tool_call_loop 이후 추가된 turn 스캔
→ [Used tools: shell, web_search]
Telegram은 이 prefix 없이 저장 (thread 오염 방지)
히스토리 저장:
append_sender_turn(history_key, ChatMessage::assistant(response))
session_store에 JSONL 파일로 영속화
fire-and-forget consolidation:
tokio::spawn(consolidate_turn(...))
전송:
스트리밍 모드: channel.finalize_draft(reply_target, draft_id, response)
일반 모드: channel.send(SendMessage::new(response, reply_target)
.in_thread(thread_ts))
오류 케이스:
Cancelled → draft 취소, 히스토리 롤백 없음
(orphan user turn은 다음 메시지에서 정상화)
Context 초과 → compact_sender_history() (최근 12개 + 600자 트런케이트)
→ "⚠️ Context window exceeded..." 응답
LLM 에러 → vision capability 에러면 user turn 롤백
→ 일반 에러는 [Task failed] assistant turn 추가
Timeout → [Task timed out] assistant turn 추가
→ "⚠️ Request timed out..." 응답
ack_reactions 마무리:
channel.remove_reaction(reply_target, msg.id, "👀")
channel.add_reaction(reply_target, msg.id, "✅" or "⚠️")
히스토리 키 설계
conversation_history_key():
DM: "telegram_12345678_alice"
스레드: "slack_C123_1741234567_alice"
conversation_memory_key():
메시지마다 고유: "slack_alice_msg_abc123"
스레드 있으면: "slack_1741234567_alice_msg_abc123"
interruption_scope_key():
scope_id 없음: "telegram_chat-1_alice"
scope_id 있음: "slack_C123_alice_1741234567"
→ scope가 다른 스레드는 서로 취소 안 함
취소 vs 히스토리 보존
Alice: "안녕" (msg-1 처리 시작, 150ms 응답 예상)
Alice: "무시해" (50ms 후 도착)
interrupt_on_new_message = true이면:
msg-1의 CancellationToken.cancel()
msg-1 완료 대기
msg-2 처리 시작
히스토리 상태:
turns = [user: "안녕"] ← msg-1 user turn은 이미 push됨
→ msg-2 처리 시 prior_turns에 "안녕"이 포함됨
→ normalize 후 LLM에는 두 user turn이 병합돼서 전달됨
→ "안녕\n\n무시해" 형태
msg-1이 assistant turn을 push하기 전에 취소됐다면:
다음 turn에서 normalize_cached_channel_turns()가 정상화
타임아웃 계산
max_tool_iterations = 10
message_timeout_secs = 300
scale_cap = 4 (기본)
timeout_budget = 300 × min(10, 4) = 300 × 4 = 1200초 (20분)
이유: 도구를 많이 쓸수록 더 오래 걸림
단, scale_cap으로 무제한 증가 방지
관련
- channel-system — Channel trait, 채널 목록, 전체 구조
- agent-loop — run_tool_call_loop() 내부 (turn() 루프)
- auto-compaction — consolidate_turn() fire-and-forget
- memory-system — recall + auto_save
- overview — ZeroClaw 학습 지도