CoPaw 微信个人号渠道集成指南
本文档详细记录了将 CoPaw 对接到微信个人号的完整过程,包括技术架构、实现步骤、关键代码和常见问题解决方案。
目录
项目背景
目标
将 weixin-openclaw-cli(微信个人号 OpenClaw 插件)适配为 CoPaw 的 WeixinChannel 渠道实现,实现:
- 微信消息接收与处理
- 消息回复发送
- 扫码登录管理
- 多媒体消息支持(图片、视频、文件)
技术来源
参考腾讯官方提供的 @tencent-weixin/openclaw-weixin npm 包实现:
node_modules/@tencent-weixin/openclaw-weixin/
├── src/
│ ├── api/ # API 客户端
│ ├── auth/ # 登录认证
│ ├── cdn/ # CDN 上传下载
│ ├── channel.ts # 渠道主入口
│ ├── messaging/ # 消息处理
│ ├── monitor/ # 消息监控
│ └── types/ # 类型定义
技术架构
API 端点
| 端点 | 说明 |
|---|---|
https://ilinkai.weixin.qq.com | 微信 iLink API 基础 URL |
https://novac2c.cdn.weixin.qq.com/c2c | CDN 端点(媒体上传下载) |
核心 API 接口
| 接口 | 说明 |
|---|---|
ilink/bot/get_bot_qrcode | 获取登录二维码 |
ilink/bot/get_qrcode_status | 查询二维码状态 |
ilink/bot/getupdates | 长轮询获取新消息 |
ilink/bot/sendmessage | 发送消息 |
ilink/bot/getuploadurl | 获取 CDN 上传 URL |
ilink/bot/getconfig | 获取账号配置 |
ilink/bot/sendtyping | 发送输入状态 |
关键技术点
- 媒体加密: AES-128-ECB,需要 PKCS7 填充
- context_token: 每条消息携带,回复时必须回传
- bot_type: 默认 "3"(当前渠道构建版本)
- get_updates_buf: 同步缓冲,用于增量获取消息
- Session 过期: 错误码 200011,需要重新登录
消息流程
┌─────────────┐ ┌─────────────┐ ┌─────────────┐
│ 微信用户 │ ←→ │ iLink API │ ←→ │ CoPaw │
└─────────────┘ └─────────────┘ └─────────────┘
↑ ↑ │
│ │ ↓
│ 长轮询 getupdates ChannelManager
│ │ │
│ ↓ ↓
│ 消息入队 ←────────── WeixinChannel
│ │ │
└───────────────────┴───────────────────┘
sendmessage
实现步骤
步骤 1: 创建目录结构
mkdir -p src/copaw/app/channels/weixin
步骤 2: 实现核心模块
按以下顺序实现各模块:
- compat.py - 兼容性检查
- types.py - 类型定义
- account.py - 账号管理
- api.py - API 客户端
- auth.py - 扫码登录
- cdn.py - CDN 上传下载
- channel.py - 渠道主类
步骤 3: 注册渠道
在 src/copaw/app/channels/registry.py 中注册:
_BUILTIN_SPECS: dict[str, tuple[str, str]] = {
# ... 其他渠道
"weixin": (".weixin", "WeixinChannel"),
}
步骤 4: 添加依赖
在 pyproject.toml 中添加:
dependencies = [
# ... 其他依赖
"qrcode>=7.4.0",
]
步骤 5: 实现 CLI 命令
创建 src/copaw/cli/weixin_cmd.py 提供命令行工具。
步骤 6: 前端集成
在 console/src/pages/Control/Channels/components/constants.ts 中添加微信渠道配置。
目录结构
src/copaw/app/channels/weixin/
├── __init__.py # 模块导出
├── compat.py # 兼容性检查
├── types.py # 类型定义
├── account.py # 账号管理
├── api.py # API 客户端
├── auth.py # 扫码登录
├── cdn.py # CDN 上传下载
└── channel.py # 渠道主类
src/copaw/cli/
└── weixin_cmd.py # CLI 命令
关键模块说明
1. compat.py - 兼容性检查
"""版本兼容性检查模块。"""
from __future__ import annotations
from dataclasses import dataclass
from typing import Optional
# CoPaw 版本要求
MIN_COPAW_VERSION = "0.2.0"
@dataclass
class CompatEntry:
"""兼容性条目。"""
copaw_version: str
plugin_version: str
label: str
# 兼容性矩阵
COMPAT_MATRIX: list[CompatEntry] = [
CompatEntry("0.2.0", "1.0.0", "Initial release"),
]
def get_copaw_version() -> str:
"""获取 CoPaw 版本。"""
from copaw.__version__ import __version__
return __version__
def find_compat_entry(version: str) -> Optional[CompatEntry]:
"""查找兼容性条目。"""
for entry in reversed(COMPAT_MATRIX):
if version >= entry.copaw_version:
return entry
return None
def assert_host_compat() -> None:
"""检查宿主版本兼容性。"""
version = get_copaw_version()
entry = find_compat_entry(version)
if not entry:
raise RuntimeError(
f"CoPaw {version} is not compatible with this plugin. "
f"Minimum required: {MIN_COPAW_VERSION}"
)
2. types.py - 类型定义
"""微信消息类型定义。"""
from dataclasses import dataclass, field
from enum import IntEnum
from typing import Any, Optional
class MessageType(IntEnum):
"""消息类型。"""
BOT = 3
class MessageState(IntEnum):
"""消息状态。"""
FINISH = 5
class MessageItemType(IntEnum):
"""消息项类型。"""
TEXT = 1
IMAGE = 2
VOICE = 4
VIDEO = 5
FILE = 6
@dataclass
class TextItem:
"""文本项。"""
text: Optional[str] = None
@dataclass
class MediaItem:
"""媒体项。"""
encrypt_query_param: Optional[str] = None
aes_key: Optional[str] = None
encrypt_type: Optional[str] = None
@dataclass
class ImageItem:
"""图片项。"""
media: Optional[MediaItem] = None
url: Optional[str] = None
aeskey: Optional[str] = None
@dataclass
class MessageItem:
"""消息项。"""
type: Optional[int] = None
text_item: Optional[TextItem] = None
image_item: Optional[ImageItem] = None
voice_item: Optional[VoiceItem] = None
video_item: Optional[VideoItem] = None
file_item: Optional[FileItem] = None
@dataclass
class WeixinMessage:
"""微信消息。"""
from_user_id: Optional[str] = None
to_user_id: Optional[str] = None
message_type: Optional[int] = None
message_state: Optional[int] = None
context_token: Optional[str] = None
item_list: Optional[list[MessageItem]] = None
seq: Optional[int] = None
message_id: Optional[str] = None
create_time_ms: Optional[int] = None
3. account.py - 账号管理
"""微信账号管理模块。"""
import json
import re
from dataclasses import dataclass
from pathlib import Path
from typing import Optional
@dataclass
class WeixinAccountData:
"""微信账号数据。"""
token: Optional[str] = None
saved_at: Optional[str] = None
base_url: Optional[str] = None
user_id: Optional[str] = None
@dataclass
class ResolvedWeixinAccount:
"""解析后的微信账号。"""
account_id: str
base_url: str
cdn_base_url: str
token: Optional[str] = None
enabled: bool = True
configured: bool = False
name: Optional[str] = None
user_id: Optional[str] = None
def _get_state_dir() -> Path:
"""获取状态存储目录。"""
from copaw.constant import WORKING_DIR
return WORKING_DIR / "weixin"
def list_weixin_account_ids() -> list[str]:
"""列出所有微信账号 ID。"""
index_path = _get_state_dir() / "accounts.json"
if not index_path.exists():
return []
with open(index_path) as f:
return json.load(f)
def save_weixin_account(
account_id: str,
token: Optional[str] = None,
base_url: Optional[str] = None,
user_id: Optional[str] = None,
) -> None:
"""保存微信账号数据。"""
account_id = _normalize_account_id(account_id)
accounts_dir = _get_state_dir() / "accounts"
accounts_dir.mkdir(parents=True, exist_ok=True)
data = WeixinAccountData(
token=token,
base_url=base_url,
user_id=user_id,
saved_at=datetime.now().isoformat(),
)
account_path = accounts_dir / f"{account_id}.json"
with open(account_path, "w") as f:
json.dump(data.__dict__, f, indent=2)
# 更新索引
_register_account_id(account_id)
def resolve_weixin_account(
account_id: str,
cdn_base_url: Optional[str] = None,
) -> ResolvedWeixinAccount:
"""解析账号信息。"""
account_id = _normalize_account_id(account_id)
account_data = load_weixin_account(account_id)
token = account_data.token if account_data else None
base_url = (account_data.base_url if account_data else None) or DEFAULT_BASE_URL
user_id = account_data.user_id if account_data else None
return ResolvedWeixinAccount(
account_id=account_id,
base_url=base_url,
cdn_base_url=cdn_base_url or CDN_BASE_URL,
token=token,
enabled=True,
configured=bool(token),
user_id=user_id,
)
def update_workspace_weixin_account_id(new_account_id: str) -> list[str]:
"""更新所有 workspace 配置文件中的微信 account_id。
当扫码登录生成新的 bot_id 时,自动更新配置文件。
"""
from copaw.constant import WORKING_DIR
workspaces_dir = WORKING_DIR / "workspaces"
updated_workspaces: list[str] = []
for workspace_dir in workspaces_dir.iterdir():
agent_json = workspace_dir / "agent.json"
if not agent_json.exists():
continue
with open(agent_json) as f:
config = json.load(f)
weixin_cfg = config.get("channels", {}).get("weixin", {})
if not weixin_cfg:
continue
old_account_id = weixin_cfg.get("account_id")
if old_account_id == new_account_id:
continue
weixin_cfg["account_id"] = new_account_id
with open(agent_json, "w") as f:
json.dump(config, f, indent=2, ensure_ascii=False)
updated_workspaces.append(workspace_dir.name)
return updated_workspaces
4. api.py - API 客户端
"""微信 iLink API 客户端。"""
import asyncio
import json
import secrets
from typing import Optional
import httpx
DEFAULT_BASE_URL = "https://ilinkai.weixin.qq.com"
CDN_BASE_URL = "https://novac2c.cdn.weixin.qq.com/c2c"
DEFAULT_LONG_POLL_TIMEOUT_MS = 35000
class WeixinApiClient:
"""微信 iLink API 客户端。"""
def __init__(
self,
base_url: str = DEFAULT_BASE_URL,
token: Optional[str] = None,
long_poll_timeout_ms: int = DEFAULT_LONG_POLL_TIMEOUT_MS,
):
self.base_url = base_url.rstrip("/")
self.token = token
self.long_poll_timeout_ms = long_poll_timeout_ms
self._client = httpx.AsyncClient(timeout=httpx.Timeout(60.0))
def _build_headers(self, body: str) -> dict[str, str]:
"""构建请求头。"""
headers = {
"Content-Type": "application/json",
"AuthorizationType": "ilink_bot_token",
"X-WECHAT-UIN": self._random_wechat_uin(),
}
if self.token:
headers["Authorization"] = f"Bearer {self.token}"
return headers
@staticmethod
def _random_wechat_uin() -> str:
"""生成随机 X-WECHAT-UIN 头。"""
import base64
uint32 = secrets.randbits(32)
return base64.b64encode(str(uint32).encode()).decode()
async def get_updates(
self,
get_updates_buf: str = "",
timeout_ms: Optional[int] = None,
) -> GetUpdatesResp:
"""长轮询获取新消息。"""
body = json.dumps({
"get_updates_buf": get_updates_buf,
"base_info": {"channel_version": "copaw-weixin-1.0.0"},
})
timeout = timeout_ms or self.long_poll_timeout_ms
try:
raw_text = await self._api_fetch(
"ilink/bot/getupdates",
body,
timeout_ms=timeout,
)
data = json.loads(raw_text)
return parse_get_updates_resp(data)
except (asyncio.TimeoutError, httpx.ReadTimeout):
# 长轮询超时是正常的
return GetUpdatesResp(ret=0, get_updates_buf=get_updates_buf)
async def send_message(self, msg: WeixinMessage) -> None:
"""发送消息。"""
body_dict = {
"msg": weixin_message_to_dict(msg),
"base_info": {"channel_version": "copaw-weixin-1.0.0"},
}
body = json.dumps(body_dict, ensure_ascii=False)
await self._api_fetch("ilink/bot/sendmessage", body)
async def close(self) -> None:
"""关闭客户端。"""
await self._client.aclose()
5. auth.py - 扫码登录
"""微信扫码登录模块。"""
import asyncio
from typing import Optional
from .api import DEFAULT_BASE_URL, fetch_qr_code, poll_qr_status
from .account import (
save_weixin_account,
register_weixin_account_id,
clear_context_tokens_for_account,
update_workspace_weixin_account_id,
)
DEFAULT_LOGIN_TIMEOUT_MS = 480_000 # 8 分钟
def print_qr_code(url: str) -> None:
"""在终端打印 ASCII 二维码。"""
try:
import qrcode
qr = qrcode.QRCode(version=1, box_size=1, border=1)
qr.add_data(url)
qr.make(fit=True)
qr.print_ascii(invert=True)
except ImportError:
print(f"二维码链接: {url}")
async def login_with_qr(
base_url: Optional[str] = None,
account_id: Optional[str] = None,
timeout_ms: int = DEFAULT_LOGIN_TIMEOUT_MS,
) -> WeixinQrWaitResult:
"""执行完整的扫码登录流程。"""
base_url = base_url or DEFAULT_BASE_URL
# 1. 获取二维码
qr_response = await fetch_qr_code(base_url, bot_type="3")
qrcode_url = qr_response.get("qrcode_img_content")
if not qrcode_url:
return WeixinQrWaitResult(message="获取二维码失败")
# 2. 显示二维码
print("\n使用微信扫描以下二维码:\n")
print_qr_code(qrcode_url)
# 3. 等待扫码
qrcode = qr_response.get("qrcode")
result = await poll_qr_status(base_url, qrcode, timeout_ms)
# 4. 保存账号
if result.connected and result.bot_token and result.account_id:
save_weixin_account(
result.account_id,
token=result.bot_token,
base_url=result.base_url,
user_id=result.user_id,
)
register_weixin_account_id(result.account_id)
# 更新配置文件
update_workspace_weixin_account_id(result.account_id)
print("\n✅ 与微信连接成功!")
return result
6. channel.py - 渠道主类
"""微信个人号渠道实现。"""
import asyncio
import logging
from typing import Any, Optional
from .account import (
list_weixin_account_ids,
load_sync_buf,
resolve_weixin_account,
restore_context_tokens,
save_sync_buf,
set_context_token,
get_context_token,
)
from .api import WeixinApiClient, DEFAULT_BASE_URL
from .types import MessageItemType, MessageType, MessageState, WeixinMessage
from ..base import BaseChannel
from ..schema import ChannelType
logger = logging.getLogger(__name__)
WEIXIN_CHANNEL: ChannelType = "weixin"
SESSION_EXPIRED_ERRCODE = 200011
class WeixinChannel(BaseChannel):
"""微信个人号渠道。"""
channel: ChannelType = WEIXIN_CHANNEL
display_name = "WeChat"
uses_manager_queue: bool = True
def __init__(
self,
process,
enabled: bool = True,
bot_prefix: str = "",
base_url: str = DEFAULT_BASE_URL,
account_id: Optional[str] = None,
**kwargs,
):
super().__init__(process, **kwargs)
self.enabled = enabled
self.bot_prefix = bot_prefix
self.base_url = base_url
self.account_id = account_id
self._client: Optional[WeixinApiClient] = None
self._account: Optional[ResolvedWeixinAccount] = None
self._monitor_task: Optional[asyncio.Task] = None
self._abort_event = asyncio.Event()
@classmethod
def from_env(cls, process, on_reply_sent=None) -> "WeixinChannel":
"""从环境变量创建渠道。"""
import os
from .account import list_weixin_account_ids
# 自动检测:如果有账号配置则默认启用
env_enabled = os.getenv("WEIXIN_CHANNEL_ENABLED", "")
if env_enabled:
enabled = env_enabled == "1"
else:
enabled = len(list_weixin_account_ids()) > 0
return cls(
process=process,
enabled=enabled,
bot_prefix=os.getenv("WEIXIN_BOT_PREFIX", ""),
base_url=os.getenv("WEIXIN_BASE_URL", DEFAULT_BASE_URL),
account_id=os.getenv("WEIXIN_ACCOUNT_ID"),
on_reply_sent=on_reply_sent,
)
async def start(self) -> None:
"""启动渠道。"""
if not self.enabled:
return
if not self._enqueue:
logger.error("❌ _enqueue callback not set!")
return
account_ids = list_weixin_account_ids()
if not account_ids:
logger.warning("No Weixin accounts. Run: copaw weixin login")
return
account_id = self.account_id or account_ids[0]
self._account = resolve_weixin_account(account_id)
if not self._account.configured:
logger.warning(f"Account {account_id} not configured. Run: copaw weixin login")
return
restore_context_tokens(account_id)
self._client = WeixinApiClient(
base_url=self._account.base_url,
token=self._account.token,
)
self._abort_event.clear()
self._monitor_task = asyncio.create_task(self._monitor_loop())
logger.info(f"✅ WeixinChannel 已启动: account={account_id}")
async def stop(self) -> None:
"""停止渠道。"""
self._abort_event.set()
if self._monitor_task:
self._monitor_task.cancel()
try:
await self._monitor_task
except asyncio.CancelledError:
pass
self._monitor_task = None
if self._client:
await self._client.close()
self._client = None
async def _monitor_loop(self) -> None:
"""长轮询监控循环。"""
account_id = self._account.account_id
get_updates_buf = load_sync_buf(account_id)
while not self._abort_event.is_set():
try:
resp = await self._client.get_updates(get_updates_buf=get_updates_buf)
# 检查错误
is_api_error = (resp.ret is not None and resp.ret != 0) or (
resp.errcode is not None and resp.errcode != 0
)
if is_api_error:
if resp.errcode == SESSION_EXPIRED_ERRCODE:
logger.error("Session expired. Run: copaw weixin login")
break
continue
# 更新同步缓冲
if resp.get_updates_buf:
save_sync_buf(account_id, resp.get_updates_buf)
get_updates_buf = resp.get_updates_buf
# 处理消息
for msg in resp.msgs or []:
await self._process_inbound_message(msg)
except asyncio.CancelledError:
break
except Exception as e:
logger.exception(f"Monitor loop error: {e}")
await asyncio.sleep(2)
async def _process_inbound_message(self, msg: WeixinMessage) -> None:
"""处理入站消息。"""
from_user_id = msg.from_user_id
if not from_user_id:
return
logger.info(f"📥 收到微信消息: from={from_user_id}")
# 保存 context token
if msg.context_token:
set_context_token(self._account.account_id, from_user_id, msg.context_token)
# 构建消息内容
content_parts = []
for item in msg.item_list or []:
if item.type == MessageItemType.TEXT and item.text_item:
content_parts.append({
"type": "text",
"text": item.text_item.text,
})
if not content_parts:
return
# 入队处理
native = {
"channel_id": self.channel,
"sender_id": from_user_id,
"session_id": f"weixin:{from_user_id}",
"content_parts": content_parts,
"meta": {
"weixin_from_user_id": from_user_id,
"weixin_context_token": msg.context_token,
},
}
if self._enqueue:
self._enqueue(native)
async def send(self, to_handle: str, text: str, meta=None) -> None:
"""发送文本消息。"""
if not self._client or not self._account:
return
to_user_id = self._parse_user_id(to_handle)
context_token = get_context_token(self._account.account_id, to_user_id)
msg = WeixinMessage(
to_user_id=to_user_id,
client_id=self._generate_client_id(),
message_type=MessageType.BOT,
message_state=MessageState.FINISH,
context_token=context_token,
item_list=[{
"type": MessageItemType.TEXT,
"text_item": {"text": text},
}],
)
await self._client.send_message(msg)
logger.info(f"📤 发送微信消息: to={to_user_id}")
def _parse_user_id(self, to_handle: str) -> str:
"""解析用户 ID。"""
if to_handle.startswith("weixin:"):
return to_handle[7:]
return to_handle
配置说明
环境变量
| 变量 | 说明 | 默认值 |
|---|---|---|
WEIXIN_CHANNEL_ENABLED | 是否启用渠道 | 自动检测 |
WEIXIN_BOT_PREFIX | 消息前缀 | 空 |
WEIXIN_BASE_URL | API 基础 URL | https://ilinkai.weixin.qq.com |
WEIXIN_ACCOUNT_ID | 指定账号 ID | 第一个账号 |
配置文件
在 ~/.copaw/workspaces/<agent_id>/agent.json 中配置:
{
"channels": {
"weixin": {
"enabled": true,
"bot_prefix": "",
"base_url": "https://ilinkai.weixin.qq.com",
"account_id": "xxx-im-bot"
}
}
}
账号数据存储
账号数据存储在 ~/.copaw/weixin/ 目录:
~/.copaw/weixin/
├── accounts.json # 账号索引
└── accounts/
└── xxx-im-bot.json # 账号数据
账号数据文件格式:
{
"token": "xxx@im.bot:xxx",
"base_url": "https://ilinkai.weixin.qq.com",
"user_id": "xxx@im.wechat",
"saved_at": "2026-03-25T00:00:00"
}
CLI 命令
安装命令
# 安装并登录
copaw weixin install
# 扫码登录
copaw weixin login
# 查看状态
copaw weixin status
# 列出账号
copaw weixin accounts
渠道命令
# 通过渠道命令登录
copaw channels login --channel weixin
# 配置渠道
copaw channels config weixin --enabled true
前端集成
添加渠道标签
在 console/src/pages/Control/Channels/components/constants.ts 中添加:
export const CHANNEL_TAGS: Record<string, ChannelTagConfig> = {
// ... 其他渠道
weixin: {
label: 'WeChat',
color: '#07C160',
icon: WechatOutlined,
},
};
渠道配置表单
前端会自动渲染渠道配置表单,支持以下字段:
enabled- 是否启用bot_prefix- 消息前缀base_url- API 基础 URLaccount_id- 账号 ID
常见问题与解决方案
1. 渠道未启动
症状: 日志显示 WeixinChannel is disabled
原因: 没有检测到账号或配置未启用
解决方案:
# 登录账号
copaw weixin login
# 或设置环境变量
export WEIXIN_CHANNEL_ENABLED=1
2. 账号未配置
症状: 日志显示 Account xxx not configured. Run login first.
原因: 配置文件中的 account_id 与实际登录的账号不匹配
解决方案:
# 重新登录
copaw weixin login
# 或手动更新配置文件中的 account_id
3. Session 过期
症状: 日志显示 Session expired 或错误码 200011
原因: 登录凭证过期
解决方案:
copaw weixin login
4. getUpdates 失败
症状: 日志显示 getUpdates failed: ret=None errcode=None
原因: 错误检查逻辑问题
解决方案: 确保使用正确的错误检查逻辑:
is_api_error = (resp.ret is not None and resp.ret != 0) or (
resp.errcode is not None and resp.errcode != 0
)
5. 长轮询超时
症状: httpx.ReadTimeout 异常
原因: 超时异常类型不匹配
解决方案: 捕获正确的异常类型:
except (asyncio.TimeoutError, httpx.ReadTimeout):
# 长轮询超时是正常的
pass
6. 消息未发送
症状: 收到消息但没有回复
原因: 可能是 Agent 处理问题或模型配置问题
排查步骤:
- 检查模型配置是否正确
- 查看是否有 Agent 错误日志
- 检查
on_event_message_completed是否被调用
参考资源
源码参考
@tencent-weixin/openclaw-weixin- 腾讯官方 OpenClaw 微信插件src/copaw/app/channels/weixin/- CoPaw 微信渠道实现
相关文件
| 文件 | 说明 |
|---|---|
src/copaw/app/channels/weixin/channel.py | 渠道主类 |
src/copaw/app/channels/weixin/api.py | API 客户端 |
src/copaw/app/channels/weixin/auth.py | 登录认证 |
src/copaw/app/channels/weixin/account.py | 账号管理 |
src/copaw/app/channels/weixin/types.py | 类型定义 |
src/copaw/cli/weixin_cmd.py | CLI 命令 |
更新日志
v1.0.0 (2026-03-25)
- 初始实现
- 支持文本消息收发
- 支持扫码登录
- 支持账号管理
- 支持自动更新配置文件
- 添加 CLI 命令
- 前端集成
贡献
欢迎提交 Issue 和 Pull Request!
开发环境
# 克隆仓库
git clone https://github.com/copaw/copaw.git
cd copaw
# 安装依赖
pip install -e ".[dev]"
# 运行测试
pytest tests/channels/weixin/
代码风格
- 使用 Python 3.10+ 语法
- 遵循 PEP 8 规范
- 使用 type hints
- 添加 docstring