返回博客列表

CoPaw 微信个人号渠道集成指南

ai
CoPaw微信渠道集成Python

CoPaw 微信个人号渠道集成指南

本文档详细记录了将 CoPaw 对接到微信个人号的完整过程,包括技术架构、实现步骤、关键代码和常见问题解决方案。


目录

  1. 项目背景
  2. 技术架构
  3. 实现步骤
  4. 目录结构
  5. 关键模块说明
  6. 配置说明
  7. CLI 命令
  8. 前端集成
  9. 常见问题与解决方案
  10. 参考资源

项目背景

目标

weixin-openclaw-cli(微信个人号 OpenClaw 插件)适配为 CoPaw 的 WeixinChannel 渠道实现,实现:

  • 微信消息接收与处理
  • 消息回复发送
  • 扫码登录管理
  • 多媒体消息支持(图片、视频、文件)

技术来源

参考腾讯官方提供的 @tencent-weixin/openclaw-weixin npm 包实现:

node_modules/@tencent-weixin/openclaw-weixin/
├── src/
│   ├── api/          # API 客户端
│   ├── auth/         # 登录认证
│   ├── cdn/          # CDN 上传下载
│   ├── channel.ts    # 渠道主入口
│   ├── messaging/    # 消息处理
│   ├── monitor/      # 消息监控
│   └── types/        # 类型定义

技术架构

API 端点

端点说明
https://ilinkai.weixin.qq.com微信 iLink API 基础 URL
https://novac2c.cdn.weixin.qq.com/c2cCDN 端点(媒体上传下载)

核心 API 接口

接口说明
ilink/bot/get_bot_qrcode获取登录二维码
ilink/bot/get_qrcode_status查询二维码状态
ilink/bot/getupdates长轮询获取新消息
ilink/bot/sendmessage发送消息
ilink/bot/getuploadurl获取 CDN 上传 URL
ilink/bot/getconfig获取账号配置
ilink/bot/sendtyping发送输入状态

关键技术点

  1. 媒体加密: AES-128-ECB,需要 PKCS7 填充
  2. context_token: 每条消息携带,回复时必须回传
  3. bot_type: 默认 "3"(当前渠道构建版本)
  4. get_updates_buf: 同步缓冲,用于增量获取消息
  5. Session 过期: 错误码 200011,需要重新登录

消息流程

┌─────────────┐     ┌─────────────┐     ┌─────────────┐
│  微信用户   │ ←→  │  iLink API  │ ←→  │  CoPaw      │
└─────────────┘     └─────────────┘     └─────────────┘
       ↑                   ↑                   │
       │                   │                   ↓
       │            长轮询 getupdates    ChannelManager
       │                   │                   │
       │                   ↓                   ↓
       │            消息入队 ←────────── WeixinChannel
       │                   │                   │
       └───────────────────┴───────────────────┘
                     sendmessage

实现步骤

步骤 1: 创建目录结构

mkdir -p src/copaw/app/channels/weixin

步骤 2: 实现核心模块

按以下顺序实现各模块:

  1. compat.py - 兼容性检查
  2. types.py - 类型定义
  3. account.py - 账号管理
  4. api.py - API 客户端
  5. auth.py - 扫码登录
  6. cdn.py - CDN 上传下载
  7. channel.py - 渠道主类

步骤 3: 注册渠道

src/copaw/app/channels/registry.py 中注册:

_BUILTIN_SPECS: dict[str, tuple[str, str]] = {
    # ... 其他渠道
    "weixin": (".weixin", "WeixinChannel"),
}

步骤 4: 添加依赖

pyproject.toml 中添加:

dependencies = [
    # ... 其他依赖
    "qrcode>=7.4.0",
]

步骤 5: 实现 CLI 命令

创建 src/copaw/cli/weixin_cmd.py 提供命令行工具。

步骤 6: 前端集成

