research

ZeroClaw — Message 흐름 상세

ZeroClaw — Message 흐름 상세

Channel 시스템 내부. 실제 소스 코드 기반. 파일: src/channels/mod.rsprocess_channel_message()


전체 흐름 다이어그램

[Telegram / Discord / Slack / ...]
        │
        │ listen() — long-running loop
        ▼
ChannelMessage {
    id, sender, reply_target,
    content, channel,
    thread_ts, interruption_scope_id
}
        │
        ▼
mpsc::channel (capacity 100)  ← 모든 채널이 하나의 버스로
        │
        ▼
run_message_dispatch_loop()
    Semaphore: 최대 64 in-flight
    per-sender InFlightSenderTaskState 추적
        │
        ├─ /stop 명령 → 진행 중 요청 CancellationToken.cancel()
        │
        └─ 일반 메시지
               tokio::spawn → process_channel_message()
               [interrupt_on_new_message = true이면]
               이전 동일 sender+scope 요청 취소 후 완료 대기

process_channel_message() 단계별 상세

Phase 0: 전처리

로그 출력:
  "💬 [telegram] from alice: hello"

Hook: on_message_received
  → HookResult::Cancel(reason)  : 메시지 드롭 (return)
  → HookResult::Continue(msg)   : 수정된 메시지로 계속

Phase 1: 라우팅 결정

target_channel 찾기:
  channels_by_name.get(&msg.channel)
  multi-room은 "matrix:!roomId" → "matrix"로 분리

config.toml 핫 리로드 확인:
  maybe_apply_runtime_config_update()
  → config.toml 수정 시각/크기 변경 감지
  → 변경 있으면 provider 재생성 (데몬 재시작 없이)

runtime command 처리:
  /models openrouter → provider 전환
  /model claude-sonnet-4-6 → model 전환
  /new → sender 히스토리 초기화
  → 처리 후 즉시 return (LLM 호출 없음)

query classification:
  keywords 매칭 → hint 결정
  model_routes에서 hint → provider/model 매핑
  예: "code" 키워드 → code-provider + code-model

Phase 2: 히스토리 복원

conversation_history_key 생성:
  "{channel}_{reply_target}_{sender}"
  스레드 있으면: "{channel}_{reply_target}_{thread_ts}_{sender}"

히스토리 로드:
  conversation_histories HashMap에서 sender_key로 조회
  세션 퍼시스턴스: session_store에서 JSONL 파일 복원

normalize_cached_channel_turns():
  연속 user 메시지 → 하나로 병합 ("\n\n" 구분)
  연속 assistant 메시지 → 하나로 병합
  이유: LLM API는 user/assistant 엄격히 교대 요구

후처리:
  <tool_result> 블록 제거 (세션 간 stale 방지)
  비전 미지원 provider이면 [IMAGE:] 마커 제거

proactive_trim_turns():
  400,000자 초과 시 가장 오래된 turn부터 삭제
  마지막 turn(현재 메시지)은 절대 삭제 안 함

Phase 3: 메모리 recall

auto_save = true이면:
  user message → memory.store() (Conversation 카테고리)
  key = "{channel}_{thread_ts}_{sender}_{msg.id}" (메시지마다 고유)

Dual-scope memory recall:
  sender_memory = memory.recall(content, limit=5, session=sender)
  
  group chat이면 (reply_target에 "@g.us" 또는 "group:" 포함):
    group_memory = memory.recall(content, limit=5, session=history_key)
    → sender + group 기억 모두 주입 (중복 제거)

build_memory_context():
  score < min_relevance_score → 스킵
  [IMAGE:] 포함 → 스킵 (중복 방지)
  <tool_result> 포함 → 스킵
  _history 접미사 → 스킵
  4,000자 초과 → 스킵
  최대 4개 항목

Phase 4: System Prompt 구성

base_system_prompt:
  새 세션이면: refreshed_new_session_system_prompt()
              → skills 섹션 최신화
  이전 세션이면: 캐시된 ctx.system_prompt 사용

build_channel_system_prompt():
  DateTimeSection 갱신 (매 메시지마다 현재 시각)
  채널별 지침 추가:
    "telegram" → Markdown 형식, 미디어 마커 규칙
    "matrix"   → 음성 응답 안내
    "qq"       → QQ 특화 마커

channel context 추가:
  "Channel context: channel=telegram, reply_target=12345678
   cron_add 시 delivery={"channel":"telegram","to":"12345678"}"

Memory context 주입:
  system prompt 끝에 "\n\n[Memory context]\n- key: value\n" 추가
  (user message에 넣지 않고 system prompt에 넣음 → 히스토리 오염 방지)

최종 history 구성:
  [ChatMessage::system(system_prompt)]
  + prior_turns (이전 대화 히스토리)

Phase 5: LLM 호출 (tool call loop)

