research

ZeroClaw — Channel 시스템

ZeroClaw — Channel 시스템

Layer 3 학습. 실제 소스 코드 기반. 파일: src/channels/traits.rs, src/channels/cli.rs, src/channels/mod.rs


전체 구조

Channel trait (계약)
    │
    ├─ TelegramChannel     ← 내 프로젝트 핵심
    ├─ DiscordChannel
    ├─ SlackChannel
    ├─ CliChannel          ← stdin/stdout, 항상 활성화
    ├─ WebhookChannel
    └─ 30여 개 구현체 (WhatsApp, IRC, Email, MQTT, ...)

start_channels()
    → collect_configured_channels() → Vec<Arc<dyn Channel>>
    → 각 channel마다 spawn_supervised_listener() (재시작 포함)
    → 단일 mpsc 버스로 메시지 수렴
    → run_message_dispatch_loop() → process_channel_message()

Channel trait — 계약

#[async_trait]
pub trait Channel: Send + Sync {
    fn name(&self) -> &str;

    // 메시지 보내기
    async fn send(&self, message: &SendMessage) -> anyhow::Result<()>;

    // 메시지 수신 (long-running loop)
    async fn listen(&self, tx: mpsc::Sender<ChannelMessage>) -> anyhow::Result<()>;

    // 이하 default impl (Ok(()) 반환)
    async fn health_check(&self) -> bool { true }
    async fn start_typing(&self, recipient: &str) -> anyhow::Result<()> { Ok(()) }
    async fn stop_typing(&self, recipient: &str) -> anyhow::Result<()> { Ok(()) }

    // 스트리밍 응답 (초안 편집 방식)
    fn supports_draft_updates(&self) -> bool { false }
    async fn send_draft(&self, message: &SendMessage) -> anyhow::Result<Option<String>> { Ok(None) }
    async fn update_draft(&self, recipient, message_id, text) -> anyhow::Result<()> { Ok(()) }
    async fn finalize_draft(&self, recipient, message_id, text) -> anyhow::Result<()> { Ok(()) }
    async fn cancel_draft(&self, recipient, message_id) -> anyhow::Result<()> { Ok(()) }

    // 이모지 반응
    async fn add_reaction(&self, channel_id, message_id, emoji) -> anyhow::Result<()> { Ok(()) }
    async fn remove_reaction(&self, channel_id, message_id, emoji) -> anyhow::Result<()> { Ok(()) }
}

필수 구현은 세 개뿐: name(), send(), listen() 나머지는 플랫폼이 지원하면 override, 아니면 default noop.


핵심 타입

// 수신 메시지
pub struct ChannelMessage {
    pub id: String,
    pub sender: String,         // 발신자 ID (Telegram user_id 등)
    pub reply_target: String,   // 응답 대상 (chat_id 등)
    pub content: String,        // 텍스트 내용
    pub channel: String,        // 채널 이름 ("telegram")
    pub timestamp: u64,
    pub thread_ts: Option<String>,            // 스레드 답글용 ID
    pub interruption_scope_id: Option<String>, // 취소 범위 식별자
}

// 발신 메시지
pub struct SendMessage {
    pub content: String,
    pub recipient: String,
    pub subject: Option<String>,
    pub thread_ts: Option<String>,  // 스레드 답글
}

CliChannel 구현 — 최소 예시

pub struct CliChannel;

impl Channel for CliChannel {
    fn name(&self) -> &str { "cli" }

    async fn send(&self, message: &SendMessage) -> anyhow::Result<()> {
        println!("{}", message.content);
        Ok(())
    }

    async fn listen(&self, tx: mpsc::Sender<ChannelMessage>) -> anyhow::Result<()> {
        let mut lines = BufReader::new(io::stdin()).lines();
        while let Ok(Some(line)) = lines.next_line().await {
            if line == "/quit" { break; }
            tx.send(ChannelMessage {
                id: Uuid::new_v4().to_string(),
                sender: "user".into(),
                content: line,
                channel: "cli".into(),
                ...
            }).await?;
        }
        Ok(())
    }
}

메시지 처리 파이프라인

Telegram → listen() → ChannelMessage
Discord  → listen() → ChannelMessage ─┐
Slack    → listen() → ChannelMessage  ├─► mpsc::channel (capacity 100)
...                                   │
                                      ▼
                          run_message_dispatch_loop()
                              Semaphore (최대 64 in-flight)
                              per-sender in-flight 추적
                                      │
                            ┌─────────┴──────────┐
                            │ process_channel_message()
                            │   hook: on_message_received
                            │   runtime command (/model, /new, /stop)
                            │   query classification → 모델 라우팅
                            │   memory recall (sender + group)
                            │   system prompt 구성
                            │   start_typing()
                            │   run_tool_call_loop()  ← agent loop
                            │   stop_typing()
                            │   sanitize_channel_response()
                            │   channel.send() or finalize_draft()
                            │   consolidate_turn() [fire-and-forget]
                            └────────────────────────────────────────

spawn_supervised_listener — 자동 재시작