console/src/pages/Control/Channels/components/constants.ts 中添加微信渠道配置。


目录结构

src/copaw/app/channels/weixin/
├── __init__.py          # 模块导出
├── compat.py            # 兼容性检查
├── types.py             # 类型定义
├── account.py           # 账号管理
├── api.py               # API 客户端
├── auth.py              # 扫码登录
├── cdn.py               # CDN 上传下载
└── channel.py           # 渠道主类

src/copaw/cli/
└── weixin_cmd.py        # CLI 命令


关键模块说明

1. compat.py - 兼容性检查

"""版本兼容性检查模块。"""

from __future__ import annotations

from dataclasses import dataclass
from typing import Optional

# CoPaw 版本要求
MIN_COPAW_VERSION = "0.2.0"

@dataclass
class CompatEntry:
    """兼容性条目。"""
    copaw_version: str
    plugin_version: str
    label: str

# 兼容性矩阵
COMPAT_MATRIX: list[CompatEntry] = [
    CompatEntry("0.2.0", "1.0.0", "Initial release"),
]

def get_copaw_version() -> str:
    """获取 CoPaw 版本。"""
    from copaw.__version__ import __version__
    return __version__

def find_compat_entry(version: str) -> Optional[CompatEntry]:
    """查找兼容性条目。"""
    for entry in reversed(COMPAT_MATRIX):
        if version >= entry.copaw_version:
            return entry
    return None

def assert_host_compat() -> None:
    """检查宿主版本兼容性。"""
    version = get_copaw_version()
    entry = find_compat_entry(version)
    if not entry:
        raise RuntimeError(
            f"CoPaw {version} is not compatible with this plugin. "
            f"Minimum required: {MIN_COPAW_VERSION}"
        )

2. types.py - 类型定义

"""微信消息类型定义。"""

from dataclasses import dataclass, field
from enum import IntEnum
from typing import Any, Optional

class MessageType(IntEnum):
    """消息类型。"""
    BOT = 3

class MessageState(IntEnum):
    """消息状态。"""
    FINISH = 5

class MessageItemType(IntEnum):
    """消息项类型。"""
    TEXT = 1
    IMAGE = 2
    VOICE = 4
    VIDEO = 5
    FILE = 6

@dataclass
class TextItem:
    """文本项。"""
    text: Optional[str] = None

@dataclass
class MediaItem:
    """媒体项。"""
    encrypt_query_param: Optional[str] = None
    aes_key: Optional[str] = None
    encrypt_type: Optional[str] = None

@dataclass
class ImageItem:
    """图片项。"""
    media: Optional[MediaItem] = None
    url: Optional[str] = None
    aeskey: Optional[str] = None

@dataclass
class MessageItem:
    """消息项。"""
    type: Optional[int] = None
    text_item: Optional[TextItem] = None
    image_item: Optional[ImageItem] = None
    voice_item: Optional[VoiceItem] = None
    video_item: Optional[VideoItem] = None
    file_item: Optional[FileItem] = None

@dataclass
class WeixinMessage:
    """微信消息。"""
    from_user_id: Optional[str] = None
    to_user_id: Optional[str] = None
    message_type: Optional[int] = None
    message_state: Optional[int] = None
    context_token: Optional[str] = None
    item_list: Optional[list[MessageItem]] = None
    seq: Optional[int] = None
    message_id: Optional[str] = None
    create_time_ms: Optional[int] = None

3. account.py - 账号管理

"""微信账号管理模块。"""

import json
import re
from dataclasses import dataclass
from pathlib import Path
from typing import Optional

@dataclass
class WeixinAccountData:
    """微信账号数据。"""
    token: Optional[str] = None
    saved_at: Optional[str] = None
    base_url: Optional[str] = None
    user_id: Optional[str] = None

@dataclass
class ResolvedWeixinAccount:
    """解析后的微信账号。"""
    account_id: str
    base_url: str
    cdn_base_url: str
    token: Optional[str] = None
    enabled: bool = True
    configured: bool = False
    name: Optional[str] = None
    user_id: Optional[str] = None