스트리밍 모드 결정:
  channel.supports_draft_updates() = true
    → send_draft() → "..." 초안 메시지 전송
    → delta_tx/rx 채널 생성
    → draft_updater 태스크: rx로 받아서 update_draft() 호출

ack_reactions = true이면:
  channel.add_reaction(reply_target, msg.id, "👀")

start_typing():
  spawn_scoped_typing_task()
  → 4초마다 channel.start_typing() 반복 (Telegram 타이핑 표시)

ChannelNotifyObserver:
  tool call 시작될 때마다 "🔧 `shell`: ls /home" 형식으로
  별도 스레드 메시지 전송 (show_tool_calls = true이면)

tokio::select!:
  timeout(timeout_budget_secs) vs run_tool_call_loop()
  
  timeout_budget_secs 계산:
    message_timeout_secs (기본 300초)
    × min(max_tool_iterations, scale_cap)
    scale_cap 기본값 4 → 최대 300 × 4 = 1200초

  run_tool_call_loop():
    = agent loop (LLM + tool 실행 반복)
    max_tool_iterations 까지

취소 처리:
  CancellationToken.cancelled() → LlmExecutionResult::Cancelled
  tool loop 내부에서도 취소 체크

Phase 6: 응답 처리 및 전송

성공 (LlmExecutionResult::Completed(Ok(Ok(response)))):

  Hook: on_message_sending
    → HookResult::Cancel → 응답 억제
    → HookResult::Continue(modified) → 내용 수정 (경로 변경 불가)

  sanitize_channel_response():
    strip_tool_call_tags()
      → <tool_call>, <toolcall>, <tool-call>, <invoke> 블록 제거
    strip_isolated_tool_json_artifacts()
      → 도구 JSON 아티팩트 ({"name":"shell","arguments":{...}}) 제거
    strip_tool_narration()
      → "Let me check...", "I'll fetch...", "Searching..." 제거

  tool_summary 추출:
    history에서 run_tool_call_loop 이후 추가된 turn 스캔
    → [Used tools: shell, web_search]
    Telegram은 이 prefix 없이 저장 (thread 오염 방지)

  히스토리 저장:
    append_sender_turn(history_key, ChatMessage::assistant(response))
    session_store에 JSONL 파일로 영속화

  fire-and-forget consolidation:
    tokio::spawn(consolidate_turn(...))

  전송:
    스트리밍 모드: channel.finalize_draft(reply_target, draft_id, response)
    일반 모드:     channel.send(SendMessage::new(response, reply_target)
                               .in_thread(thread_ts))

오류 케이스:
  Cancelled     → draft 취소, 히스토리 롤백 없음
                  (orphan user turn은 다음 메시지에서 정상화)
  Context 초과  → compact_sender_history() (최근 12개 + 600자 트런케이트)
                → "⚠️ Context window exceeded..." 응답
  LLM 에러      → vision capability 에러면 user turn 롤백
                → 일반 에러는 [Task failed] assistant turn 추가
  Timeout       → [Task timed out] assistant turn 추가
                → "⚠️ Request timed out..." 응답

ack_reactions 마무리:
  channel.remove_reaction(reply_target, msg.id, "👀")
  channel.add_reaction(reply_target, msg.id, "✅" or "⚠️")

히스토리 키 설계

conversation_history_key():
  DM:     "telegram_12345678_alice"
  스레드:  "slack_C123_1741234567_alice"

conversation_memory_key():
  메시지마다 고유: "slack_alice_msg_abc123"
  스레드 있으면:   "slack_1741234567_alice_msg_abc123"

interruption_scope_key():
  scope_id 없음:  "telegram_chat-1_alice"
  scope_id 있음:  "slack_C123_alice_1741234567"
  → scope가 다른 스레드는 서로 취소 안 함

취소 vs 히스토리 보존

Alice: "안녕"      (msg-1 처리 시작, 150ms 응답 예상)
Alice: "무시해"    (50ms 후 도착)

interrupt_on_new_message = true이면:
  msg-1의 CancellationToken.cancel()
  msg-1 완료 대기
  msg-2 처리 시작

  히스토리 상태:
    turns = [user: "안녕"]           ← msg-1 user turn은 이미 push됨
    → msg-2 처리 시 prior_turns에 "안녕"이 포함됨
    → normalize 후 LLM에는 두 user turn이 병합돼서 전달됨
    → "안녕\n\n무시해" 형태

  msg-1이 assistant turn을 push하기 전에 취소됐다면:
    다음 turn에서 normalize_cached_channel_turns()가 정상화

타임아웃 계산

max_tool_iterations = 10
message_timeout_secs = 300
scale_cap = 4 (기본)

timeout_budget = 300 × min(10, 4) = 300 × 4 = 1200초 (20분)

이유: 도구를 많이 쓸수록 더 오래 걸림
     단, scale_cap으로 무제한 증가 방지

관련