fn spawn_supervised_listener(ch, tx, initial_backoff, max_backoff) {
    tokio::spawn(async move {
        let mut backoff = initial_backoff; // 기본 2초
        loop {
            // 30초마다 health heartbeat
            let result = select! {
                _ = health.tick() => health::mark_ok(&component),
                result = ch.listen(tx.clone()) => result,
            };

            if tx.is_closed() { break; } // 수신자가 없으면 종료

            match result {
                Ok(_) => {
                    // 정상 종료도 재시작 (예상치 못한 종료)
                    backoff = initial_backoff; // 성공적 실행 후 백오프 리셋
                }
                Err(e) => {
                    health::mark_error(&component, e);
                }
            }

            sleep(backoff).await;
            backoff = (backoff * 2).min(max_backoff); // 지수 백오프 (최대 60초)
        }
    })
}

채널 연결이 끊어지면 자동으로 재연결 시도.


동시성 제어

in-flight 메시지 제한:
  채널 수 × 4, 최소 8, 최대 64

per-sender 취소:
  interrupt_on_new_message = true (Telegram 등) 이면
  같은 sender+channel+thread의 이전 요청을 CancellationToken으로 취소
  → 사용자가 연속으로 메시지를 보내면 이전 것을 취소하고 새 것 처리

/stop 명령:
  진행 중인 요청 즉시 취소
  (LLM 응답 대기 중이어도)

process_channel_message — 핵심 흐름

1. hook: on_message_received (메시지 수정/드롭 가능)
2. runtime command 처리 (/model, /new, /stop)
3. query classification → provider/model 라우팅
4. 메모리 자동 저장 (auto_save = true이면)
5. 대화 히스토리 복원 (per-sender, 최대 50개)
   - stale tool_result 제거
   - [IMAGE:] 마커 비전 미지원 시 제거
   - proactive_trim (400,000자 초과 시 오래된 것 삭제)
6. 메모리 recall (sender scope + group scope)
7. system prompt 구성
   - 채널별 지침 추가 (Telegram Markdown 형식 등)
   - 메모리 컨텍스트 주입
8. 👀 ack_reaction (수신 확인)
9. start_typing() + 주기적 갱신 (4초마다)
10. run_tool_call_loop() ← agent loop (LLM + tool 실행)
11. stop_typing()
12. hook: on_message_sending (응답 수정/드롭 가능)
13. sanitize_channel_response() → <tool_call> 태그 제거
14. channel.send() 또는 finalize_draft()
15. ✅/⚠️ ack_reaction 교체
16. consolidate_turn() [fire-and-forget] → 기억 추출

채널별 특수 처리

Telegram:

- Markdown 렌더링 형식 system prompt 추가
- 미디어 마커: [IMAGE:<path>], [DOCUMENT:<path>], [VOICE:<path>]
- stream_mode 지원 (초안 편집으로 점진적 응답)
- TTS: 텍스트 응답 → 음성 메시지 자동 변환
- 음성 메시지 → 텍스트 자동 변환 후 처리
- /model, /models, /new 런타임 명령 지원
- interrupt_on_new_message: 새 메시지 오면 이전 요청 취소

발신 응답 전 sanitize:

fn sanitize_channel_response(response, tools) {
    let stripped_xml  = strip_tool_call_tags(response);     // <tool_call> 제거
    let stripped_json = strip_isolated_tool_json_artifacts(...); // JSON 아티팩트 제거
    strip_tool_narration(&stripped_json)  // "Let me check..." 같은 도구 서술 제거
}

내 프로젝트 설정

[channels_config.telegram]
bot_token = "..."
allowed_users = ["my_user_id"]
stream_mode = "off"
interrupt_on_new_message = true   # 새 메시지로 이전 응답 취소
ack_reactions = true              # 👀 → ✅ 반응

[channels_config]
show_tool_calls = true            # 도구 사용 시 별도 메시지로 알림
message_timeout_secs = 300        # 요청당 최대 5분
session_persistence = true        # 재시작 후 대화 히스토리 복원

커스텀 Channel 구현

struct LmsNotifyChannel {
    db: Arc<Db>,
}

#[async_trait]
impl Channel for LmsNotifyChannel {
    fn name(&self) -> &str { "lms_notify" }

    async fn send(&self, message: &SendMessage) -> anyhow::Result<()> {
        // 학교 LMS 알림 발송 (예: 이메일, 푸시)
        self.db.save_notification(&message.recipient, &message.content).await
    }

    async fn listen(&self, tx: mpsc::Sender<ChannelMessage>) -> anyhow::Result<()> {
        // 학교 LMS 폴링 (새 공지 감지 → ChannelMessage 발송)
        loop {
            let notices = self.db.poll_new_notices().await?;
            for notice in notices {
                tx.send(ChannelMessage {
                    content: notice.content,
                    channel: "lms_notify".into(),
                    ...
                }).await?;
            }
            sleep(Duration::from_secs(60)).await;
        }
    }
}

collect_configured_channels()에 추가하면 메인 파이프라인에 자동 통합.


관련