def _get_state_dir() -> Path:
    """获取状态存储目录。"""
    from copaw.constant import WORKING_DIR
    return WORKING_DIR / "weixin"

def list_weixin_account_ids() -> list[str]:
    """列出所有微信账号 ID。"""
    index_path = _get_state_dir() / "accounts.json"
    if not index_path.exists():
        return []
    with open(index_path) as f:
        return json.load(f)

def save_weixin_account(
    account_id: str,
    token: Optional[str] = None,
    base_url: Optional[str] = None,
    user_id: Optional[str] = None,
) -> None:
    """保存微信账号数据。"""
    account_id = _normalize_account_id(account_id)
    accounts_dir = _get_state_dir() / "accounts"
    accounts_dir.mkdir(parents=True, exist_ok=True)
    
    data = WeixinAccountData(
        token=token,
        base_url=base_url,
        user_id=user_id,
        saved_at=datetime.now().isoformat(),
    )
    
    account_path = accounts_dir / f"{account_id}.json"
    with open(account_path, "w") as f:
        json.dump(data.__dict__, f, indent=2)
    
    # 更新索引
    _register_account_id(account_id)

def resolve_weixin_account(
    account_id: str,
    cdn_base_url: Optional[str] = None,
) -> ResolvedWeixinAccount:
    """解析账号信息。"""
    account_id = _normalize_account_id(account_id)
    account_data = load_weixin_account(account_id)
    
    token = account_data.token if account_data else None
    base_url = (account_data.base_url if account_data else None) or DEFAULT_BASE_URL
    user_id = account_data.user_id if account_data else None
    
    return ResolvedWeixinAccount(
        account_id=account_id,
        base_url=base_url,
        cdn_base_url=cdn_base_url or CDN_BASE_URL,
        token=token,
        enabled=True,
        configured=bool(token),
        user_id=user_id,
    )

def update_workspace_weixin_account_id(new_account_id: str) -> list[str]:
    """更新所有 workspace 配置文件中的微信 account_id。
    
    当扫码登录生成新的 bot_id 时,自动更新配置文件。
    """
    from copaw.constant import WORKING_DIR
    
    workspaces_dir = WORKING_DIR / "workspaces"
    updated_workspaces: list[str] = []
    
    for workspace_dir in workspaces_dir.iterdir():
        agent_json = workspace_dir / "agent.json"
        if not agent_json.exists():
            continue
        
        with open(agent_json) as f:
            config = json.load(f)
        
        weixin_cfg = config.get("channels", {}).get("weixin", {})
        if not weixin_cfg:
            continue
        
        old_account_id = weixin_cfg.get("account_id")
        if old_account_id == new_account_id:
            continue
        
        weixin_cfg["account_id"] = new_account_id
        
        with open(agent_json, "w") as f:
            json.dump(config, f, indent=2, ensure_ascii=False)
        
        updated_workspaces.append(workspace_dir.name)
    
    return updated_workspaces

4. api.py - API 客户端

"""微信 iLink API 客户端。"""

import asyncio
import json
import secrets
from typing import Optional

import httpx

DEFAULT_BASE_URL = "https://ilinkai.weixin.qq.com"
CDN_BASE_URL = "https://novac2c.cdn.weixin.qq.com/c2c"
DEFAULT_LONG_POLL_TIMEOUT_MS = 35000

