How to Build a DDoS-Resistant Web3 RPC Gateway




1. Background and Problem Analysis

1.1 What Is an RPC Gateway?

In a Web3 application, the frontend needs to talk to blockchain nodes, issuing calls over the JSON-RPC protocol:

Frontend app → RPC gateway → Blockchain node (full node)

As the middle layer, the RPC gateway is responsible for:

  • Request routing and load balancing
  • Access control and security protection
  • Caching hot data
  • Monitoring and billing

1.2 Why DDoS Resistance?

Attack scenarios a Web3 RPC gateway faces:

| Attack type | Description | Impact |
|-------------|-------------|--------|
| SYN flood | Floods of TCP SYN packets | Exhausts the connection pool; legitimate clients cannot connect |
| HTTP flood | High volumes of well-formed RPC requests | Exhausts node resources; responses slow down or the service crashes |
| Slowloris | Deliberately slow connections | Ties up connection slots and blocks the service |
| DNS amplification | Abuses DNS servers to amplify traffic | Saturates bandwidth |
| Application-layer abuse | Repeated calls to specific methods | Wide-range eth_getLogs scans consume heavy I/O |

1.3 Design Goals

| Metric | Target |
|--------|--------|
| Availability | 99.9% SLA |
| Latency | P99 < 500 ms |
| Throughput | 10,000+ RPS |
| Mitigation capacity | Withstand 100 Gbps+ DDoS attacks |
| Failure recovery | Automatic failover in < 30 s |

2. System Architecture Design

2.1 Overall Architecture

                                    ┌─────────────────┐
                                    │    CDN / WAF    │
                                    │(Cloudflare etc.)│
                                    └────────┬────────┘
                                             │
                                    ┌────────▼────────┐
                                    │  Load balancer  │
                                    │   (L4/L7 LB)    │
                                    └────────┬────────┘
                                             │
              ┌──────────────────────────────┼──────────────────────────────┐
              │                              │                              │
     ┌────────▼────────┐           ┌────────▼────────┐           ┌────────▼────────┐
     │   API Gateway   │           │   API Gateway   │           │   API Gateway   │
     │   (Region A)    │           │   (Region B)    │           │   (Region C)    │
     └────────┬────────┘           └────────┬────────┘           └────────┬────────┘
              │                              │                              │
     ┌────────▼────────┐           ┌────────▼────────┐           ┌────────▼────────┐
     │  Redis Cluster  │           │  Redis Cluster  │           │  Redis Cluster  │
     │  (limit/cache)  │           │  (limit/cache)  │           │  (limit/cache)  │
     └────────┬────────┘           └────────┬────────┘           └────────┬────────┘
              │                              │                              │
     ┌────────▼────────┐           ┌────────▼────────┐           ┌────────▼────────┐
     │  Node Cluster   │           │  Node Cluster   │           │  Node Cluster   │
     │  (chain nodes)  │           │  (chain nodes)  │           │  (chain nodes)  │
     └─────────────────┘           └─────────────────┘           └─────────────────┘

2.2 Layered Defense Strategy

┌─────────────────────────────────────────────────────────────────┐
│  Layer 1: DNS-layer defense                                     │
│  - DNS Anycast routing                                          │
│  - DNS hijack protection                                        │
└─────────────────────────────────────────────────────────────────┘
                              ▼
┌─────────────────────────────────────────────────────────────────┐
│  Layer 2: Network-layer defense (CDN/WAF)                       │
│  - DDoS scrubbing                                               │
│  - IP blacklists                                                │
│  - Geo restrictions                                             │
└─────────────────────────────────────────────────────────────────┘
                              ▼
┌─────────────────────────────────────────────────────────────────┐
│  Layer 3: Application-layer defense (API Gateway)               │
│  - Request rate limiting                                        │
│  - Request validation                                           │
│  - Method whitelisting                                          │
└─────────────────────────────────────────────────────────────────┘
                              ▼
┌─────────────────────────────────────────────────────────────────┐
│  Layer 4: Resource-layer defense (node layer)                   │
│  - Connection pool limits                                       │
│  - Request queuing                                              │
│  - Circuit breaking and degradation                             │
└─────────────────────────────────────────────────────────────────┘

3. Core Component Design

3.1 API Gateway Design

The API Gateway is the core of the system, handling request processing, rate limiting, and routing.

3.1.1 Technology Selection

| Option | Pros | Cons | Suited for |
|--------|------|------|------------|
| Nginx + Lua | High performance, mature and stable | High development complexity | Production |
| Kong | Feature-rich, plugin ecosystem | Relatively heavy resource usage | Enterprise |
| Envoy | Cloud native, strong observability | Steep learning curve | Kubernetes environments |
| Custom Go | Full control, maximum performance | High development cost | Large-scale customization |

Recommended approach: a custom Go service fronted by Nginx.

3.1.2 Core Modules

// internal/gateway/gateway.go
package gateway

import (
    "context"
    "net/http"
    "time"
)

type Gateway struct {
    config      *Config
    rateLimiter RateLimiter
    cache       Cache
    router      Router
    pool        NodePool
    metrics     *Metrics
}

type Config struct {
    ListenAddr        string
    ReadTimeout       time.Duration
    WriteTimeout      time.Duration
    MaxConnections    int
    RateLimit         RateLimitConfig
    Cache             CacheConfig
    Nodes             []NodeConfig
}

func NewGateway(cfg *Config) (*Gateway, error) {
    return &Gateway{
        config:      cfg,
        rateLimiter: NewRedisRateLimiter(cfg.RateLimit),
        cache:       NewRedisCache(cfg.Cache),
        router:      NewWeightedRouter(cfg.Nodes),
        pool:        NewNodePool(cfg.Nodes),
        metrics:     NewMetrics(),
    }, nil
}

func (g *Gateway) Start() error {
    server := &http.Server{
        Addr:         g.config.ListenAddr,
        Handler:      g.handler(),
        ReadTimeout:  g.config.ReadTimeout,
        WriteTimeout: g.config.WriteTimeout,
    }
    return server.ListenAndServe()
}
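`NewWeightedRouter` is referenced above but not shown. One plausible sketch is smooth weighted round-robin (the algorithm Nginx uses for weighted `upstream` selection); the `Select` signature is simplified here and the field names are assumptions:

```go
package main

import "sync"

type NodeConfig struct {
	URL    string
	Weight int
}

// WeightedRouter picks nodes by smooth weighted round-robin: each node's
// running score grows by its weight every round, the highest score wins,
// and the winner pays back the total weight.
type WeightedRouter struct {
	mu      sync.Mutex
	nodes   []NodeConfig
	current []int
	total   int
}

func NewWeightedRouter(nodes []NodeConfig) *WeightedRouter {
	r := &WeightedRouter{nodes: nodes, current: make([]int, len(nodes))}
	for _, n := range nodes {
		r.total += n.Weight
	}
	return r
}

// Select returns the next node URL; heavier nodes win proportionally more
// rounds, without the clustering that naive weighted round-robin produces.
func (r *WeightedRouter) Select() string {
	r.mu.Lock()
	defer r.mu.Unlock()
	best := 0
	for i, n := range r.nodes {
		r.current[i] += n.Weight
		if r.current[i] > r.current[best] {
			best = i
		}
	}
	r.current[best] -= r.total
	return r.nodes[best].URL
}
```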

3.2 Request Handling Flow

// internal/gateway/handler.go
package gateway

import (
    "encoding/json"
    "io"
    "net/http"
    "time"
)

func (g *Gateway) handler() http.Handler {
    return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
        start := time.Now()
        
        // 1. Pre-checks
        if !g.preCheck(r) {
            g.writeError(w, http.StatusForbidden, "request blocked")
            return
        }
        
        // 2. Parse the request
        body, err := io.ReadAll(r.Body)
        if err != nil {
            g.writeError(w, http.StatusBadRequest, "invalid request body")
            return
        }
        
        var req JSONRPCRequest
        if err := json.Unmarshal(body, &req); err != nil {
            g.writeError(w, http.StatusBadRequest, "invalid JSON-RPC format")
            return
        }
        
        // 3. Method whitelist check
        if !g.isMethodAllowed(req.Method) {
            g.writeError(w, http.StatusForbidden, "method not allowed")
            return
        }
        
        // 4. Rate-limit check
        clientID := g.getClientID(r)
        if !g.rateLimiter.Allow(clientID, req.Method) {
            g.writeError(w, http.StatusTooManyRequests, "rate limit exceeded")
            return
        }
        
        // 5. Cache lookup
        cacheKey := g.cacheKey(&req)
        if cached, ok := g.cache.Get(cacheKey); ok {
            g.writeResponse(w, cached)
            g.metrics.RecordCacheHit(req.Method, time.Since(start))
            return
        }
        
        // 6. Route to a node
        node := g.router.Select(&req)
        resp, err := g.pool.Forward(r.Context(), node, &req)
        if err != nil {
            g.writeError(w, http.StatusBadGateway, "node error")
            g.metrics.RecordError(req.Method, err)
            return
        }
        
        // 7. Cache the response
        if g.shouldCache(&req) {
            g.cache.Set(cacheKey, resp, g.cacheTTL(&req))
        }
        
        // 8. Write the response
        g.writeResponse(w, resp)
        g.metrics.RecordRequest(req.Method, time.Since(start))
    })
}

4. DDoS Mitigation Strategies

4.1 Network-Layer Defense

4.1.1 Using Cloudflare or a Similar Service

User request → Cloudflare edge → DDoS scrubbing → Origin

Key configuration points:

# Suggested Cloudflare settings (illustrative, not literal API syntax)
security:
  # Security level
  security_level: high  # switch to under_attack mode during an attack
  
  # Firewall rules
  firewall_rules:
    - name: "Block known bad IPs"
      expression: "ip.src in $bad_ips"
      action: block
      
    - name: "Rate limit per IP"
      expression: "rate_limit > 100/minute"
      action: challenge
      
    - name: "Geo blocking"
      expression: "not ip.geo.country in {'US', 'JP', 'SG', 'HK'}"
      action: block
      
  # Bot management
  bot_management:
    enable: true
    fight_mode: true  # enable Bot Fight Mode
    
  # Cache rules
  cache_rules:
    - name: "Cache eth_chainId"
      expression: "http.request.body contains \"eth_chainId\""
      ttl: 86400

4.1.2 IP Blacklist Management

// internal/security/blacklist.go
package security

import (
    "context"
    "sync"
    "time"
)

type Blacklist struct {
    mu       sync.RWMutex
    ips      map[string]struct{}
    prefixes []string // CIDR prefixes
    store    BlacklistStore
}

type BlacklistStore interface {
    Get(ctx context.Context, ip string) (bool, error)
    Add(ctx context.Context, ip string, ttl time.Duration) error
    Remove(ctx context.Context, ip string) error
}

func NewBlacklist(store BlacklistStore) *Blacklist {
    return &Blacklist{
        ips:      make(map[string]struct{}),
        prefixes: []string{},
        store:    store,
    }
}

func (b *Blacklist) IsBlocked(ip string) bool {
    b.mu.RLock()
    defer b.mu.RUnlock()
    
    // Exact match
    if _, ok := b.ips[ip]; ok {
        return true
    }
    
    // CIDR prefix match
    for _, prefix := range b.prefixes {
        if matchCIDR(ip, prefix) {
            return true
        }
    }
    
    return false
}

func (b *Blacklist) Add(ip string, ttl time.Duration) {
    b.mu.Lock()
    defer b.mu.Unlock()
    b.ips[ip] = struct{}{}
    
    // Persist asynchronously
    go b.store.Add(context.Background(), ip, ttl)
}

// AutoBan bans an IP for 24 hours once its violation count within the
// observation window exceeds the threshold.
func (b *Blacklist) AutoBan(ip string, violations int, window time.Duration) {
    if violations > 100 { // e.g. more than 100 violations per minute
        b.Add(ip, 24*time.Hour)
    }
}
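`matchCIDR` used by `IsBlocked` is not shown; it maps directly onto the standard library's `net/netip` package:

```go
package main

import "net/netip"

// matchCIDR reports whether ip falls inside the CIDR prefix.
// Malformed inputs are treated as non-matching rather than as errors,
// which is the safe default on this hot path.
func matchCIDR(ip, cidr string) bool {
	addr, err := netip.ParseAddr(ip)
	if err != nil {
		return false
	}
	prefix, err := netip.ParsePrefix(cidr)
	if err != nil {
		return false
	}
	return prefix.Contains(addr)
}
```

For long prefix lists, parse each CIDR once at load time and keep `netip.Prefix` values instead of strings, so the per-request cost is only `Contains`.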

4.2 Application-Layer Defense

4.2.1 Request Validation

// internal/security/validator.go
package security

type RequestValidator struct {
    allowedMethods map[string]bool
    maxBatchSize   int
    maxRequestSize int64
}

func NewRequestValidator() *RequestValidator {
    // Whitelisted RPC methods
    allowed := map[string]bool{
        "eth_chainId":           true,
        "eth_blockNumber":       true,
        "eth_getBalance":        true,
        "eth_getBlockByNumber":  true,
        "eth_getTransactionByHash": true,
        "eth_call":              true,
        "eth_estimateGas":       true,
        "eth_getLogs":            true, // allowed, but range-limited (see validateGetLogs)
        "eth_sendRawTransaction": true,
        // Anything not listed here (debug_*, admin_*, txpool_*, ...) is rejected.
    }
    
    return &RequestValidator{
        allowedMethods: allowed,
        maxBatchSize:   10,
        maxRequestSize: 1 << 20, // 1MB
    }
}

func (v *RequestValidator) Validate(req *JSONRPCRequest) error {
    // Check the request size
    if len(req.Raw) > int(v.maxRequestSize) {
        return ErrRequestTooLarge
    }
    
    // Check the method against the whitelist
    if !v.allowedMethods[req.Method] {
        return ErrMethodNotAllowed
    }
    
    // Check the batch size
    if req.IsBatch && len(req.Batch) > v.maxBatchSize {
        return ErrBatchTooLarge
    }
    
    // Method-specific parameter validation
    if req.Method == "eth_getLogs" {
        return v.validateGetLogs(req)
    }
    
    return nil
}

func (v *RequestValidator) validateGetLogs(req *JSONRPCRequest) error {
    params, ok := req.Params.([]interface{})
    if !ok || len(params) == 0 { // a bare assertion here would panic on bad input
        return nil
    }
    
    filter, ok := params[0].(map[string]interface{})
    if !ok {
        return nil
    }
    
    // Limit the block range
    fromBlock, _ := filter["fromBlock"].(string)
    toBlock, _ := filter["toBlock"].(string)
    
    if fromBlock != "" && toBlock != "" {
        rangeSize := parseBlockRange(fromBlock, toBlock)
        if rangeSize > 1000 { // cap at 1,000 blocks
            return ErrBlockRangeTooLarge
        }
    }
    
    return nil
}
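`parseBlockRange` is also not shown. One sketch, reading the hex quantities JSON-RPC uses; tags such as `latest` cannot be sized here, so this version treats them as zero-width and leaves their handling to other checks (a policy choice, not part of the original code):

```go
package main

import (
	"strconv"
	"strings"
)

// parseBlockRange returns the number of blocks between two hex quantities
// such as "0x64". Tags ("latest", "earliest", ...) and malformed inputs
// yield 0, deferring their handling to the caller.
func parseBlockRange(fromBlock, toBlock string) int64 {
	from, err1 := strconv.ParseInt(strings.TrimPrefix(fromBlock, "0x"), 16, 64)
	to, err2 := strconv.ParseInt(strings.TrimPrefix(toBlock, "0x"), 16, 64)
	if err1 != nil || err2 != nil || to < from {
		return 0
	}
	return to - from
}
```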

4.2.2 Slowloris Defense

// internal/security/slowloris.go
package security

import (
    "net"
    "net/http"
    "sync"
    "time"
)

type SlowlorisDefense struct {
    mu             sync.Mutex
    connections    map[string]*ConnectionInfo
    maxConnPerIP   int
    readTimeout    time.Duration
    writeTimeout   time.Duration
    minReadRate    int64 // bytes per second
}

type ConnectionInfo struct {
    IP         string
    ConnTime   time.Time
    LastActive time.Time
    BytesRead  int64
}

func NewSlowlorisDefense() *SlowlorisDefense {
    return &SlowlorisDefense{
        connections:  make(map[string]*ConnectionInfo),
        maxConnPerIP: 10,
        readTimeout:  10 * time.Second,
        writeTimeout: 30 * time.Second,
        minReadRate:  100, // minimum 100 bytes/s
    }
}

func (s *SlowlorisDefense) WrapHandler(next http.Handler) http.Handler {
    return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
        ip := getRealIP(r)
        
        // Check the per-IP connection limit
        if !s.checkConnectionLimit(ip) {
            http.Error(w, "too many connections", http.StatusTooManyRequests)
            return
        }
        
        // Track the connection for the lifetime of the request
        s.trackConnection(ip)
        defer s.releaseConnection(ip)
        
        // Enforce per-request deadlines instead of mutating the shared
        // *http.Server, which would race across requests (Go 1.20+)
        rc := http.NewResponseController(w)
        _ = rc.SetReadDeadline(time.Now().Add(s.readTimeout))
        _ = rc.SetWriteDeadline(time.Now().Add(s.writeTimeout))
        
        next.ServeHTTP(w, r)
    })
}

func (s *SlowlorisDefense) checkConnectionLimit(ip string) bool {
    s.mu.Lock()
    defer s.mu.Unlock()
    
    count := 0
    for _, info := range s.connections {
        if info.IP == ip {
            count++
        }
    }
    
    return count < s.maxConnPerIP
}

5. Rate-Limiting Algorithms

5.1 Multi-Dimensional Rate Limiting

┌─────────────────────────────────────────────────────────────────┐
│                      Rate-limit dimensions                      │
├─────────────────────────────────────────────────────────────────┤
│  Global       │ total QPS cap for the entire gateway            │
│  Per IP       │ request rate limit per source IP                │
│  Per API key  │ quota per API key                               │
│  Per method   │ call limits for specific RPC methods            │
│  Per tier     │ differentiated limits per subscription tier     │
└─────────────────────────────────────────────────────────────────┘

5.2 Token Bucket Implementation

// internal/ratelimit/token_bucket.go
package ratelimit

import (
    "sync"
    "time"
)

// TokenBucket implements the token bucket algorithm.
type TokenBucket struct {
    mu         sync.Mutex
    capacity   int64     // bucket capacity
    tokens     int64     // current token count
    rate       int64     // refill rate (tokens/second)
    lastUpdate time.Time // last refill time
}

func NewTokenBucket(capacity, rate int64) *TokenBucket {
    return &TokenBucket{
        capacity:   capacity,
        tokens:     capacity,
        rate:       rate,
        lastUpdate: time.Now(),
    }
}

func (tb *TokenBucket) Allow(n int64) bool {
    tb.mu.Lock()
    defer tb.mu.Unlock()
    
    // Compute tokens accrued since the last refill
    now := time.Now()
    elapsed := now.Sub(tb.lastUpdate).Seconds()
    newTokens := int64(elapsed * float64(tb.rate))
    
    // Update the token count (min is a builtin since Go 1.21)
    tb.tokens = min(tb.capacity, tb.tokens+newTokens)
    tb.lastUpdate = now
    
    // Check for sufficient tokens
    if tb.tokens >= n {
        tb.tokens -= n
        return true
    }
    
    return false
}

func (tb *TokenBucket) Wait(n int64) time.Duration {
    tb.mu.Lock()
    defer tb.mu.Unlock()
    
    now := time.Now()
    elapsed := now.Sub(tb.lastUpdate).Seconds()
    newTokens := int64(elapsed * float64(tb.rate))
    
    tb.tokens = min(tb.capacity, tb.tokens+newTokens)
    tb.lastUpdate = now
    
    if tb.tokens >= n {
        tb.tokens -= n
        return 0
    }
    
    // Compute the wait time; multiply before converting so sub-second
    // fractions are not truncated to zero. Wait does not reserve tokens,
    // so the caller should retry Allow after sleeping.
    needed := n - tb.tokens
    return time.Duration(float64(needed) / float64(tb.rate) * float64(time.Second))
}

5.3 Sliding Window Implementation

// internal/ratelimit/sliding_window.go
package ratelimit

import (
    "context"
    "strconv"
    "time"
)

// SlidingWindow is a sliding-window rate limiter backed by Redis.
type SlidingWindow struct {
    client    RedisClient
    key       string
    limit     int64
    window    time.Duration
}

func NewSlidingWindow(client RedisClient, key string, limit int64, window time.Duration) *SlidingWindow {
    return &SlidingWindow{
        client: client,
        key:    key,
        limit:  limit,
        window: window,
    }
}

func (sw *SlidingWindow) Allow(ctx context.Context) (bool, error) {
    now := time.Now().UnixMilli()
    windowStart := now - sw.window.Milliseconds()
    
    // A Lua script keeps remove-count-add atomic on the Redis side
    script := `
        local key = KEYS[1]
        local now = tonumber(ARGV[1])
        local window_start = tonumber(ARGV[2])
        local limit = tonumber(ARGV[3])
        
        -- drop entries older than the window
        redis.call('ZREMRANGEBYSCORE', key, 0, window_start)
        
        -- count requests in the current window
        local count = redis.call('ZCARD', key)
        
        if count < limit then
            -- record this request
            redis.call('ZADD', key, now, now .. '-' .. math.random())
            redis.call('PEXPIRE', key, ARGV[4])
            return 1
        end
        
        return 0
    `
    
    result, err := sw.client.Eval(ctx, script, 
        []string{sw.key},
        now, windowStart, sw.limit, sw.window.Milliseconds(),
    ).Int()
    
    if err != nil {
        return false, err
    }
    
    return result == 1, nil
}

func (sw *SlidingWindow) Count(ctx context.Context) (int64, error) {
    now := time.Now().UnixMilli()
    windowStart := now - sw.window.Milliseconds()
    
    // Format the bounds as decimal strings; string(int64) would yield a
    // rune, not a number.
    count, err := sw.client.ZCount(ctx, sw.key,
        strconv.FormatInt(windowStart, 10), strconv.FormatInt(now, 10),
    ).Result()
    
    return count, err
}

5.4 Distributed Rate Limiter

// internal/ratelimit/distributed_limiter.go
package ratelimit

import (
    "context"
    "time"
)

// DistributedLimiter enforces limits across gateway instances via Redis.
type DistributedLimiter struct {
    client RedisClient
    
    // Rate-limit configuration
    globalLimit    int64
    globalWindow   time.Duration
    
    ipLimit        int64
    ipWindow       time.Duration
    
    methodLimits   map[string]MethodLimit
}

type MethodLimit struct {
    Limit  int64
    Window time.Duration
}

type RateLimitConfig struct {
    GlobalLimit  int64         `yaml:"global_limit"`
    GlobalWindow time.Duration `yaml:"global_window"`
    IPLimit      int64         `yaml:"ip_limit"`
    IPWindow     time.Duration `yaml:"ip_window"`
    Methods      map[string]MethodLimit `yaml:"methods"`
}

func NewDistributedLimiter(client RedisClient, cfg RateLimitConfig) *DistributedLimiter {
    return &DistributedLimiter{
        client:        client,
        globalLimit:   cfg.GlobalLimit,
        globalWindow:  cfg.GlobalWindow,
        ipLimit:       cfg.IPLimit,
        ipWindow:      cfg.IPWindow,
        methodLimits:  cfg.Methods,
    }
}

// Check runs every rate-limit dimension; a request passes only if all allow it.
func (dl *DistributedLimiter) Check(ctx context.Context, ip, apiKey, method string) (bool, error) {
    // 1. Global limit
    globalKey := "ratelimit:global"
    globalLimiter := NewSlidingWindow(dl.client, globalKey, dl.globalLimit, dl.globalWindow)
    if ok, err := globalLimiter.Allow(ctx); !ok || err != nil {
        return false, err
    }
    
    // 2. Per-IP limit
    ipKey := "ratelimit:ip:" + ip
    ipLimiter := NewSlidingWindow(dl.client, ipKey, dl.ipLimit, dl.ipWindow)
    if ok, err := ipLimiter.Allow(ctx); !ok || err != nil {
        return false, err
    }
    
    // 3. Per-API-key limit (if provided; reuses the IP limits here for brevity)
    if apiKey != "" {
        apiKeyKey := "ratelimit:apikey:" + apiKey
        apiKeyLimiter := NewSlidingWindow(dl.client, apiKeyKey, dl.ipLimit, dl.ipWindow)
        if ok, err := apiKeyLimiter.Allow(ctx); !ok || err != nil {
            return false, err
        }
    }
    
    // 4. Per-method limit
    if methodLimit, ok := dl.methodLimits[method]; ok {
        methodKey := "ratelimit:method:" + method
        methodLimiter := NewSlidingWindow(dl.client, methodKey, methodLimit.Limit, methodLimit.Window)
        if ok, err := methodLimiter.Allow(ctx); !ok || err != nil {
            return false, err
        }
    }
    
    return true, nil
}

5.5 Rate-Limit Configuration Example

# config/ratelimit.yaml
rate_limit:
  # Global limit
  global_limit: 100000      # 100k QPS
  global_window: 1s
  
  # Per-IP limit
  ip_limit: 100             # 100 QPS per IP
  ip_window: 1s
  
  # Per-method limits
  methods:
    eth_call:
      limit: 50
      window: 1s
      
    eth_getLogs:
      limit: 10             # stricter limit
      window: 1s
      
    eth_estimateGas:
      limit: 20
      window: 1s
      
    eth_sendRawTransaction:
      limit: 10
      window: 1s
      
    eth_getBalance:
      limit: 100
      window: 1s
      
    # Batch requests
    batch:
      limit: 20
      window: 1s

6. Cache Layer Design

6.1 Caching Strategy

┌─────────────────────────────────────────────────────────────────┐
│                      Layered cache design                       │
├─────────────────────────────────────────────────────────────────┤
│  L1: Local in-memory cache                                      │
│  - Hot data                                                     │
│  - chainId, blockNumber, etc.                                   │
│  - TTL: 1-10s                                                   │
├─────────────────────────────────────────────────────────────────┤
│  L2: Redis distributed cache                                    │
│  - Shared data                                                  │
│  - Transaction receipts, block data                             │
│  - TTL: 10s-1h                                                  │
├─────────────────────────────────────────────────────────────────┤
│  L3: Blockchain nodes                                           │
│  - Source of truth                                              │
└─────────────────────────────────────────────────────────────────┘

6.2 Cache Implementation

// internal/cache/cache.go
package cache

import (
    "context"
    "encoding/json"
    "time"
)

// Cache is a two-level cache: local memory in front of Redis.
type Cache struct {
    local  *LocalCache
    remote *RedisCache
}

type CacheConfig struct {
    LocalCacheSize   int           `yaml:"local_cache_size"`
    LocalCacheTTL    time.Duration `yaml:"local_cache_ttl"`
    RedisAddr        string        `yaml:"redis_addr"`
    RedisPoolSize    int           `yaml:"redis_pool_size"`
}

func NewCache(cfg CacheConfig) (*Cache, error) {
    local := NewLocalCache(cfg.LocalCacheSize, cfg.LocalCacheTTL)
    remote, err := NewRedisCache(cfg.RedisAddr, cfg.RedisPoolSize)
    if err != nil {
        return nil, err
    }
    
    return &Cache{
        local:  local,
        remote: remote,
    }, nil
}

// Get checks the local cache first, then Redis.
func (c *Cache) Get(ctx context.Context, key string, dest interface{}) error {
    // 1. Local cache first
    if val, ok := c.local.Get(key); ok {
        return json.Unmarshal(val, dest)
    }
    
    // 2. Then Redis
    val, err := c.remote.Get(ctx, key)
    if err == nil {
        // backfill the local cache
        c.local.Set(key, val)
        return json.Unmarshal(val, dest)
    }
    
    return err
}

// Set writes through to both cache levels.
func (c *Cache) Set(ctx context.Context, key string, val interface{}, ttl time.Duration) error {
    data, err := json.Marshal(val)
    if err != nil {
        return err
    }
    
    // write both levels
    c.local.Set(key, data)
    return c.remote.Set(ctx, key, data, ttl)
}

// Delete removes the key from both levels.
func (c *Cache) Delete(ctx context.Context, key string) error {
    c.local.Delete(key)
    return c.remote.Delete(ctx, key)
}

6.3 Local Cache Implementation

// internal/cache/local.go
package cache

import (
    "container/list"
    "sync"
    "time"
)

// LocalCache is an in-memory LRU cache with per-entry TTL.
type LocalCache struct {
    mu       sync.RWMutex
    capacity int
    ttl      time.Duration
    items    map[string]*cacheItem
    order    *list.List // LRU order
}

type cacheItem struct {
    key       string
    value     []byte
    expireAt  time.Time
    element   *list.Element
}

func NewLocalCache(capacity int, ttl time.Duration) *LocalCache {
    c := &LocalCache{
        capacity: capacity,
        ttl:      ttl,
        items:    make(map[string]*cacheItem),
        order:    list.New(),
    }
    
    // start the background expiry sweeper
    go c.cleanup()
    
    return c
}

func (c *LocalCache) Get(key string) ([]byte, bool) {
    c.mu.Lock()
    defer c.mu.Unlock()
    
    item, ok := c.items[key]
    if !ok {
        return nil, false
    }
    
    // check expiry
    if time.Now().After(item.expireAt) {
        c.deleteLocked(key)
        return nil, false
    }
    
    // refresh the LRU position
    c.order.MoveToFront(item.element)
    
    return item.value, true
}

func (c *LocalCache) Set(key string, value []byte) {
    c.mu.Lock()
    defer c.mu.Unlock()
    
    // update an existing entry in place
    if item, ok := c.items[key]; ok {
        item.value = value
        item.expireAt = time.Now().Add(c.ttl)
        c.order.MoveToFront(item.element)
        return
    }
    
    // evict the least recently used entry when full
    if c.order.Len() >= c.capacity {
        oldest := c.order.Back()
        if oldest != nil {
            c.deleteLocked(oldest.Value.(string))
        }
    }
    
    // insert the new entry
    element := c.order.PushFront(key)
    c.items[key] = &cacheItem{
        key:      key,
        value:    value,
        expireAt: time.Now().Add(c.ttl),
        element:  element,
    }
}

func (c *LocalCache) Delete(key string) {
    c.mu.Lock()
    defer c.mu.Unlock()
    c.deleteLocked(key)
}

func (c *LocalCache) deleteLocked(key string) {
    if item, ok := c.items[key]; ok {
        c.order.Remove(item.element)
        delete(c.items, key)
    }
}

func (c *LocalCache) cleanup() {
    ticker := time.NewTicker(time.Minute)
    for range ticker.C {
        c.mu.Lock()
        now := time.Now()
        for key, item := range c.items {
            if now.After(item.expireAt) {
                c.deleteLocked(key)
            }
        }
        c.mu.Unlock()
    }
}

6.4 Cache Key Design

// internal/cache/keys.go
package cache

import (
    "crypto/sha256"
    "encoding/hex"
    "encoding/json"
    "fmt"
)

// CacheKeyGenerator builds cache keys from method names and parameters.
type CacheKeyGenerator struct {
    prefix string
}

func NewCacheKeyGenerator(prefix string) *CacheKeyGenerator {
    return &CacheKeyGenerator{prefix: prefix}
}

// Generate builds a generic method+params cache key.
func (g *CacheKeyGenerator) Generate(method string, params interface{}) string {
    // serialize the parameters
    paramsJSON, _ := json.Marshal(params)
    
    // hash the parameters
    hash := sha256.Sum256(paramsJSON)
    paramsHash := hex.EncodeToString(hash[:])[:16]
    
    return fmt.Sprintf("%s:%s:%s", g.prefix, method, paramsHash)
}

// Block-related keys
func (g *CacheKeyGenerator) BlockByNumber(blockNumber string) string {
    return fmt.Sprintf("%s:block:number:%s", g.prefix, blockNumber)
}

func (g *CacheKeyGenerator) BlockByHash(blockHash string) string {
    return fmt.Sprintf("%s:block:hash:%s", g.prefix, blockHash)
}

// Transaction-related keys
func (g *CacheKeyGenerator) TransactionByHash(txHash string) string {
    return fmt.Sprintf("%s:tx:hash:%s", g.prefix, txHash)
}

func (g *CacheKeyGenerator) TransactionReceipt(txHash string) string {
    return fmt.Sprintf("%s:tx:receipt:%s", g.prefix, txHash)
}

// Account-related keys
func (g *CacheKeyGenerator) Balance(address, blockTag string) string {
    return fmt.Sprintf("%s:balance:%s:%s", g.prefix, address, blockTag)
}

func (g *CacheKeyGenerator) Nonce(address, blockTag string) string {
    return fmt.Sprintf("%s:nonce:%s:%s", g.prefix, address, blockTag)
}

// Contract-call key; a calldata prefix keeps keys short.
func (g *CacheKeyGenerator) EthCall(to string, data string, blockTag string) string {
    if len(data) > 16 {
        data = data[:16] // unguarded data[:16] would panic on short calldata
    }
    return fmt.Sprintf("%s:call:%s:%s:%s", g.prefix, to, data, blockTag)
}

6.5 Cache TTL Policy

// internal/cache/ttl.go
package cache

import "time"

// MethodTTL maps each method to its cache lifetime.
var MethodTTL = map[string]time.Duration{
    // Chain state: very short or uncached
    "eth_blockNumber":    1 * time.Second,
    "eth_gasPrice":       5 * time.Second,
    "eth_chainId":        24 * time.Hour, // the chain ID never changes
    
    // Block data: long-lived once confirmed
    "eth_getBlockByNumber":  5 * time.Minute, // short: may refer to the latest block
    "eth_getBlockByHash":    24 * time.Hour,  // historical blocks are immutable
    "eth_getBlockTransactionCountByNumber": 5 * time.Minute,
    "eth_getBlockTransactionCountByHash":   24 * time.Hour,
    
    // Transaction data
    "eth_getTransactionByHash":         24 * time.Hour,
    "eth_getTransactionReceipt":        24 * time.Hour,
    "eth_getTransactionByBlockHashAndIndex": 24 * time.Hour,
    
    // Account data: short
    "eth_getBalance":          10 * time.Second,
    "eth_getTransactionCount": 10 * time.Second, // nonce lookups
    "eth_getCode":             1 * time.Hour,
    "eth_getStorageAt":        10 * time.Second,
    
    // Contract calls: depends on the block tag
    "eth_call":          10 * time.Second, // latest
    "eth_estimateGas":   0,                // never cached
    
    // Log queries: uncached
    "eth_getLogs":       0, // results can be huge
    
    // Transaction submission: never cached
    "eth_sendRawTransaction": 0,
}

// GetTTL returns the cache lifetime for a method.
func GetTTL(method string) time.Duration {
    if ttl, ok := MethodTTL[method]; ok {
        return ttl
    }
    return 10 * time.Second // default: 10 seconds
}

// IsCacheable reports whether a method's responses may be cached.
func IsCacheable(method string) bool {
    ttl, ok := MethodTTL[method]
    return ok && ttl > 0
}

7. Monitoring and Alerting

7.1 Metrics

// internal/metrics/metrics.go
package metrics

import (
    "github.com/prometheus/client_golang/prometheus"
    "github.com/prometheus/client_golang/prometheus/promauto"
)

var (
    // Request count
    RequestsTotal = promauto.NewCounterVec(
        prometheus.CounterOpts{
            Name: "rpc_gateway_requests_total",
            Help: "Total number of RPC requests",
        },
        []string{"method", "status"},
    )
    
    // Request latency
    RequestDuration = promauto.NewHistogramVec(
        prometheus.HistogramOpts{
            Name:    "rpc_gateway_request_duration_seconds",
            Help:    "Request duration in seconds",
            Buckets: []float64{.01, .05, .1, .25, .5, 1, 2.5, 5, 10},
        },
        []string{"method"},
    )
    
    // Active connections
    ActiveConnections = promauto.NewGauge(
        prometheus.GaugeOpts{
            Name: "rpc_gateway_active_connections",
            Help: "Number of active connections",
        },
    )
    
    // Rate-limit hits
    RateLimitHits = promauto.NewCounterVec(
        prometheus.CounterOpts{
            Name: "rpc_gateway_rate_limit_hits_total",
            Help: "Total number of rate limit hits",
        },
        []string{"type", "key"},
    )
    
    // Cache hits (hit rate is derived in queries)
    CacheHits = promauto.NewCounterVec(
        prometheus.CounterOpts{
            Name: "rpc_gateway_cache_hits_total",
            Help: "Total number of cache hits",
        },
        []string{"level", "method"},
    )
    
    // Node health status
    NodeHealth = promauto.NewGaugeVec(
        prometheus.GaugeOpts{
            Name: "rpc_gateway_node_health",
            Help: "Node health status (1=healthy, 0=unhealthy)",
        },
        []string{"node"},
    )
    
    // Per-node request distribution
    NodeRequests = promauto.NewCounterVec(
        prometheus.CounterOpts{
            Name: "rpc_gateway_node_requests_total",
            Help: "Total requests per node",
        },
        []string{"node", "method"},
    )
    
    // DDoS detection alerts
    DDoSAlerts = promauto.NewCounterVec(
        prometheus.CounterOpts{
            Name: "rpc_gateway_ddos_alerts_total",
            Help: "Total number of DDoS alerts",
        },
        []string{"type", "severity"},
    )
)

7.2 Alert Rules

# config/alerting.yaml
groups:
  - name: rpc_gateway
    rules:
      # High error rate
      - alert: HighErrorRate
        expr: |
          sum(rate(rpc_gateway_requests_total{status=~"5.."}[5m])) /
          sum(rate(rpc_gateway_requests_total[5m])) > 0.05
        for: 5m
        labels:
          severity: critical
        annotations:
          summary: "High error rate detected"
          description: "Error rate is {{ $value | humanizePercentage }}"
      
      # High latency
      - alert: HighLatency
        expr: |
          histogram_quantile(0.99, 
            sum(rate(rpc_gateway_request_duration_seconds_bucket[5m])) by (le, method)
          ) > 1
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "High latency detected"
          description: "P99 latency for {{ $labels.method }} is {{ $value }}s"
      
      # Excessive rate-limit hits
      - alert: HighRateLimitHits
        expr: |
          sum(rate(rpc_gateway_rate_limit_hits_total[5m])) > 100
        for: 2m
        labels:
          severity: warning
        annotations:
          summary: "High rate limit hits"
          description: "Rate limit hits: {{ $value }}/s"
      
      # Unhealthy node
      - alert: NodeUnhealthy
        expr: rpc_gateway_node_health == 0
        for: 1m
        labels:
          severity: critical
        annotations:
          summary: "Node {{ $labels.node }} is unhealthy"
      
      # 可疑流量模式
      - alert: SuspiciousTrafficPattern
        expr: |
          sum(rate(rpc_gateway_requests_total[1m])) > 
          2 * sum(rate(rpc_gateway_requests_total[1h] offset 1h))
        for: 1m
        labels:
          severity: critical
        annotations:
          summary: "Suspicious traffic pattern detected"
          description: "Current traffic is 2x higher than usual"
      
      # 缓存命中率下降
      - alert: LowCacheHitRate
        expr: |
          sum(rate(rpc_gateway_cache_hits_total{level="l1"}[5m])) /
          sum(rate(rpc_gateway_requests_total[5m])) < 0.3
        for: 10m
        labels:
          severity: warning
        annotations:
          summary: "Low cache hit rate"
          description: "L1 cache hit rate is {{ $value | humanizePercentage }}"

7.3 健康检查

// internal/health/health.go
package health

import (
    "context"
    "encoding/json"
    "net/http"
    "sync"
    "time"
)

type HealthChecker struct {
    checks map[string]CheckFunc
    mu     sync.RWMutex
}

type CheckFunc func(ctx context.Context) error

type HealthStatus struct {
    Status    string            `json:"status"`
    Timestamp time.Time         `json:"timestamp"`
    Checks    map[string]Check  `json:"checks"`
}

type Check struct {
    Status  string `json:"status"`
    Latency string `json:"latency"`
    Error   string `json:"error,omitempty"`
}

func NewHealthChecker() *HealthChecker {
    return &HealthChecker{
        checks: make(map[string]CheckFunc),
    }
}

func (h *HealthChecker) Register(name string, check CheckFunc) {
    h.mu.Lock()
    defer h.mu.Unlock()
    h.checks[name] = check
}

func (h *HealthChecker) Check(ctx context.Context) HealthStatus {
    h.mu.RLock()
    defer h.mu.RUnlock()
    
    status := HealthStatus{
        Status:    "healthy",
        Timestamp: time.Now(),
        Checks:    make(map[string]Check),
    }
    
    var wg sync.WaitGroup
    var mu sync.Mutex
    
    for name, check := range h.checks {
        wg.Add(1)
        go func(name string, check CheckFunc) {
            defer wg.Done()
            
            start := time.Now()
            err := check(ctx)
            latency := time.Since(start)
            
            mu.Lock()
            defer mu.Unlock()
            
            checkResult := Check{
                Latency: latency.String(),
            }
            
            if err != nil {
                checkResult.Status = "unhealthy"
                checkResult.Error = err.Error()
                status.Status = "unhealthy"
            } else {
                checkResult.Status = "healthy"
            }
            
            status.Checks[name] = checkResult
        }(name, check)
    }
    
    wg.Wait()
    return status
}

func (h *HealthChecker) Handler() http.HandlerFunc {
    return func(w http.ResponseWriter, r *http.Request) {
        status := h.Check(r.Context())
        
        w.Header().Set("Content-Type", "application/json")
        if status.Status == "unhealthy" {
            w.WriteHeader(http.StatusServiceUnavailable)
        }
        
        json.NewEncoder(w).Encode(status)
    }
}

八、部署方案

8.1 容器化部署

# Dockerfile
FROM golang:1.21-alpine AS builder

WORKDIR /app

# 依赖
COPY go.mod go.sum ./
RUN go mod download

# 构建
COPY . .
RUN CGO_ENABLED=0 GOOS=linux go build -o rpc-gateway ./cmd/gateway

# 运行镜像
FROM alpine:3.19

RUN apk --no-cache add ca-certificates tzdata

WORKDIR /app

COPY --from=builder /app/rpc-gateway .
COPY --from=builder /app/config ./config

EXPOSE 8080 9090

ENTRYPOINT ["./rpc-gateway"]
CMD ["--config", "config/config.yaml"]

8.2 Kubernetes 部署

# k8s/deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: rpc-gateway
  labels:
    app: rpc-gateway
spec:
  replicas: 3
  selector:
    matchLabels:
      app: rpc-gateway
  template:
    metadata:
      labels:
        app: rpc-gateway
    spec:
      containers:
        - name: gateway
          image: rpc-gateway:latest
          ports:
            - containerPort: 8080
              name: http
            - containerPort: 9090
              name: metrics
          resources:
            requests:
              cpu: "500m"
              memory: "512Mi"
            limits:
              cpu: "2000m"
              memory: "2Gi"
          env:
            - name: REDIS_ADDR
              valueFrom:
                secretKeyRef:
                  name: rpc-gateway-secrets
                  key: redis-addr
            - name: NODE_URLS
              valueFrom:
                configMapKeyRef:
                  name: rpc-gateway-config
                  key: node-urls
          livenessProbe:
            httpGet:
              path: /health
              port: 8080
            initialDelaySeconds: 10
            periodSeconds: 10
          readinessProbe:
            httpGet:
              path: /ready
              port: 8080
            initialDelaySeconds: 5
            periodSeconds: 5
          volumeMounts:
            - name: config
              mountPath: /app/config
      volumes:
        - name: config
          configMap:
            name: rpc-gateway-config
---
apiVersion: v1
kind: Service
metadata:
  name: rpc-gateway
spec:
  selector:
    app: rpc-gateway
  ports:
    - port: 80
      targetPort: 8080
      name: http
    - port: 9090
      targetPort: 9090
      name: metrics
---
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: rpc-gateway-hpa
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: rpc-gateway
  minReplicas: 3
  maxReplicas: 20
  metrics:
    - type: Resource
      resource:
        name: cpu
        target:
          type: Utilization
          averageUtilization: 70
    - type: Resource
      resource:
        name: memory
        target:
          type: Utilization
          averageUtilization: 80
---
apiVersion: policy/v1
kind: PodDisruptionBudget
metadata:
  name: rpc-gateway-pdb
spec:
  minAvailable: 2
  selector:
    matchLabels:
      app: rpc-gateway
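The HPA above scales on CPU (70%) and memory (80%) utilization. Kubernetes computes a desired replica count per metric as `desired = ceil(currentReplicas * currentUtilization / targetUtilization)`, takes the maximum across metrics, then clamps to `minReplicas`/`maxReplicas`. A sketch of that arithmetic:

```go
package main

import (
	"fmt"
	"math"
)

// desiredReplicas implements the per-metric HPA scaling formula:
// desired = ceil(currentReplicas * currentUtilization / targetUtilization).
func desiredReplicas(current int, currentUtil, targetUtil float64) int {
	return int(math.Ceil(float64(current) * currentUtil / targetUtil))
}

// clamp bounds the result by minReplicas/maxReplicas, as in the HPA spec above.
func clamp(n, min, max int) int {
	if n < min {
		return min
	}
	if n > max {
		return max
	}
	return n
}

func maxInt(a, b int) int {
	if a > b {
		return a
	}
	return b
}

func main() {
	// 3 replicas at 98% average CPU against the 70% target -> scale out to 5.
	cpu := desiredReplicas(3, 98, 70)
	mem := desiredReplicas(3, 60, 80) // memory below target, no scale-out needed
	fmt.Println("desired replicas:", clamp(maxInt(cpu, mem), 3, 20))
}
```

This is why sustained HTTP-flood traffic that drives CPU above target automatically adds gateway pods, up to the `maxReplicas: 20` ceiling.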

8.3 配置管理

# config/config.yaml
server:
  http_addr: ":8080"
  metrics_addr: ":9090"
  read_timeout: 10s
  write_timeout: 30s
  max_connections: 10000

rate_limit:
  global_limit: 100000
  global_window: 1s
  ip_limit: 100
  ip_window: 1s
  methods:
    eth_call:
      limit: 50
      window: 1s
    eth_getLogs:
      limit: 10
      window: 1s
    eth_estimateGas:
      limit: 20
      window: 1s

cache:
  local_size: 10000
  local_ttl: 10s
  redis_addr: "${REDIS_ADDR}"
  redis_pool_size: 100

nodes:
  - name: "node-1"
    url: "${NODE_URL_1}"
    weight: 100
    max_connections: 100
  - name: "node-2"
    url: "${NODE_URL_2}"
    weight: 100
    max_connections: 100
  - name: "node-3"
    url: "${NODE_URL_3}"
    weight: 50
    max_connections: 50

security:
  blacklist_ttl: 24h
  auto_ban_threshold: 100
  auto_ban_window: 1m

logging:
  level: info
  format: json

monitoring:
  enabled: true
  prometheus_addr: ":9090"

8.4 多区域部署

┌─────────────────────────────────────────────────────────────────┐
│                      Global load balancing                      │
│                    (AWS Route53 / Cloudflare)                   │
└───────────────────────────┬─────────────────────────────────────┘
                            │
         ┌──────────────────┼──────────────────┐
         │                  │                  │
         ▼                  ▼                  ▼
┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐
│   us-east-1     │ │   eu-west-1     │ │  ap-southeast-1 │
│   (US East)     │ │   (EU West)     │ │ (AP Southeast)  │
├─────────────────┤ ├─────────────────┤ ├─────────────────┤
│ - API Gateway   │ │ - API Gateway   │ │ - API Gateway   │
│ - Redis Cluster │ │ - Redis Cluster │ │ - Redis Cluster │
│ - Node Cluster  │ │ - Node Cluster  │ │ - Node Cluster  │
└─────────────────┘ └─────────────────┘ └─────────────────┘

九、成本估算

9.1 基础设施成本 (月)

| Component | Spec | Count | Unit price | Monthly cost |
| --- | --- | --- | --- | --- |
| API Gateway | 4C8G | 3 | $100 | $300 |
| Redis Cluster | 4C16G | 3 | $150 | $450 |
| Blockchain nodes | 8C32G | 3 | $300 | $900 |
| Load balancer | - | 1 | $50 | $50 |
| CDN/WAF | Cloudflare Pro | 1 | $20 | $20 |
| Monitoring | Prometheus + Grafana | 1 | $50 | $50 |
| **Total** | | | | **$1,770** |

9.2 扩展成本

| QPS | Gateway instances | Node instances | Monthly cost delta |
| --- | --- | --- | --- |
| 10K | 3 | 3 | baseline |
| 50K | 6 | 6 | +$1,770 |
| 100K | 12 | 12 | +$5,310 |

总结

构建抗 DDoS 的 Web3 RPC 网关需要多层防护:

  1. 网络层:CDN/WAF 清洗、Anycast 路由
  2. 应用层:限流、请求验证、方法白名单
  3. 资源层:连接池、熔断降级、缓存
  4. 监控层:实时监控、自动告警、快速响应

核心设计原则:

  • 纵深防御:多层防护,单点失效不影响整体
  • 弹性扩展:水平扩展应对流量峰值
  • 快速恢复:自动故障转移,最小化影响

本文仅供技术学习参考,不构成任何投资建议。