ZeroClaw — Channel 시스템
Layer 3 학습. 실제 소스 코드 기반. 파일:
src/channels/traits.rs,src/channels/cli.rs,src/channels/mod.rs
전체 구조
Channel trait (계약)
│
├─ TelegramChannel ← 내 프로젝트 핵심
├─ DiscordChannel
├─ SlackChannel
├─ CliChannel ← stdin/stdout, 항상 활성화
├─ WebhookChannel
└─ 30여 개 구현체 (WhatsApp, IRC, Email, MQTT, ...)
start_channels()
→ collect_configured_channels() → Vec<Arc<dyn Channel>>
→ 각 channel마다 spawn_supervised_listener() (재시작 포함)
→ 단일 mpsc 버스로 메시지 수렴
→ run_message_dispatch_loop() → process_channel_message()
Channel trait — 계약
#[async_trait]
pub trait Channel: Send + Sync {
fn name(&self) -> &str;
// 메시지 보내기
async fn send(&self, message: &SendMessage) -> anyhow::Result<()>;
// 메시지 수신 (long-running loop)
async fn listen(&self, tx: mpsc::Sender<ChannelMessage>) -> anyhow::Result<()>;
// 이하 default impl (Ok(()) 반환)
async fn health_check(&self) -> bool { true }
async fn start_typing(&self, recipient: &str) -> anyhow::Result<()> { Ok(()) }
async fn stop_typing(&self, recipient: &str) -> anyhow::Result<()> { Ok(()) }
// 스트리밍 응답 (초안 편집 방식)
fn supports_draft_updates(&self) -> bool { false }
async fn send_draft(&self, message: &SendMessage) -> anyhow::Result<Option<String>> { Ok(None) }
async fn update_draft(&self, recipient, message_id, text) -> anyhow::Result<()> { Ok(()) }
async fn finalize_draft(&self, recipient, message_id, text) -> anyhow::Result<()> { Ok(()) }
async fn cancel_draft(&self, recipient, message_id) -> anyhow::Result<()> { Ok(()) }
// 이모지 반응
async fn add_reaction(&self, channel_id, message_id, emoji) -> anyhow::Result<()> { Ok(()) }
async fn remove_reaction(&self, channel_id, message_id, emoji) -> anyhow::Result<()> { Ok(()) }
}
필수 구현은 세 개뿐: name(), send(), listen()
나머지는 플랫폼이 지원하면 override, 아니면 default noop.
핵심 타입
// 수신 메시지
pub struct ChannelMessage {
pub id: String,
pub sender: String, // 발신자 ID (Telegram user_id 등)
pub reply_target: String, // 응답 대상 (chat_id 등)
pub content: String, // 텍스트 내용
pub channel: String, // 채널 이름 ("telegram")
pub timestamp: u64,
pub thread_ts: Option<String>, // 스레드 답글용 ID
pub interruption_scope_id: Option<String>, // 취소 범위 식별자
}
// 발신 메시지
pub struct SendMessage {
pub content: String,
pub recipient: String,
pub subject: Option<String>,
pub thread_ts: Option<String>, // 스레드 답글
}
CliChannel 구현 — 최소 예시
pub struct CliChannel;
impl Channel for CliChannel {
fn name(&self) -> &str { "cli" }
async fn send(&self, message: &SendMessage) -> anyhow::Result<()> {
println!("{}", message.content);
Ok(())
}
async fn listen(&self, tx: mpsc::Sender<ChannelMessage>) -> anyhow::Result<()> {
let mut lines = BufReader::new(io::stdin()).lines();
while let Ok(Some(line)) = lines.next_line().await {
if line == "/quit" { break; }
tx.send(ChannelMessage {
id: Uuid::new_v4().to_string(),
sender: "user".into(),
content: line,
channel: "cli".into(),
...
}).await?;
}
Ok(())
}
}
메시지 처리 파이프라인
Telegram → listen() → ChannelMessage
Discord → listen() → ChannelMessage ─┐
Slack → listen() → ChannelMessage ├─► mpsc::channel (capacity 100)
... │
▼
run_message_dispatch_loop()
Semaphore (최대 64 in-flight)
per-sender in-flight 추적
│
┌─────────┴──────────┐
│ process_channel_message()
│ hook: on_message_received
│ runtime command (/model, /new, /stop)
│ query classification → 모델 라우팅
│ memory recall (sender + group)
│ system prompt 구성
│ start_typing()
│ run_tool_call_loop() ← agent loop
│ stop_typing()
│ sanitize_channel_response()
│ channel.send() or finalize_draft()
│ consolidate_turn() [fire-and-forget]
└────────────────────────────────────────
spawn_supervised_listener — 자동 재시작
fn spawn_supervised_listener(ch, tx, initial_backoff, max_backoff) {
tokio::spawn(async move {
let mut backoff = initial_backoff; // 기본 2초
loop {
// 30초마다 health heartbeat
let result = select! {
_ = health.tick() => health::mark_ok(&component),
result = ch.listen(tx.clone()) => result,
};
if tx.is_closed() { break; } // 수신자가 없으면 종료
match result {
Ok(_) => {
// 정상 종료도 재시작 (예상치 못한 종료)
backoff = initial_backoff; // 성공적 실행 후 백오프 리셋
}
Err(e) => {
health::mark_error(&component, e);
}
}
sleep(backoff).await;
backoff = (backoff * 2).min(max_backoff); // 지수 백오프 (최대 60초)
}
})
}
채널 연결이 끊어지면 자동으로 재연결 시도.
동시성 제어
in-flight 메시지 제한:
채널 수 × 4, 최소 8, 최대 64
per-sender 취소:
interrupt_on_new_message = true (Telegram 등) 이면
같은 sender+channel+thread의 이전 요청을 CancellationToken으로 취소
→ 사용자가 연속으로 메시지를 보내면 이전 것을 취소하고 새 것 처리
/stop 명령:
진행 중인 요청 즉시 취소
(LLM 응답 대기 중이어도)
process_channel_message — 핵심 흐름
1. hook: on_message_received (메시지 수정/드롭 가능)
2. runtime command 처리 (/model, /new, /stop)
3. query classification → provider/model 라우팅
4. 메모리 자동 저장 (auto_save = true이면)
5. 대화 히스토리 복원 (per-sender, 최대 50개)
- stale tool_result 제거
- [IMAGE:] 마커 비전 미지원 시 제거
- proactive_trim (400,000자 초과 시 오래된 것 삭제)
6. 메모리 recall (sender scope + group scope)
7. system prompt 구성
- 채널별 지침 추가 (Telegram Markdown 형식 등)
- 메모리 컨텍스트 주입
8. 👀 ack_reaction (수신 확인)
9. start_typing() + 주기적 갱신 (4초마다)
10. run_tool_call_loop() ← agent loop (LLM + tool 실행)
11. stop_typing()
12. hook: on_message_sending (응답 수정/드롭 가능)
13. sanitize_channel_response() → <tool_call> 태그 제거
14. channel.send() 또는 finalize_draft()
15. ✅/⚠️ ack_reaction 교체
16. consolidate_turn() [fire-and-forget] → 기억 추출
채널별 특수 처리
Telegram:
- Markdown 렌더링 형식 system prompt 추가
- 미디어 마커: [IMAGE:<path>], [DOCUMENT:<path>], [VOICE:<path>]
- stream_mode 지원 (초안 편집으로 점진적 응답)
- TTS: 텍스트 응답 → 음성 메시지 자동 변환
- 음성 메시지 → 텍스트 자동 변환 후 처리
- /model, /models, /new 런타임 명령 지원
- interrupt_on_new_message: 새 메시지 오면 이전 요청 취소
발신 응답 전 sanitize:
fn sanitize_channel_response(response, tools) {
let stripped_xml = strip_tool_call_tags(response); // <tool_call> 제거
let stripped_json = strip_isolated_tool_json_artifacts(...); // JSON 아티팩트 제거
strip_tool_narration(&stripped_json) // "Let me check..." 같은 도구 서술 제거
}
내 프로젝트 설정
[channels_config.telegram]
bot_token = "..."
allowed_users = ["my_user_id"]
stream_mode = "off"
interrupt_on_new_message = true # 새 메시지로 이전 응답 취소
ack_reactions = true # 👀 → ✅ 반응
[channels_config]
show_tool_calls = true # 도구 사용 시 별도 메시지로 알림
message_timeout_secs = 300 # 요청당 최대 5분
session_persistence = true # 재시작 후 대화 히스토리 복원
커스텀 Channel 구현
struct LmsNotifyChannel {
db: Arc<Db>,
}
#[async_trait]
impl Channel for LmsNotifyChannel {
fn name(&self) -> &str { "lms_notify" }
async fn send(&self, message: &SendMessage) -> anyhow::Result<()> {
// 학교 LMS 알림 발송 (예: 이메일, 푸시)
self.db.save_notification(&message.recipient, &message.content).await
}
async fn listen(&self, tx: mpsc::Sender<ChannelMessage>) -> anyhow::Result<()> {
// 학교 LMS 폴링 (새 공지 감지 → ChannelMessage 발송)
loop {
let notices = self.db.poll_new_notices().await?;
for notice in notices {
tx.send(ChannelMessage {
content: notice.content,
channel: "lms_notify".into(),
...
}).await?;
}
sleep(Duration::from_secs(60)).await;
}
}
}
→ collect_configured_channels()에 추가하면 메인 파이프라인에 자동 통합.
관련
- agent-loop — process_channel_message() 내부에서 run_tool_call_loop() 호출
- auto-compaction — consolidate_turn() fire-and-forget
- memory-system — 메모리 recall + 자동 저장
- config-schema — [channels_config] 전체
- overview — ZeroClaw 학습 지도