class WeixinApiClient:
    """微信 iLink API 客户端。"""

    def __init__(
        self,
        base_url: str = DEFAULT_BASE_URL,
        token: Optional[str] = None,
        long_poll_timeout_ms: int = DEFAULT_LONG_POLL_TIMEOUT_MS,
    ):
        self.base_url = base_url.rstrip("/")
        self.token = token
        self.long_poll_timeout_ms = long_poll_timeout_ms
        self._client = httpx.AsyncClient(timeout=httpx.Timeout(60.0))

    def _build_headers(self, body: str) -> dict[str, str]:
        """构建请求头。"""
        headers = {
            "Content-Type": "application/json",
            "AuthorizationType": "ilink_bot_token",
            "X-WECHAT-UIN": self._random_wechat_uin(),
        }
        if self.token:
            headers["Authorization"] = f"Bearer {self.token}"
        return headers

    @staticmethod
    def _random_wechat_uin() -> str:
        """生成随机 X-WECHAT-UIN 头。"""
        import base64
        uint32 = secrets.randbits(32)
        return base64.b64encode(str(uint32).encode()).decode()

    async def get_updates(
        self,
        get_updates_buf: str = "",
        timeout_ms: Optional[int] = None,
    ) -> GetUpdatesResp:
        """长轮询获取新消息。"""
        body = json.dumps({
            "get_updates_buf": get_updates_buf,
            "base_info": {"channel_version": "copaw-weixin-1.0.0"},
        })
        
        timeout = timeout_ms or self.long_poll_timeout_ms
        
        try:
            raw_text = await self._api_fetch(
                "ilink/bot/getupdates",
                body,
                timeout_ms=timeout,
            )
            data = json.loads(raw_text)
            return parse_get_updates_resp(data)
        except (asyncio.TimeoutError, httpx.ReadTimeout):
            # 长轮询超时是正常的
            return GetUpdatesResp(ret=0, get_updates_buf=get_updates_buf)

    async def send_message(self, msg: WeixinMessage) -> None:
        """发送消息。"""
        body_dict = {
            "msg": weixin_message_to_dict(msg),
            "base_info": {"channel_version": "copaw-weixin-1.0.0"},
        }
        body = json.dumps(body_dict, ensure_ascii=False)
        await self._api_fetch("ilink/bot/sendmessage", body)

    async def close(self) -> None:
        """关闭客户端。"""
        await self._client.aclose()

5. auth.py - 扫码登录

"""微信扫码登录模块。"""

import asyncio
from typing import Optional

from .api import DEFAULT_BASE_URL, fetch_qr_code, poll_qr_status
from .account import (
    save_weixin_account,
    register_weixin_account_id,
    clear_context_tokens_for_account,
    update_workspace_weixin_account_id,
)

DEFAULT_LOGIN_TIMEOUT_MS = 480_000  # 8 分钟

def print_qr_code(url: str) -> None:
    """在终端打印 ASCII 二维码。"""
    try:
        import qrcode
        qr = qrcode.QRCode(version=1, box_size=1, border=1)
        qr.add_data(url)
        qr.make(fit=True)
        qr.print_ascii(invert=True)
    except ImportError:
        print(f"二维码链接: {url}")

async def login_with_qr(
    base_url: Optional[str] = None,
    account_id: Optional[str] = None,
    timeout_ms: int = DEFAULT_LOGIN_TIMEOUT_MS,
) -> WeixinQrWaitResult:
    """执行完整的扫码登录流程。"""
    base_url = base_url or DEFAULT_BASE_URL
    
    # 1. 获取二维码
    qr_response = await fetch_qr_code(base_url, bot_type="3")
    qrcode_url = qr_response.get("qrcode_img_content")
    
    if not qrcode_url:
        return WeixinQrWaitResult(message="获取二维码失败")
    
    # 2. 显示二维码
    print("\n使用微信扫描以下二维码:\n")
    print_qr_code(qrcode_url)
    
    # 3. 等待扫码
    qrcode = qr_response.get("qrcode")
    result = await poll_qr_status(base_url, qrcode, timeout_ms)
    
    # 4. 保存账号
    if result.connected and result.bot_token and result.account_id:
        save_weixin_account(
            result.account_id,
            token=result.bot_token,
            base_url=result.base_url,
            user_id=result.user_id,
        )
        register_weixin_account_id(result.account_id)
        
        # 更新配置文件
        update_workspace_weixin_account_id(result.account_id)
        
        print("\n✅ 与微信连接成功!")
    
    return result

