Building a DDoS-Resistant Web3 RPC Gateway
1. Background and Problem Analysis
1.1 What Is an RPC Gateway?
In a Web3 application, the frontend communicates with blockchain nodes via the JSON-RPC protocol:
Frontend app → RPC gateway → blockchain node (full node)
As the middle layer, the RPC gateway is responsible for:
- Request routing and load balancing
- Access control and security protection
- Caching hot data
- Monitoring and billing
1.2 Why DDoS Resistance?
Attack scenarios a Web3 RPC gateway faces:
| Attack type | Description | Impact |
|---|---|---|
| SYN flood | Large volumes of TCP SYN packets | Exhausts the connection pool; legitimate requests cannot connect |
| HTTP flood | High volumes of well-formed RPC requests | Exhausts node resources; responses slow down or the service crashes |
| Slowloris | Slow-connection attack | Ties up connections, starving the service |
| DNS amplification | Abuses DNS servers to amplify traffic | Saturates bandwidth |
| Application-layer attack | Repeated calls to specific methods | e.g. wide-range eth_getLogs scans that consume heavy I/O |
1.3 Design Goals
| Metric | Target |
|---|---|
| Availability | 99.9% SLA |
| Latency | P99 < 500 ms |
| Throughput | 10,000+ RPS |
| DDoS resilience | Withstand 100 Gbps+ attacks |
| Failover | < 30 s automatic switchover |
2. System Architecture
2.1 Overall Architecture
┌─────────────────┐
│    CDN / WAF    │
│ (Cloudflare etc)│
└────────┬────────┘
│
┌────────▼────────┐
│  Load Balancer  │
│   (L4/L7 LB)    │
└────────┬────────┘
│
┌──────────────────────────────┼──────────────────────────────┐
│ │ │
┌────────▼────────┐ ┌────────▼────────┐ ┌────────▼────────┐
│   API Gateway   │ │   API Gateway   │ │   API Gateway   │
│   (Region A)    │ │   (Region B)    │ │   (Region C)    │
└────────┬────────┘ └────────┬────────┘ └────────┬────────┘
│ │ │
┌────────▼────────┐ ┌────────▼────────┐ ┌────────▼────────┐
│  Redis Cluster  │ │  Redis Cluster  │ │  Redis Cluster  │
│ (limits/cache)  │ │ (limits/cache)  │ │ (limits/cache)  │
└────────┬────────┘ └────────┬────────┘ └────────┬────────┘
│ │ │
┌────────▼────────┐ ┌────────▼────────┐ ┌────────▼────────┐
│  Node Cluster   │ │  Node Cluster   │ │  Node Cluster   │
│ (chain nodes)   │ │ (chain nodes)   │ │ (chain nodes)   │
└─────────────────┘ └─────────────────┘ └─────────────────┘
2.2 Layered Defense Strategy
┌─────────────────────────────────────────────────────────────────┐
│ Layer 1: DNS defense                                            │
│   - DNS Anycast routing                                         │
│   - DNS anti-hijacking                                          │
└─────────────────────────────────────────────────────────────────┘
▼
┌─────────────────────────────────────────────────────────────────┐
│ Layer 2: Network-layer defense (CDN/WAF)                        │
│   - DDoS scrubbing                                              │
│   - IP blacklisting                                             │
│   - Geo restrictions                                            │
└─────────────────────────────────────────────────────────────────┘
▼
┌─────────────────────────────────────────────────────────────────┐
│ Layer 3: Application-layer defense (API Gateway)                │
│   - Request rate limiting                                       │
│   - Request validation                                          │
│   - Method whitelisting                                         │
└─────────────────────────────────────────────────────────────────┘
▼
┌─────────────────────────────────────────────────────────────────┐
│ Layer 4: Resource-layer defense (node layer)                    │
│   - Connection pool limits                                      │
│   - Request queuing                                             │
│   - Circuit breaking and degradation                            │
└─────────────────────────────────────────────────────────────────┘
3. Core Component Design
3.1 API Gateway Design
The API Gateway is the heart of the system, handling request processing, rate limiting, and routing.
3.1.1 Technology Choices
| Option | Pros | Cons | Best fit |
|---|---|---|---|
| Nginx + Lua | High performance, mature and stable | High development complexity | Production |
| Kong | Feature-rich, plugin ecosystem | Relatively heavy resource use | Enterprise |
| Envoy | Cloud-native, good observability | Steep learning curve | Kubernetes |
| Custom Go | Full control, maximum performance | High development cost | Large-scale customization |
Recommended approach: a custom Go gateway fronted by Nginx.
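A minimal sketch of what the fronting Nginx might enforce before traffic reaches the Go gateway. The domain, certificate paths, upstream address, and limit values are illustrative assumptions, not part of the design above; tune them to your own traffic profile.

```nginx
# Front Nginx: connection and rate caps before traffic reaches the Go gateway
limit_req_zone  $binary_remote_addr zone=rpc_req:10m  rate=100r/s;
limit_conn_zone $binary_remote_addr zone=rpc_conn:10m;

server {
    listen 443 ssl;
    server_name rpc.example.com;                     # placeholder domain
    ssl_certificate     /etc/nginx/tls/fullchain.pem; # placeholder paths
    ssl_certificate_key /etc/nginx/tls/privkey.pem;

    client_max_body_size 1m;    # mirrors the gateway's 1 MB request cap
    client_body_timeout 10s;    # mitigates slow-body (Slowloris-style) uploads

    location / {
        limit_req  zone=rpc_req burst=50 nodelay;
        limit_conn rpc_conn 10;
        proxy_pass http://127.0.0.1:8080;            # the Go gateway
        proxy_read_timeout 30s;
    }
}
```

This keeps cheap volumetric filtering in Nginx while the Go gateway applies the method-aware checks described below.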
3.1.2 Core Modules
// internal/gateway/gateway.go
package gateway
import (
	"net/http"
	"time"
)
type Gateway struct {
config *Config
rateLimiter RateLimiter
cache Cache
router Router
pool NodePool
metrics *Metrics
}
type Config struct {
ListenAddr string
ReadTimeout time.Duration
WriteTimeout time.Duration
MaxConnections int
RateLimit RateLimitConfig
Cache CacheConfig
Nodes []NodeConfig
}
func NewGateway(cfg *Config) (*Gateway, error) {
return &Gateway{
config: cfg,
rateLimiter: NewRedisRateLimiter(cfg.RateLimit),
cache: NewRedisCache(cfg.Cache),
router: NewWeightedRouter(cfg.Nodes),
pool: NewNodePool(cfg.Nodes),
metrics: NewMetrics(),
}, nil
}
func (g *Gateway) Start() error {
server := &http.Server{
Addr: g.config.ListenAddr,
Handler: g.handler(),
ReadTimeout: g.config.ReadTimeout,
WriteTimeout: g.config.WriteTimeout,
}
return server.ListenAndServe()
}
3.2 Request Handling Flow
// internal/gateway/handler.go
package gateway
import (
	"encoding/json"
	"io"
	"net/http"
	"time"
)
func (g *Gateway) handler() http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
start := time.Now()
// 1. Pre-checks
if !g.preCheck(r) {
g.writeError(w, http.StatusForbidden, "request blocked")
return
}
// 2. Parse the request body
body, err := io.ReadAll(r.Body)
if err != nil {
g.writeError(w, http.StatusBadRequest, "invalid request body")
return
}
var req JSONRPCRequest
if err := json.Unmarshal(body, &req); err != nil {
g.writeError(w, http.StatusBadRequest, "invalid JSON-RPC format")
return
}
// 3. Method whitelist check
if !g.isMethodAllowed(req.Method) {
g.writeError(w, http.StatusForbidden, "method not allowed")
return
}
// 4. Rate limit check
clientID := g.getClientID(r)
if !g.rateLimiter.Allow(clientID, req.Method) {
g.writeError(w, http.StatusTooManyRequests, "rate limit exceeded")
return
}
// 5. Cache lookup
cacheKey := g.cacheKey(&req)
if cached, ok := g.cache.Get(cacheKey); ok {
g.writeResponse(w, cached)
g.metrics.RecordCacheHit(req.Method, time.Since(start))
return
}
// 6. Route to an upstream node
node := g.router.Select(&req)
resp, err := g.pool.Forward(r.Context(), node, &req)
if err != nil {
g.writeError(w, http.StatusBadGateway, "node error")
g.metrics.RecordError(req.Method, err)
return
}
// 7. Cache the response
if g.shouldCache(&req) {
g.cache.Set(cacheKey, resp, g.cacheTTL(&req))
}
// 8. Write the response
g.writeResponse(w, resp)
g.metrics.RecordRequest(req.Method, time.Since(start))
})
}
4. DDoS Defense Strategies
4.1 Network-Layer Defense
4.1.1 Use Cloudflare or a Similar Service
User request → Cloudflare Edge → DDoS scrubbing → origin
Key configuration points:
# Cloudflare configuration sketch (illustrative intents to map onto
# Cloudflare's firewall, rate-limiting, and cache rules; not literal
# ruleset syntax)
security:
  # Security level
  security_level: high  # switch to under_attack during an active attack
  # Firewall rules
  firewall_rules:
    - name: "Block known bad IPs"
      expression: "ip.src in $bad_ips"
      action: block
    - name: "Rate limit per IP"
      expression: "rate_limit > 100/minute"
      action: challenge
    - name: "Geo blocking"
      expression: "not ip.geo.country in {'US', 'JP', 'SG', 'HK'}"
      action: block
  # Bot management
  bot_management:
    enable: true
    fight_mode: true  # enable Bot Fight Mode
  # Cache rules
  cache_rules:
    - name: "Cache eth_chainId"
      expression: "http.request.body contains \"eth_chainId\""
      ttl: 86400
4.1.2 IP Blacklist Management
// internal/security/blacklist.go
package security
import (
"context"
"sync"
"time"
)
type Blacklist struct {
mu sync.RWMutex
ips map[string]struct{}
prefixes []string // CIDR prefixes
store BlacklistStore
}
type BlacklistStore interface {
Get(ctx context.Context, ip string) (bool, error)
Add(ctx context.Context, ip string, ttl time.Duration) error
Remove(ctx context.Context, ip string) error
}
func NewBlacklist(store BlacklistStore) *Blacklist {
return &Blacklist{
ips: make(map[string]struct{}),
prefixes: []string{},
store: store,
}
}
func (b *Blacklist) IsBlocked(ip string) bool {
b.mu.RLock()
defer b.mu.RUnlock()
// Exact match
if _, ok := b.ips[ip]; ok {
return true
}
// CIDR prefix match
for _, prefix := range b.prefixes {
if matchCIDR(ip, prefix) {
return true
}
}
return false
}
func (b *Blacklist) Add(ip string, ttl time.Duration) {
b.mu.Lock()
defer b.mu.Unlock()
b.ips[ip] = struct{}{}
	// Persist asynchronously (the write error is deliberately dropped here)
	go b.store.Add(context.Background(), ip, ttl)
}
// AutoBan bans an IP that exceeds the violation threshold within the window.
func (b *Blacklist) AutoBan(ip string, violations int, window time.Duration) {
	if violations > 100 { // e.g. more than 100 violations per minute
		b.Add(ip, 24*time.Hour)
	}
}
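The matchCIDR helper used by IsBlocked above is not shown in this excerpt. A minimal sketch on top of the standard net package might look like the following (the package main wrapper is only there to make the sketch runnable):

```go
package main

import (
	"fmt"
	"net"
)

// matchCIDR reports whether ip falls inside the given CIDR prefix.
// Invalid inputs are treated as non-matching.
func matchCIDR(ip, prefix string) bool {
	parsed := net.ParseIP(ip)
	if parsed == nil {
		return false
	}
	_, network, err := net.ParseCIDR(prefix)
	if err != nil {
		return false
	}
	return network.Contains(parsed)
}

func main() {
	fmt.Println(matchCIDR("10.0.1.7", "10.0.0.0/16")) // true
	fmt.Println(matchCIDR("10.1.0.1", "10.0.0.0/16")) // false
}
```

For large prefix lists, a radix-tree lookup would beat the linear scan in IsBlocked, but the linear version is fine for a short list.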
4.2 Application-Layer Defense
4.2.1 Request Validation
// internal/security/validator.go
package security
type RequestValidator struct {
allowedMethods map[string]bool
maxBatchSize int
maxRequestSize int64
}
func NewRequestValidator() *RequestValidator {
	// Whitelist of allowed RPC methods
	allowed := map[string]bool{
		"eth_chainId":              true,
		"eth_blockNumber":          true,
		"eth_getBalance":           true,
		"eth_getBlockByNumber":     true,
		"eth_getTransactionByHash": true,
		"eth_call":                 true,
		"eth_estimateGas":          true,
		"eth_getLogs":              true, // allowed, but wide-range queries get extra validation below
		"eth_sendRawTransaction":   true,
	}
	return &RequestValidator{
		allowedMethods: allowed,
		maxBatchSize:   10,
		maxRequestSize: 1 << 20, // 1 MB
	}
}
func (v *RequestValidator) Validate(req *JSONRPCRequest) error {
// Check the request size
if len(req.Raw) > int(v.maxRequestSize) {
return ErrRequestTooLarge
}
// Check the method is whitelisted
if !v.allowedMethods[req.Method] {
return ErrMethodNotAllowed
}
// Check the batch size
if req.IsBatch && len(req.Batch) > v.maxBatchSize {
return ErrBatchTooLarge
}
// Extra parameter validation for expensive methods
if req.Method == "eth_getLogs" {
return v.validateGetLogs(req)
}
return nil
}
func (v *RequestValidator) validateGetLogs(req *JSONRPCRequest) error {
	// Assert the param shape defensively instead of panicking on bad input
	params, ok := req.Params.([]interface{})
	if !ok || len(params) == 0 {
		return nil
	}
	filter, ok := params[0].(map[string]interface{})
	if !ok {
		return nil
	}
// Limit the block range
fromBlock, _ := filter["fromBlock"].(string)
toBlock, _ := filter["toBlock"].(string)
if fromBlock != "" && toBlock != "" {
rangeSize := parseBlockRange(fromBlock, toBlock)
if rangeSize > 1000 { // cap at 1,000 blocks
return ErrBlockRangeTooLarge
}
}
return nil
}
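The parseBlockRange helper called above is not defined in this excerpt. A minimal sketch, assuming the usual hex quantities of the Ethereum JSON-RPC API and treating named tags like "latest" (whose height we cannot know without asking a node) as unbounded:

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// parseBlockRange returns the inclusive number of blocks between two hex
// quantities (e.g. "0x10".."0x20"). Named tags such as "latest" or
// "earliest", unparseable values, and reversed ranges are treated as an
// effectively unbounded range so the caller rejects them.
func parseBlockRange(fromBlock, toBlock string) int64 {
	from, err1 := strconv.ParseInt(strings.TrimPrefix(fromBlock, "0x"), 16, 64)
	to, err2 := strconv.ParseInt(strings.TrimPrefix(toBlock, "0x"), 16, 64)
	if err1 != nil || err2 != nil || to < from {
		return 1 << 62 // "too large" sentinel
	}
	return to - from + 1
}

func main() {
	fmt.Println(parseBlockRange("0x10", "0x20")) // 17 (inclusive)
	fmt.Println(parseBlockRange("0x0", "latest") > 1000)
}
```

A stricter variant could resolve "latest" against the gateway's cached head block and size the range exactly.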
4.2.2 Slow-Attack Defense
// internal/security/slowloris.go
package security
import (
"net"
"net/http"
"sync"
"time"
)
type SlowlorisDefense struct {
mu sync.Mutex
connections map[string]*ConnectionInfo
maxConnPerIP int
readTimeout time.Duration
writeTimeout time.Duration
minReadRate int64 // bytes per second
}
type ConnectionInfo struct {
IP string
ConnTime time.Time
LastActive time.Time
BytesRead int64
}
func NewSlowlorisDefense() *SlowlorisDefense {
return &SlowlorisDefense{
connections: make(map[string]*ConnectionInfo),
maxConnPerIP: 10,
readTimeout: 10 * time.Second,
writeTimeout: 30 * time.Second,
minReadRate: 100, // at least 100 bytes/s
}
}
func (s *SlowlorisDefense) WrapHandler(next http.Handler) http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		ip := getRealIP(r)
		// Enforce the per-IP connection cap
		if !s.checkConnectionLimit(ip) {
			http.Error(w, "too many connections", http.StatusTooManyRequests)
			return
		}
		// Track the connection for the lifetime of the request
		s.trackConnection(ip)
		defer s.releaseConnection(ip)
		// Enforce per-request deadlines (Go 1.20+). Note that mutating the
		// shared *http.Server's timeout fields per request is racy and has
		// no effect on an already-accepted connection.
		rc := http.NewResponseController(w)
		_ = rc.SetReadDeadline(time.Now().Add(s.readTimeout))
		_ = rc.SetWriteDeadline(time.Now().Add(s.writeTimeout))
		next.ServeHTTP(w, r)
	})
}
func (s *SlowlorisDefense) checkConnectionLimit(ip string) bool {
s.mu.Lock()
defer s.mu.Unlock()
count := 0
for _, info := range s.connections {
if info.IP == ip {
count++
}
}
return count < s.maxConnPerIP
}
5. Rate Limiting Algorithms
5.1 Multi-Dimensional Rate Limiting
┌─────────────────────────────────────────────────────────────────┐
│                      Rate Limit Dimensions                      │
├─────────────────────────────────────────────────────────────────┤
│ Global        │ Total QPS cap for the whole gateway             │
│ Per IP        │ Request-rate cap per client IP                  │
│ Per API key   │ Quota per API key                               │
│ Per method    │ Call cap for specific RPC methods               │
│ Per user tier │ Differentiated limits per paid tier             │
└─────────────────────────────────────────────────────────────────┘
5.2 Token Bucket Implementation
// internal/ratelimit/token_bucket.go
package ratelimit
import (
"sync"
"time"
)
// TokenBucket implements the token bucket algorithm.
type TokenBucket struct {
	mu         sync.Mutex
	capacity   int64     // bucket capacity
	tokens     int64     // current token count
	rate       int64     // refill rate (tokens/second)
	lastUpdate time.Time // last refill time
}
func NewTokenBucket(capacity, rate int64) *TokenBucket {
return &TokenBucket{
capacity: capacity,
tokens: capacity,
rate: rate,
lastUpdate: time.Now(),
}
}
func (tb *TokenBucket) Allow(n int64) bool {
tb.mu.Lock()
defer tb.mu.Unlock()
// Refill tokens for the elapsed time
now := time.Now()
elapsed := now.Sub(tb.lastUpdate).Seconds()
newTokens := int64(elapsed * float64(tb.rate))
// Update the token count, capped at capacity
tb.tokens = min(tb.capacity, tb.tokens+newTokens)
tb.lastUpdate = now
// Check whether enough tokens remain
if tb.tokens >= n {
tb.tokens -= n
return true
}
return false
}
func (tb *TokenBucket) Wait(n int64) time.Duration {
tb.mu.Lock()
defer tb.mu.Unlock()
now := time.Now()
elapsed := now.Sub(tb.lastUpdate).Seconds()
newTokens := int64(elapsed * float64(tb.rate))
tb.tokens = min(tb.capacity, tb.tokens+newTokens)
tb.lastUpdate = now
if tb.tokens >= n {
tb.tokens -= n
return 0
}
	// Compute how long until enough tokens accumulate; convert before
	// truncating, otherwise sub-second fractions round down to zero
	needed := n - tb.tokens
	waitTime := time.Duration(float64(needed) / float64(tb.rate) * float64(time.Second))
	return waitTime
}
5.3 Sliding Window Implementation
// internal/ratelimit/sliding_window.go
package ratelimit

import (
	"context"
	"strconv"
	"time"
)

// SlidingWindow is a Redis-backed sliding-window rate limiter.
type SlidingWindow struct {
	client RedisClient
	key    string
	limit  int64
	window time.Duration
}

func NewSlidingWindow(client RedisClient, key string, limit int64, window time.Duration) *SlidingWindow {
	return &SlidingWindow{
		client: client,
		key:    key,
		limit:  limit,
		window: window,
	}
}

func (sw *SlidingWindow) Allow(ctx context.Context) (bool, error) {
	now := time.Now().UnixMilli()
	windowStart := now - sw.window.Milliseconds()
	// A Lua script keeps the check-and-record step atomic
	script := `
local key = KEYS[1]
local now = tonumber(ARGV[1])
local window_start = tonumber(ARGV[2])
local limit = tonumber(ARGV[3])
-- drop entries that have left the window
redis.call('ZREMRANGEBYSCORE', key, 0, window_start)
-- count the requests in the current window
local count = redis.call('ZCARD', key)
if count < limit then
    -- record the new request
    redis.call('ZADD', key, now, now .. '-' .. math.random())
    redis.call('PEXPIRE', key, ARGV[4])
    return 1
end
return 0
`
	result, err := sw.client.Eval(ctx, script,
		[]string{sw.key},
		now, windowStart, sw.limit, sw.window.Milliseconds(),
	).Int()
	if err != nil {
		return false, err
	}
	return result == 1, nil
}

func (sw *SlidingWindow) Count(ctx context.Context) (int64, error) {
	now := time.Now().UnixMilli()
	windowStart := now - sw.window.Milliseconds()
	// string(int64) would yield a rune, not a decimal string; format explicitly
	count, err := sw.client.ZCount(ctx, sw.key,
		strconv.FormatInt(windowStart, 10), strconv.FormatInt(now, 10),
	).Result()
	return count, err
}
5.4 Distributed Rate Limiter
// internal/ratelimit/distributed_limiter.go
package ratelimit
import (
"context"
"time"
)
// DistributedLimiter coordinates several sliding-window limiters.
type DistributedLimiter struct {
	client RedisClient
	// rate-limit configuration
globalLimit int64
globalWindow time.Duration
ipLimit int64
ipWindow time.Duration
methodLimits map[string]MethodLimit
}
type MethodLimit struct {
Limit int64
Window time.Duration
}
type RateLimitConfig struct {
GlobalLimit int64 `yaml:"global_limit"`
GlobalWindow time.Duration `yaml:"global_window"`
IPLimit int64 `yaml:"ip_limit"`
IPWindow time.Duration `yaml:"ip_window"`
Methods map[string]MethodLimit `yaml:"methods"`
}
func NewDistributedLimiter(client RedisClient, cfg RateLimitConfig) *DistributedLimiter {
return &DistributedLimiter{
client: client,
globalLimit: cfg.GlobalLimit,
globalWindow: cfg.GlobalWindow,
ipLimit: cfg.IPLimit,
ipWindow: cfg.IPWindow,
methodLimits: cfg.Methods,
}
}
// Check runs the multi-dimensional rate-limit checks.
func (dl *DistributedLimiter) Check(ctx context.Context, ip, apiKey, method string) (bool, error) {
// 1. Global limit
globalKey := "ratelimit:global"
globalLimiter := NewSlidingWindow(dl.client, globalKey, dl.globalLimit, dl.globalWindow)
if ok, err := globalLimiter.Allow(ctx); !ok || err != nil {
return false, err
}
// 2. Per-IP limit
ipKey := "ratelimit:ip:" + ip
ipLimiter := NewSlidingWindow(dl.client, ipKey, dl.ipLimit, dl.ipWindow)
if ok, err := ipLimiter.Allow(ctx); !ok || err != nil {
return false, err
}
	// 3. Per-API-key limit (when provided); note this reuses the IP limits,
	// whereas a real deployment would look up the key's own quota
	if apiKey != "" {
apiKeyKey := "ratelimit:apikey:" + apiKey
apiKeyLimiter := NewSlidingWindow(dl.client, apiKeyKey, dl.ipLimit, dl.ipWindow)
if ok, err := apiKeyLimiter.Allow(ctx); !ok || err != nil {
return false, err
}
}
// 4. Per-method limit
if methodLimit, ok := dl.methodLimits[method]; ok {
methodKey := "ratelimit:method:" + method
methodLimiter := NewSlidingWindow(dl.client, methodKey, methodLimit.Limit, methodLimit.Window)
if ok, err := methodLimiter.Allow(ctx); !ok || err != nil {
return false, err
}
}
return true, nil
}
5.5 Rate Limit Configuration Example
# config/ratelimit.yaml
rate_limit:
  # Global limit
  global_limit: 100000   # 100k QPS
  global_window: 1s
  # Per-IP limit
  ip_limit: 100          # 100 QPS per IP
  ip_window: 1s
  # Per-method limits
  methods:
    eth_call:
      limit: 50
      window: 1s
    eth_getLogs:
      limit: 10          # stricter limit
      window: 1s
    eth_estimateGas:
      limit: 20
      window: 1s
    eth_sendRawTransaction:
      limit: 10
      window: 1s
    eth_getBalance:
      limit: 100
      window: 1s
  # Batch requests
  batch:
    limit: 20
    window: 1s
6. Cache Layer Design
6.1 Caching Strategy
┌─────────────────────────────────────────────────────────────────┐
│                      Layered Cache Design                       │
├─────────────────────────────────────────────────────────────────┤
│ L1: Local in-memory cache                                       │
│   - Hot data                                                    │
│   - chainId, blockNumber, etc.                                  │
│   - TTL: 1-10 s                                                 │
├─────────────────────────────────────────────────────────────────┤
│ L2: Redis distributed cache                                     │
│   - Shared data                                                 │
│   - Transaction receipts, block data                            │
│   - TTL: 10 s - 1 h                                             │
├─────────────────────────────────────────────────────────────────┤
│ L3: Blockchain nodes                                            │
│   - Source of truth                                             │
└─────────────────────────────────────────────────────────────────┘
6.2 Cache Implementation
// internal/cache/cache.go
package cache
import (
"context"
"encoding/json"
"time"
)
// Cache is a two-level (local + Redis) cache.
type Cache struct {
local *LocalCache
remote *RedisCache
}
type CacheConfig struct {
LocalCacheSize int `yaml:"local_cache_size"`
LocalCacheTTL time.Duration `yaml:"local_cache_ttl"`
RedisAddr string `yaml:"redis_addr"`
RedisPoolSize int `yaml:"redis_pool_size"`
}
func NewCache(cfg CacheConfig) (*Cache, error) {
local := NewLocalCache(cfg.LocalCacheSize, cfg.LocalCacheTTL)
remote, err := NewRedisCache(cfg.RedisAddr, cfg.RedisPoolSize)
if err != nil {
return nil, err
}
return &Cache{
local: local,
remote: remote,
}, nil
}
// Get reads through the cache levels.
func (c *Cache) Get(ctx context.Context, key string, dest interface{}) error {
	// 1. Try the local cache first
	if val, ok := c.local.Get(key); ok {
		return json.Unmarshal(val, dest)
	}
	// 2. Fall back to Redis
	val, err := c.remote.Get(ctx, key)
	if err == nil {
		// Backfill the local cache
		c.local.Set(key, val)
		return json.Unmarshal(val, dest)
	}
	return err
}
// Set writes to both cache levels.
func (c *Cache) Set(ctx context.Context, key string, val interface{}, ttl time.Duration) error {
data, err := json.Marshal(val)
if err != nil {
return err
}
// Write both levels
c.local.Set(key, data)
return c.remote.Set(ctx, key, data, ttl)
}
// Delete removes the key from both levels.
func (c *Cache) Delete(ctx context.Context, key string) error {
c.local.Delete(key)
return c.remote.Delete(ctx, key)
}
6.3 Local Cache Implementation
// internal/cache/local.go
package cache
import (
	"container/list"
	"sync"
	"time"
)

// LocalCache is an in-memory LRU cache with a TTL.
type LocalCache struct {
mu sync.RWMutex
capacity int
ttl time.Duration
items map[string]*cacheItem
order    *list.List // LRU order
}
type cacheItem struct {
key string
value []byte
expireAt time.Time
element *list.Element
}
func NewLocalCache(capacity int, ttl time.Duration) *LocalCache {
c := &LocalCache{
capacity: capacity,
ttl: ttl,
items: make(map[string]*cacheItem),
order: list.New(),
}
// Start the background cleanup goroutine
go c.cleanup()
return c
}
func (c *LocalCache) Get(key string) ([]byte, bool) {
c.mu.Lock()
defer c.mu.Unlock()
item, ok := c.items[key]
if !ok {
return nil, false
}
// Evict if expired
if time.Now().After(item.expireAt) {
c.deleteLocked(key)
return nil, false
}
// Refresh the LRU position
c.order.MoveToFront(item.element)
return item.value, true
}
func (c *LocalCache) Set(key string, value []byte) {
c.mu.Lock()
defer c.mu.Unlock()
// Update in place if the key exists
if item, ok := c.items[key]; ok {
item.value = value
item.expireAt = time.Now().Add(c.ttl)
c.order.MoveToFront(item.element)
return
}
// Evict the least recently used entry when full
if c.order.Len() >= c.capacity {
oldest := c.order.Back()
if oldest != nil {
c.deleteLocked(oldest.Value.(string))
}
}
// Insert the new entry
element := c.order.PushFront(key)
c.items[key] = &cacheItem{
key: key,
value: value,
expireAt: time.Now().Add(c.ttl),
element: element,
}
}
func (c *LocalCache) Delete(key string) {
c.mu.Lock()
defer c.mu.Unlock()
c.deleteLocked(key)
}
func (c *LocalCache) deleteLocked(key string) {
if item, ok := c.items[key]; ok {
c.order.Remove(item.element)
delete(c.items, key)
}
}
func (c *LocalCache) cleanup() {
ticker := time.NewTicker(time.Minute)
for range ticker.C {
c.mu.Lock()
now := time.Now()
for key, item := range c.items {
if now.After(item.expireAt) {
c.deleteLocked(key)
}
}
c.mu.Unlock()
}
}
6.4 Cache Key Design
// internal/cache/keys.go
package cache
import (
"crypto/sha256"
"encoding/hex"
"encoding/json"
"fmt"
)
// CacheKeyGenerator builds cache keys.
type CacheKeyGenerator struct {
prefix string
}
func NewCacheKeyGenerator(prefix string) *CacheKeyGenerator {
return &CacheKeyGenerator{prefix: prefix}
}
// Generate builds a cache key from the method and a hash of its params.
func (g *CacheKeyGenerator) Generate(method string, params interface{}) string {
	// Serialize the params
	paramsJSON, _ := json.Marshal(params)
	// Hash them and keep a 16-character prefix
	hash := sha256.Sum256(paramsJSON)
	paramsHash := hex.EncodeToString(hash[:])[:16]
return fmt.Sprintf("%s:%s:%s", g.prefix, method, paramsHash)
}
// Block-related cache keys
func (g *CacheKeyGenerator) BlockByNumber(blockNumber string) string {
return fmt.Sprintf("%s:block:number:%s", g.prefix, blockNumber)
}
func (g *CacheKeyGenerator) BlockByHash(blockHash string) string {
return fmt.Sprintf("%s:block:hash:%s", g.prefix, blockHash)
}
// Transaction-related cache keys
func (g *CacheKeyGenerator) TransactionByHash(txHash string) string {
return fmt.Sprintf("%s:tx:hash:%s", g.prefix, txHash)
}
func (g *CacheKeyGenerator) TransactionReceipt(txHash string) string {
return fmt.Sprintf("%s:tx:receipt:%s", g.prefix, txHash)
}
// Account-related cache keys
func (g *CacheKeyGenerator) Balance(address, blockTag string) string {
return fmt.Sprintf("%s:balance:%s:%s", g.prefix, address, blockTag)
}
func (g *CacheKeyGenerator) Nonce(address, blockTag string) string {
return fmt.Sprintf("%s:nonce:%s:%s", g.prefix, address, blockTag)
}
// Contract-call cache key
func (g *CacheKeyGenerator) EthCall(to string, data string, blockTag string) string {
	// Guard the slice: calldata can be shorter than 16 characters
	if len(data) > 16 {
		data = data[:16]
	}
	return fmt.Sprintf("%s:call:%s:%s:%s", g.prefix, to, data, blockTag)
}
6.5 Cache TTL Strategy
// internal/cache/ttl.go
package cache
import "time"
// MethodTTL maps each RPC method to its cache TTL.
var MethodTTL = map[string]time.Duration{
	// Chain state: short TTLs only
	"eth_blockNumber": 1 * time.Second,
	"eth_gasPrice":    5 * time.Second,
	"eth_chainId":     24 * time.Hour, // the chain ID never changes
	// Block data: long TTLs for confirmed blocks
	"eth_getBlockByNumber":                 5 * time.Minute, // short: "latest" moves over time
	"eth_getBlockByHash":                   24 * time.Hour,  // historical blocks are immutable
	"eth_getBlockTransactionCountByNumber": 5 * time.Minute,
	"eth_getBlockTransactionCountByHash":   24 * time.Hour,
	// Transaction data
	"eth_getTransactionByHash":              24 * time.Hour,
	"eth_getTransactionReceipt":             24 * time.Hour,
	"eth_getTransactionByBlockHashAndIndex": 24 * time.Hour,
	// Account data: short TTLs
	"eth_getBalance":          10 * time.Second,
	"eth_getTransactionCount": 10 * time.Second, // the standard method for an account nonce
	"eth_getCode":             1 * time.Hour,
	"eth_getStorageAt":        10 * time.Second,
	// Contract calls: depends on the block tag
	"eth_call":        10 * time.Second, // latest
	"eth_estimateGas": 0,                // never cached
	// Log queries: not cached; results can be large
	"eth_getLogs": 0,
	// Transaction submission: never cached
	"eth_sendRawTransaction": 0,
}
// GetTTL returns the cache TTL for a method.
func GetTTL(method string) time.Duration {
if ttl, ok := MethodTTL[method]; ok {
return ttl
}
return 10 * time.Second // default: 10 seconds
}
// IsCacheable reports whether a method's responses may be cached.
func IsCacheable(method string) bool {
ttl, ok := MethodTTL[method]
return ok && ttl > 0
}
7. Monitoring and Alerting
7.1 Metrics
// internal/metrics/metrics.go
package metrics
import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
)
var (
// Request counter
RequestsTotal = promauto.NewCounterVec(
prometheus.CounterOpts{
Name: "rpc_gateway_requests_total",
Help: "Total number of RPC requests",
},
[]string{"method", "status"},
)
// Request latency
RequestDuration = promauto.NewHistogramVec(
prometheus.HistogramOpts{
Name: "rpc_gateway_request_duration_seconds",
Help: "Request duration in seconds",
Buckets: []float64{.01, .05, .1, .25, .5, 1, 2.5, 5, 10},
},
[]string{"method"},
)
// Active connections
ActiveConnections = promauto.NewGauge(
prometheus.GaugeOpts{
Name: "rpc_gateway_active_connections",
Help: "Number of active connections",
},
)
// Rate-limit hit counter
RateLimitHits = promauto.NewCounterVec(
prometheus.CounterOpts{
Name: "rpc_gateway_rate_limit_hits_total",
Help: "Total number of rate limit hits",
},
[]string{"type", "key"},
)
// Cache hit counter (hit rate is derived at query time)
CacheHits = promauto.NewCounterVec(
prometheus.CounterOpts{
Name: "rpc_gateway_cache_hits_total",
Help: "Total number of cache hits",
},
[]string{"level", "method"},
)
// Node health status
NodeHealth = promauto.NewGaugeVec(
prometheus.GaugeOpts{
Name: "rpc_gateway_node_health",
Help: "Node health status (1=healthy, 0=unhealthy)",
},
[]string{"node"},
)
// Per-node request distribution
NodeRequests = promauto.NewCounterVec(
prometheus.CounterOpts{
Name: "rpc_gateway_node_requests_total",
Help: "Total requests per node",
},
[]string{"node", "method"},
)
// DDoS detection alerts
DDoSAlerts = promauto.NewCounterVec(
prometheus.CounterOpts{
Name: "rpc_gateway_ddos_alerts_total",
Help: "Total number of DDoS alerts",
},
[]string{"type", "severity"},
)
)
7.2 Alerting Rules
# config/alerting.yaml
groups:
- name: rpc_gateway
rules:
# High error rate
- alert: HighErrorRate
expr: |
sum(rate(rpc_gateway_requests_total{status=~"5.."}[5m])) /
sum(rate(rpc_gateway_requests_total[5m])) > 0.05
for: 5m
labels:
severity: critical
annotations:
summary: "High error rate detected"
description: "Error rate is {{ $value | humanizePercentage }}"
# High latency
- alert: HighLatency
expr: |
histogram_quantile(0.99,
sum(rate(rpc_gateway_request_duration_seconds_bucket[5m])) by (le, method)
) > 1
for: 5m
labels:
severity: warning
annotations:
summary: "High latency detected"
description: "P99 latency for {{ $labels.method }} is {{ $value }}s"
# Excessive rate-limit hits
- alert: HighRateLimitHits
expr: |
sum(rate(rpc_gateway_rate_limit_hits_total[5m])) > 100
for: 2m
labels:
severity: warning
annotations:
summary: "High rate limit hits"
description: "Rate limit hits: {{ $value }}/s"
# Unhealthy node
- alert: NodeUnhealthy
expr: rpc_gateway_node_health == 0
for: 1m
labels:
severity: critical
annotations:
summary: "Node {{ $labels.node }} is unhealthy"
# Suspicious traffic pattern
- alert: SuspiciousTrafficPattern
expr: |
sum(rate(rpc_gateway_requests_total[1m])) >
2 * sum(rate(rpc_gateway_requests_total[1h] offset 1h))
for: 1m
labels:
severity: critical
annotations:
summary: "Suspicious traffic pattern detected"
description: "Current traffic is 2x higher than usual"
# Falling cache hit rate
- alert: LowCacheHitRate
expr: |
sum(rate(rpc_gateway_cache_hits_total{level="l1"}[5m])) /
sum(rate(rpc_gateway_requests_total[5m])) < 0.3
for: 10m
labels:
severity: warning
annotations:
summary: "Low cache hit rate"
description: "L1 cache hit rate is {{ $value | humanizePercentage }}"
7.3 Health Checks
// internal/health/health.go
package health
import (
	"context"
	"encoding/json"
	"net/http"
	"sync"
	"time"
)
type HealthChecker struct {
checks map[string]CheckFunc
mu sync.RWMutex
}
type CheckFunc func(ctx context.Context) error
type HealthStatus struct {
Status string `json:"status"`
Timestamp time.Time `json:"timestamp"`
Checks map[string]Check `json:"checks"`
}
type Check struct {
Status string `json:"status"`
Latency string `json:"latency"`
Error string `json:"error,omitempty"`
}
func NewHealthChecker() *HealthChecker {
return &HealthChecker{
checks: make(map[string]CheckFunc),
}
}
func (h *HealthChecker) Register(name string, check CheckFunc) {
h.mu.Lock()
defer h.mu.Unlock()
h.checks[name] = check
}
func (h *HealthChecker) Check(ctx context.Context) HealthStatus {
h.mu.RLock()
defer h.mu.RUnlock()
status := HealthStatus{
Status: "healthy",
Timestamp: time.Now(),
Checks: make(map[string]Check),
}
var wg sync.WaitGroup
var mu sync.Mutex
for name, check := range h.checks {
wg.Add(1)
go func(name string, check CheckFunc) {
defer wg.Done()
start := time.Now()
err := check(ctx)
latency := time.Since(start)
mu.Lock()
defer mu.Unlock()
checkResult := Check{
Latency: latency.String(),
}
if err != nil {
checkResult.Status = "unhealthy"
checkResult.Error = err.Error()
status.Status = "unhealthy"
} else {
checkResult.Status = "healthy"
}
status.Checks[name] = checkResult
}(name, check)
}
wg.Wait()
return status
}
func (h *HealthChecker) Handler() http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
status := h.Check(r.Context())
w.Header().Set("Content-Type", "application/json")
if status.Status == "unhealthy" {
w.WriteHeader(http.StatusServiceUnavailable)
}
json.NewEncoder(w).Encode(status)
}
}
8. Deployment
8.1 Containerized Deployment
# Dockerfile
FROM golang:1.21-alpine AS builder
WORKDIR /app
# Dependencies
COPY go.mod go.sum ./
RUN go mod download
# Build
COPY . .
RUN CGO_ENABLED=0 GOOS=linux go build -o rpc-gateway ./cmd/gateway
# Runtime image
FROM alpine:3.19
RUN apk --no-cache add ca-certificates tzdata
WORKDIR /app
COPY --from=builder /app/rpc-gateway .
COPY --from=builder /app/config ./config
EXPOSE 8080 9090
ENTRYPOINT ["./rpc-gateway"]
CMD ["--config", "config/config.yaml"]
8.2 Kubernetes Deployment
# k8s/deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
name: rpc-gateway
labels:
app: rpc-gateway
spec:
replicas: 3
selector:
matchLabels:
app: rpc-gateway
template:
metadata:
labels:
app: rpc-gateway
spec:
containers:
- name: gateway
image: rpc-gateway:latest
ports:
- containerPort: 8080
name: http
- containerPort: 9090
name: metrics
resources:
requests:
cpu: "500m"
memory: "512Mi"
limits:
cpu: "2000m"
memory: "2Gi"
env:
- name: REDIS_ADDR
valueFrom:
secretKeyRef:
name: rpc-gateway-secrets
key: redis-addr
- name: NODE_URLS
valueFrom:
configMapKeyRef:
name: rpc-gateway-config
key: node-urls
livenessProbe:
httpGet:
path: /health
port: 8080
initialDelaySeconds: 10
periodSeconds: 10
readinessProbe:
httpGet:
path: /ready
port: 8080
initialDelaySeconds: 5
periodSeconds: 5
volumeMounts:
- name: config
mountPath: /app/config
volumes:
- name: config
configMap:
name: rpc-gateway-config
---
apiVersion: v1
kind: Service
metadata:
name: rpc-gateway
spec:
selector:
app: rpc-gateway
ports:
- port: 80
targetPort: 8080
name: http
- port: 9090
targetPort: 9090
name: metrics
---
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
name: rpc-gateway-hpa
spec:
scaleTargetRef:
apiVersion: apps/v1
kind: Deployment
name: rpc-gateway
minReplicas: 3
maxReplicas: 20
metrics:
- type: Resource
resource:
name: cpu
target:
type: Utilization
averageUtilization: 70
- type: Resource
resource:
name: memory
target:
type: Utilization
averageUtilization: 80
---
apiVersion: policy/v1
kind: PodDisruptionBudget
metadata:
name: rpc-gateway-pdb
spec:
minAvailable: 2
selector:
matchLabels:
app: rpc-gateway
8.3 Configuration Management
# config/config.yaml
server:
http_addr: ":8080"
metrics_addr: ":9090"
read_timeout: 10s
write_timeout: 30s
max_connections: 10000
rate_limit:
global_limit: 100000
global_window: 1s
ip_limit: 100
ip_window: 1s
methods:
eth_call:
limit: 50
window: 1s
eth_getLogs:
limit: 10
window: 1s
eth_estimateGas:
limit: 20
window: 1s
cache:
local_size: 10000
local_ttl: 10s
redis_addr: "${REDIS_ADDR}"
redis_pool_size: 100
nodes:
- name: "node-1"
url: "${NODE_URL_1}"
weight: 100
max_connections: 100
- name: "node-2"
url: "${NODE_URL_2}"
weight: 100
max_connections: 100
- name: "node-3"
url: "${NODE_URL_3}"
weight: 50
max_connections: 50
security:
blacklist_ttl: 24h
auto_ban_threshold: 100
auto_ban_window: 1m
logging:
level: info
format: json
monitoring:
enabled: true
prometheus_addr: ":9090"
8.4 Multi-Region Deployment
┌─────────────────────────────────────────────────────────────────┐
│                      Global load balancing                      │
│                   (AWS Route53 / Cloudflare)                    │
└───────────────────────────┬─────────────────────────────────────┘
│
┌──────────────────┼──────────────────┐
│ │ │
▼ ▼ ▼
┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐
│   us-east-1     │ │   eu-west-1     │ │ ap-southeast-1  │
│   (US East)     │ │   (EU West)     │ │ (AP Southeast)  │
├─────────────────┤ ├─────────────────┤ ├─────────────────┤
│ - API Gateway   │ │ - API Gateway   │ │ - API Gateway   │
│ - Redis Cluster │ │ - Redis Cluster │ │ - Redis Cluster │
│ - Node Cluster  │ │ - Node Cluster  │ │ - Node Cluster  │
└─────────────────┘ └─────────────────┘ └─────────────────┘
9. Cost Estimation
9.1 Infrastructure Cost (monthly)
| Component | Spec | Count | Unit price | Monthly cost |
|---|---|---|---|---|
| API Gateway | 4C8G | 3 | $100 | $300 |
| Redis Cluster | 4C16G | 3 | $150 | $450 |
| Blockchain nodes | 8C32G | 3 | $300 | $900 |
| Load balancer | - | 1 | $50 | $50 |
| CDN/WAF | Cloudflare Pro | 1 | $20 | $20 |
| Monitoring | Prometheus + Grafana | 1 | $50 | $50 |
| Total | | | | $1,770 |
9.2 Scaling Cost
| QPS | Gateway instances | Node instances | Monthly cost delta |
|---|---|---|---|
| 10K | 3 | 3 | baseline |
| 50K | 6 | 6 | +$1,770 |
| 100K | 12 | 12 | +$5,310 |
Summary
A DDoS-resistant Web3 RPC gateway needs defense at every layer:
- Network layer: CDN/WAF scrubbing, Anycast routing
- Application layer: rate limiting, request validation, method whitelisting
- Resource layer: connection pools, circuit breaking and degradation, caching
- Monitoring layer: real-time metrics, automatic alerting, rapid response
Core design principles:
- Defense in depth: multiple layers, so a single failure does not take down the whole system
- Elastic scaling: scale horizontally to absorb traffic peaks
- Fast recovery: automatic failover to minimize impact
This article is for technical reference and learning only; it does not constitute investment advice.