6. channel.py - 渠道主类

"""微信个人号渠道实现。"""

import asyncio
import logging
from typing import Any, Optional

from .account import (
    list_weixin_account_ids,
    load_sync_buf,
    resolve_weixin_account,
    restore_context_tokens,
    save_sync_buf,
    set_context_token,
    get_context_token,
)
from .api import WeixinApiClient, DEFAULT_BASE_URL
from .types import MessageItemType, MessageType, MessageState, WeixinMessage
from ..base import BaseChannel
from ..schema import ChannelType

logger = logging.getLogger(__name__)

WEIXIN_CHANNEL: ChannelType = "weixin"
SESSION_EXPIRED_ERRCODE = 200011

class WeixinChannel(BaseChannel):
    """微信个人号渠道。"""

    channel: ChannelType = WEIXIN_CHANNEL
    display_name = "WeChat"
    uses_manager_queue: bool = True

    def __init__(
        self,
        process,
        enabled: bool = True,
        bot_prefix: str = "",
        base_url: str = DEFAULT_BASE_URL,
        account_id: Optional[str] = None,
        **kwargs,
    ):
        super().__init__(process, **kwargs)
        self.enabled = enabled
        self.bot_prefix = bot_prefix
        self.base_url = base_url
        self.account_id = account_id
        
        self._client: Optional[WeixinApiClient] = None
        self._account: Optional[ResolvedWeixinAccount] = None
        self._monitor_task: Optional[asyncio.Task] = None
        self._abort_event = asyncio.Event()

    @classmethod
    def from_env(cls, process, on_reply_sent=None) -> "WeixinChannel":
        """从环境变量创建渠道。"""
        import os
        from .account import list_weixin_account_ids
        
        # 自动检测:如果有账号配置则默认启用
        env_enabled = os.getenv("WEIXIN_CHANNEL_ENABLED", "")
        if env_enabled:
            enabled = env_enabled == "1"
        else:
            enabled = len(list_weixin_account_ids()) > 0
        
        return cls(
            process=process,
            enabled=enabled,
            bot_prefix=os.getenv("WEIXIN_BOT_PREFIX", ""),
            base_url=os.getenv("WEIXIN_BASE_URL", DEFAULT_BASE_URL),
            account_id=os.getenv("WEIXIN_ACCOUNT_ID"),
            on_reply_sent=on_reply_sent,
        )

    async def start(self) -> None:
        """启动渠道。"""
        if not self.enabled:
            return
        
        if not self._enqueue:
            logger.error("❌ _enqueue callback not set!")
            return
        
        account_ids = list_weixin_account_ids()
        if not account_ids:
            logger.warning("No Weixin accounts. Run: copaw weixin login")
            return
        
        account_id = self.account_id or account_ids[0]
        self._account = resolve_weixin_account(account_id)
        
        if not self._account.configured:
            logger.warning(f"Account {account_id} not configured. Run: copaw weixin login")
            return
        
        restore_context_tokens(account_id)
        
        self._client = WeixinApiClient(
            base_url=self._account.base_url,
            token=self._account.token,
        )
        
        self._abort_event.clear()
        self._monitor_task = asyncio.create_task(self._monitor_loop())
        
        logger.info(f"✅ WeixinChannel 已启动: account={account_id}")

    async def stop(self) -> None:
        """停止渠道。"""
        self._abort_event.set()
        if self._monitor_task:
            self._monitor_task.cancel()
            try:
                await self._monitor_task
            except asyncio.CancelledError:
                pass
            self._monitor_task = None
        
        if self._client:
            await self._client.close()
            self._client = None

    async def _monitor_loop(self) -> None:
        """长轮询监控循环。"""
        account_id = self._account.account_id
        get_updates_buf = load_sync_buf(account_id)
        
        while not self._abort_event.is_set():
            try:
                resp = await self._client.get_updates(get_updates_buf=get_updates_buf)
                
                # 检查错误
                is_api_error = (resp.ret is not None and resp.ret != 0) or (
                    resp.errcode is not None and resp.errcode != 0
                )
                
                if is_api_error:
                    if resp.errcode == SESSION_EXPIRED_ERRCODE:
                        logger.error("Session expired. Run: copaw weixin login")
                        break
                    continue
                
                # 更新同步缓冲
                if resp.get_updates_buf:
                    save_sync_buf(account_id, resp.get_updates_buf)
                    get_updates_buf = resp.get_updates_buf
                
                # 处理消息
                for msg in resp.msgs or []:
                    await self._process_inbound_message(msg)
                    
            except asyncio.CancelledError:
                break
            except Exception as e:
                logger.exception(f"Monitor loop error: {e}")
                await asyncio.sleep(2)

    async def _process_inbound_message(self, msg: WeixinMessage) -> None:
        """处理入站消息。"""
        from_user_id = msg.from_user_id
        if not from_user_id:
            return
        
        logger.info(f"📥 收到微信消息: from={from_user_id}")
        
        # 保存 context token
        if msg.context_token:
            set_context_token(self._account.account_id, from_user_id, msg.context_token)
        
        # 构建消息内容
        content_parts = []
        for item in msg.item_list or []:
            if item.type == MessageItemType.TEXT and item.text_item:
                content_parts.append({
                    "type": "text",
                    "text": item.text_item.text,
                })
        
        if not content_parts:
            return
        
        # 入队处理
        native = {
            "channel_id": self.channel,
            "sender_id": from_user_id,
            "session_id": f"weixin:{from_user_id}",
            "content_parts": content_parts,
            "meta": {
                "weixin_from_user_id": from_user_id,
                "weixin_context_token": msg.context_token,
            },
        }
        
        if self._enqueue:
            self._enqueue(native)

    async def send(self, to_handle: str, text: str, meta=None) -> None:
        """发送文本消息。"""
        if not self._client or not self._account:
            return
        
        to_user_id = self._parse_user_id(to_handle)
        context_token = get_context_token(self._account.account_id, to_user_id)
        
        msg = WeixinMessage(
            to_user_id=to_user_id,
            client_id=self._generate_client_id(),
            message_type=MessageType.BOT,
            message_state=MessageState.FINISH,
            context_token=context_token,
            item_list=[{
                "type": MessageItemType.TEXT,
                "text_item": {"text": text},
            }],
        )
        
        await self._client.send_message(msg)
        logger.info(f"📤 发送微信消息: to={to_user_id}")

    def _parse_user_id(self, to_handle: str) -> str:
        """解析用户 ID。"""
        if to_handle.startswith("weixin:"):
            return to_handle[7:]
        return to_handle

配置说明

环境变量

变量说明默认值
WEIXIN_CHANNEL_ENABLED是否启用渠道自动检测
WEIXIN_BOT_PREFIX消息前缀
WEIXIN_BASE_URLAPI 基础 URLhttps://ilinkai.weixin.qq.com
WEIXIN_ACCOUNT_ID指定账号 ID第一个账号

配置文件

~/.copaw/workspaces/<agent_id>/agent.json 中配置:

{
  "channels": {
    "weixin": {
      "enabled": true,
      "bot_prefix": "",
      "base_url": "https://ilinkai.weixin.qq.com",
      "account_id": "xxx-im-bot"
    }
  }
}

账号数据存储

账号数据存储在 ~/.copaw/weixin/ 目录:

~/.copaw/weixin/
├── accounts.json           # 账号索引
└── accounts/
    └── xxx-im-bot.json     # 账号数据

账号数据文件格式:

{
  "token": "xxx@im.bot:xxx",
  "base_url": "https://ilinkai.weixin.qq.com",
  "user_id": "xxx@im.wechat",
  "saved_at": "2026-03-25T00:00:00"
}

CLI 命令

安装命令

# 安装并登录
copaw weixin install

# 扫码登录
copaw weixin login

# 查看状态
copaw weixin status

# 列出账号
copaw weixin accounts

渠道命令

# 通过渠道命令登录
copaw channels login --channel weixin

# 配置渠道
copaw channels config weixin --enabled true

前端集成

添加渠道标签

console/src/pages/Control/Channels/components/constants.ts 中添加:

export const CHANNEL_TAGS: Record<string, ChannelTagConfig> = {
  // ... 其他渠道
  weixin: {
    label: 'WeChat',
    color: '#07C160',
    icon: WechatOutlined,
  },
};

渠道配置表单

前端会自动渲染渠道配置表单,支持以下字段:

  • enabled - 是否启用
  • bot_prefix - 消息前缀
  • base_url - API 基础 URL
  • account_id - 账号 ID

常见问题与解决方案

1. 渠道未启动

症状: 日志显示 WeixinChannel is disabled

原因: 没有检测到账号或配置未启用

解决方案:

# 登录账号
copaw weixin login

# 或设置环境变量
export WEIXIN_CHANNEL_ENABLED=1

2. 账号未配置

症状: 日志显示 Account xxx not configured. Run login first.

原因: 配置文件中的 account_id 与实际登录的账号不匹配

解决方案:

# 重新登录
copaw weixin login

# 或手动更新配置文件中的 account_id

3. Session 过期

症状: 日志显示 Session expired 或错误码 200011

原因: 登录凭证过期

解决方案:

copaw weixin login

4. getUpdates 失败

症状: 日志显示 getUpdates failed: ret=None errcode=None

原因: 错误检查逻辑问题

解决方案: 确保使用正确的错误检查逻辑:

is_api_error = (resp.ret is not None and resp.ret != 0) or (
    resp.errcode is not None and resp.errcode != 0
)

5. 长轮询超时

症状: httpx.ReadTimeout 异常

原因: 超时异常类型不匹配

解决方案: 捕获正确的异常类型:

except (asyncio.TimeoutError, httpx.ReadTimeout):
    # 长轮询超时是正常的
    pass

6. 消息未发送

症状: 收到消息但没有回复

原因: 可能是 Agent 处理问题或模型配置问题

排查步骤:

  1. 检查模型配置是否正确
  2. 查看是否有 Agent 错误日志
  3. 检查 on_event_message_completed 是否被调用

参考资源

源码参考

  • @tencent-weixin/openclaw-weixin - 腾讯官方 OpenClaw 微信插件
  • src/copaw/app/channels/weixin/ - CoPaw 微信渠道实现

相关文件

文件说明
src/copaw/app/channels/weixin/channel.py渠道主类
src/copaw/app/channels/weixin/api.pyAPI 客户端
src/copaw/app/channels/weixin/auth.py登录认证
src/copaw/app/channels/weixin/account.py账号管理
src/copaw/app/channels/weixin/types.py类型定义
src/copaw/cli/weixin_cmd.pyCLI 命令

更新日志

v1.0.0 (2026-03-25)

  • 初始实现
  • 支持文本消息收发
  • 支持扫码登录
  • 支持账号管理
  • 支持自动更新配置文件
  • 添加 CLI 命令
  • 前端集成

贡献

欢迎提交 Issue 和 Pull Request!

开发环境

# 克隆仓库
git clone https://github.com/copaw/copaw.git
cd copaw

# 安装依赖
pip install -e ".[dev]"

# 运行测试
pytest tests/channels/weixin/

代码风格

  • 使用 Python 3.10+ 语法
  • 遵循 PEP 8 规范
  • 使用 type hints
  • 添加 docstring