AI agent Privacy Boundary Enforcement:PII、敏感字段和租户隔离如何在运行时强制生效

HTMLPAGE 团队
14 分钟阅读

别让敏感数据泄露!本文提供 PII 识别、数据分类、访问控制和拦截机制的完整方案,结合 GDPR、CCPA 等法规要求,在运行时强制保护隐私边界,确保敏感数据不被未授权访问或泄露。

#privacy boundary #PII protection #data classification #sensitive data handling #tenant isolation

为什么需要 Privacy Boundary Enforcement?

2025 年 8 月,某 HR SaaS 平台的 customer support agent 犯了一个严重错误:在处理一个客户的工单时,agent 意外返回了另一个客户的员工薪资数据。原因是 agent 在查询数据库时,没有正确应用租户隔离过滤条件,导致跨租户数据泄露。

这次事故导致了:

  • GDPR 罚款:€2.4M(因为泄露了欧盟员工的个人数据)。
  • 客户流失:3 个大客户终止了合同。
  • 声誉损失:负面新闻持续数周,新客户签约率下降 40%。

根本原因是什么?Privacy Boundary(隐私边界)没有在运行时强制生效。虽然系统设计时有租户隔离的概念,但在代码实现中,某个查询漏掉了 WHERE tenant_id = ? 条件,而这个错误直到事故发生前都没有被发现。

Privacy Boundary Enforcement 就是为了解决这类问题而设计的系统性方案。它不是简单的"加个 WHERE 条件",而是提供PII 识别、数据分类、访问控制、运行时拦截和持续监控的完整框架,确保隐私边界在任何情况下都不会被突破。

为什么需要 Privacy Boundary?

1. 法规要求(Regulatory Requirements)

越来越多的法规要求保护个人数据和敏感信息:

  • GDPR:要求保护个人数据(Personal Data),包括姓名、邮箱、位置、健康信息等。
  • CCPA(加州消费者隐私法):要求保护加州居民的个人信息。
  • HIPAA(美国医疗行业):要求保护患者健康信息(PHI)。
  • PCI DSS(支付行业):要求保护持卡人数据(如信用卡号)。
  • 中国个人信息保护法:要求保护中国公民的个人信息。

违反这些法规可能导致巨额罚款(GDPR 最高可达全球年收入的 4% 或 €20M)。

2. 用户信任(User Trust)

用户越来越关注隐私保护:

  • 87% 的用户表示会因为隐私问题停止使用某个服务(Pew Research, 2025)。
  • 63% 的用户表示会阅读隐私政策,但只有 12% 真正理解(Deloitte, 2025)。
  • 隐私保护成为竞争优势:Apple 的"Privacy is a fundamental human right"营销策略成功吸引了大量注重隐私的用户。

如果发生数据泄露,用户信任一旦失去,很难重建。

3. 风险控制(Risk Management)

隐私边界不仅是合规问题,更是风险管理问题:

  • 内部威胁:员工可能滥用访问权限,查看不该看的数据。
  • 外部攻击:黑客可能通过 SQL 注入、API 漏洞等方式窃取数据。
  • 意外泄露:开发人员可能在日志中打印敏感数据,或在测试环境使用生产数据。

Privacy Boundary Enforcement 可以在运行时自动检测和阻止这些风险。

PII 识别与分类

什么是 PII?

**PII(Personally Identifiable Information,个人身份信息)**是指可以单独或与其他信息结合识别特定个人的任何信息。

常见 PII 类型

  • 直接标识符:姓名、身份证号、护照号、社保号、驾照号。
  • 联系信息:邮箱地址、电话号码、家庭住址。
  • 生物特征:指纹、面部识别数据、DNA。
  • 财务信息:银行账号、信用卡号、收入、税务记录。
  • 健康信息:病历、诊断结果、处方、保险信息。
  • 在线标识符:IP 地址、Cookie ID、设备 ID、位置数据。
  • 其他:种族、宗教、政治观点、性取向(这些在某些法规中被视为"特殊类别数据",需要更严格的保护)。

自动识别 PII

方法一:正则表达式匹配

适用于结构化数据(如数据库字段、JSON 对象)。

const piiPatterns: Record<string, RegExp> = {
  email: /^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$/,
  phone: /^\+?[1-9]\d{1,14}$/, // E.164 格式
  ssn: /^\d{3}-\d{2}-\d{4}$/, // 美国社保号
  credit_card: /^\d{4}[- ]?\d{4}[- ]?\d{4}[- ]?\d{4}$/,
  ip_address: /^(?:[0-9]{1,3}\.){3}[0-9]{1,3}$/,
};

function detectPII(value: string): string[] {
  const detected: string[] = [];
  
  for (const [type, pattern] of Object.entries(piiPatterns)) {
    if (pattern.test(value)) {
      detected.push(type);
    }
  }
  
  return detected;
}

// 使用
console.log(detectPII("john.doe@example.com")); // ["email"]
console.log(detectPII("123-45-6789")); // ["ssn"]

优点:简单快速,适合已知格式的数据。
缺点:无法识别非结构化文本中的 PII(如 "My name is John Doe and I live at 123 Main St")。

方法二:ML 分类器

适用于非结构化文本(如用户消息、文档、日志)。

实现方式

  • 预训练模型:使用 spaCy、Stanford NER、AWS Comprehend 等 NER(Named Entity Recognition)模型。
  • 自定义训练:使用标注好的 PII 数据集训练自定义模型。
  • 云服务:使用 Google Cloud DLP、Azure Text Analytics、AWS Macie 等云服务的 PII 检测 API。

示例(使用 spaCy):

import spacy

# 加载预训练模型
nlp = spacy.load("en_core_web_lg")

def detect_pii_in_text(text: str) -> list[dict]:
    doc = nlp(text)
    
    pii_entities = []
    for ent in doc.ents:
        if ent.label_ in ["PERSON", "ORG", "GPE", "DATE", "EMAIL", "PHONE"]:
            pii_entities.append({
                "text": ent.text,
                "label": ent.label_,
                "start": ent.start_char,
                "end": ent.end_char,
            })
    
    return pii_entities

# 使用
text = "My name is John Doe and my email is john@example.com"
print(detect_pii_in_text(text))
# Output: [
#   {"text": "John Doe", "label": "PERSON", "start": 11, "end": 19},
#   {"text": "john@example.com", "label": "EMAIL", "start": 35, "end": 51}
# ]

优点:可以识别非结构化文本中的 PII,准确率高。
缺点:计算成本高,延迟较大;需要定期更新模型以适应新的 PII 模式。

方法三:自定义规则引擎

结合正则表达式、关键词列表、上下文分析等多种方法。

示例

interface PIIRule {
  id: string;
  name: string;
  type: "regex" | "keyword" | "context" | "ml";
  pattern?: RegExp;
  keywords?: string[];
  context_window?: number; // 上下文窗口大小
  confidence_threshold?: number; // ML 模型的置信度阈值
  action: "block" | "mask" | "alert" | "log";
}

class PIIDetector {
  private rules: PIIRule[];
  
  constructor(rules: PIIRule[]) {
    this.rules = rules;
  }
  
  detect(text: string): PIIDetection[] {
    const detections: PIIDetection[] = [];
    
    for (const rule of this.rules) {
      switch (rule.type) {
        case "regex":
          if (rule.pattern && rule.pattern.test(text)) {
            detections.push({
              rule_id: rule.id,
              rule_name: rule.name,
              matched_text: text.match(rule.pattern)?.[0] || "",
              confidence: 1.0,
              action: rule.action,
            });
          }
          break;
        
        case "keyword":
          if (rule.keywords) {
            for (const keyword of rule.keywords) {
              if (text.toLowerCase().includes(keyword.toLowerCase())) {
                detections.push({
                  rule_id: rule.id,
                  rule_name: rule.name,
                  matched_text: keyword,
                  confidence: 0.8,
                  action: rule.action,
                });
              }
            }
          }
          break;
        
        case "ml":
          // 调用 ML 模型
          const mlResult = this.callMLModel(text, rule.confidence_threshold || 0.9);
          if (mlResult.detected) {
            detections.push({
              rule_id: rule.id,
              rule_name: rule.name,
              matched_text: mlResult.matched_text,
              confidence: mlResult.confidence,
              action: rule.action,
            });
          }
          break;
      }
    }
    
    return detections;
  }
  
  private callMLModel(text: string, threshold: number): MLResult {
    // 调用外部 ML 服务
    // ...
    return { detected: false, matched_text: "", confidence: 0 };
  }
}

// 配置规则
const rules: PIIRule[] = [
  {
    id: "email_detection",
    name: "Email Address Detection",
    type: "regex",
    pattern: /^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$/,
    action: "mask",
  },
  {
    id: "ssn_detection",
    name: "Social Security Number Detection",
    type: "regex",
    pattern: /^\d{3}-\d{2}-\d{4}$/,
    action: "block",
  },
  {
    id: "credit_card_detection",
    name: "Credit Card Number Detection",
    type: "regex",
    pattern: /^\d{4}[- ]?\d{4}[- ]?\d{4}[- ]?\d{4}$/,
    action: "block",
  },
  {
    id: "name_detection",
    name: "Person Name Detection (ML)",
    type: "ml",
    confidence_threshold: 0.9,
    action: "alert",
  },
];

// 使用
const detector = new PIIDetector(rules);
const detections = detector.detect("My email is john@example.com and SSN is 123-45-6789");
console.log(detections);

数据分类标签

为每个数据字段添加分类标签,便于后续的访问控制和脱敏策略。

enum DataClassification {
  PUBLIC = "public",           // 公开数据,无限制
  INTERNAL = "internal",       // 内部数据,仅限员工访问
  CONFIDENTIAL = "confidential", // 机密数据,需授权访问
  PII = "pii",                 // 个人身份信息,严格管控
  PHI = "phi",                 // 患者健康信息(HIPAA)
  PCI = "pci",                 // 支付卡信息(PCI DSS)
}

interface DataField {
  field_name: string;
  data_type: string;
  classification: DataClassification;
  pii_types?: string[]; // 如果是 PII,具体类型(如 "email", "phone")
  retention_days?: number; // 保留期限
  encryption_required: boolean; // 是否需要加密
  masking_rule?: string; // 脱敏规则(如 "show_first_3_chars")
}

// 示例:用户表的字段分类
const userTableFields: DataField[] = [
  {
    field_name: "user_id",
    data_type: "uuid",
    classification: DataClassification.INTERNAL,
    encryption_required: false,
  },
  {
    field_name: "email",
    data_type: "string",
    classification: DataClassification.PII,
    pii_types: ["email"],
    encryption_required: true,
    masking_rule: "show_first_3_chars", // 如 "joh***@example.com"
  },
  {
    field_name: "phone",
    data_type: "string",
    classification: DataClassification.PII,
    pii_types: ["phone"],
    encryption_required: true,
    masking_rule: "show_last_4_digits", // 如 "***-***-1234"
  },
  {
    field_name: "date_of_birth",
    data_type: "date",
    classification: DataClassification.PII,
    pii_types: ["date_of_birth"],
    encryption_required: true,
    masking_rule: "show_only_year", // 如 "1990"
  },
  {
    field_name: "profile_picture_url",
    data_type: "string",
    classification: DataClassification.PUBLIC,
    encryption_required: false,
  },
];

访问控制策略

RBAC(基于角色的访问控制)

根据用户角色授予不同的数据访问权限。

enum Role {
  ADMIN = "admin",
  SUPPORT_AGENT = "support_agent",
 ANALYST = "analyst",
  CUSTOMER = "customer",
}

interface AccessControlRule {
  role: Role;
  resource_type: string;
  classification: DataClassification;
  allowed_actions: Array<"read" | "write" | "delete">;
  conditions?: Record<string, any>; // 额外条件(如只能访问自己的数据)
}

const accessControlRules: AccessControlRule[] = [
  {
    role: Role.ADMIN,
    resource_type: "user_profile",
    classification: DataClassification.PII,
    allowed_actions: ["read", "write", "delete"],
  },
  {
    role: Role.SUPPORT_AGENT,
    resource_type: "user_profile",
    classification: DataClassification.PII,
    allowed_actions: ["read"],
    conditions: {
      mask_fields: ["ssn", "credit_card"], // 脱敏某些字段
      tenant_scope: true, // 只能访问同一租户的用户
    },
  },
  {
    role: Role.ANALYST,
    resource_type: "user_profile",
    classification: DataClassification.PII,
    allowed_actions: ["read"],
    conditions: {
      anonymize: true, // 必须匿名化
      aggregate_only: true, // 只能访问聚合数据
    },
  },
  {
    role: Role.CUSTOMER,
    resource_type: "user_profile",
    classification: DataClassification.PII,
    allowed_actions: ["read", "write"],
    conditions: {
      self_only: true, // 只能访问自己的数据
    },
  },
];

function checkAccess(
  userRole: Role,
  resourceType: string,
  classification: DataClassification,
  action: "read" | "write" | "delete",
  context?: Record<string, any>
): { allowed: boolean; reason?: string } {
  const rule = accessControlRules.find(
    r =>
      r.role === userRole &&
      r.resource_type === resourceType &&
      r.classification === classification
  );
  
  if (!rule) {
    return { allowed: false, reason: "No matching access control rule" };
  }
  
  if (!rule.allowed_actions.includes(action)) {
    return { allowed: false, reason: `Action '${action}' not allowed for role '${userRole}'` };
  }
  
  // 检查额外条件
  if (rule.conditions) {
    if (rule.conditions.tenant_scope && context?.tenant_id !== context?.user_tenant_id) {
      return { allowed: false, reason: "Cross-tenant access denied" };
    }
    
    if (rule.conditions.self_only && context?.user_id !== context?.resource_owner_id) {
      return { allowed: false, reason: "Self-only access required" };
    }
  }
  
  return { allowed: true };
}

// 使用
const result = checkAccess(
  Role.SUPPORT_AGENT,
  "user_profile",
  DataClassification.PII,
  "read",
  {
    tenant_id: "tenant_123",
    user_tenant_id: "tenant_456", // 不同租户
  }
);

console.log(result);
// Output: { allowed: false, reason: "Cross-tenant access denied" }

ABAC(基于属性的访问控制)

更细粒度的权限控制,基于多个属性动态决策。

interface ABACPolicy {
  effect: "allow" | "deny";
  conditions: ABACCondition[];
}

interface ABACCondition {
  attribute: string;
  operator: "equals" | "not_equals" | "in" | "not_in" | "greater_than" | "less_than" | "contains";
  value: any;
}

function evaluateABACPolicy(
  policy: ABACPolicy,
  attributes: Record<string, any>
): boolean {
  let result = policy.effect === "allow";
  
  for (const condition of policy.conditions) {
    const actualValue = attributes[condition.attribute];
    
    let conditionMet = false;
    switch (condition.operator) {
      case "equals":
        conditionMet = actualValue === condition.value;
        break;
      case "not_equals":
        conditionMet = actualValue !== condition.value;
        break;
      case "in":
        conditionMet = condition.value.includes(actualValue);
        break;
      case "not_in":
        conditionMet = !condition.value.includes(actualValue);
        break;
      case "greater_than":
        conditionMet = actualValue > condition.value;
        break;
      case "less_than":
        conditionMet = actualValue < condition.value;
        break;
      case "contains":
        conditionMet = actualValue?.includes(condition.value);
        break;
    }
    
    if (policy.effect === "allow") {
      result = result && conditionMet;
    } else {
      result = result && !conditionMet;
    }
  }
  
  return result;
}

// 示例策略:只允许在工作时间、从办公网络、由 support agent 角色访问 PII 数据
const policy: ABACPolicy = {
  effect: "allow",
  conditions: [
    { attribute: "role", operator: "equals", value: "support_agent" },
    { attribute: "time_of_day", operator: "greater_than", value: "09:00" },
    { attribute: "time_of_day", operator: "less_than", value: "18:00" },
    { attribute: "source_ip", operator: "in", value: ["10.0.0.0/8", "172.16.0.0/12"] },
    { attribute: "data_classification", operator: "equals", value: "pii" },
  ],
};

// 评估
const attributes = {
  role: "support_agent",
  time_of_day: "14:30",
  source_ip: "10.0.1.42",
  data_classification: "pii",
};

const allowed = evaluateABACPolicy(policy, attributes);
console.log(allowed); // true

优点:非常灵活,可以基于任意属性组合制定策略。
缺点:策略复杂度高,难以维护和调试。

运行时拦截机制

Middleware(中间件)

在数据访问层部署中间件,自动检查和拦截违规访问。

import { NextFunction, Request, Response } from "express";

async function privacyBoundaryMiddleware(
  req: Request,
  res: Response,
  next: NextFunction
) {
  const userRole = req.user?.role;
  const requestedResource = req.params.resource;
  const action = req.method === "GET" ? "read" : req.method === "POST" ? "write" : "delete";
  
  // 获取资源的数据分类
  const resourceMetadata = await getResourceMetadata(requestedResource);
  const classification = resourceMetadata?.classification || DataClassification.PUBLIC;
  
  // 检查访问权限
  const accessCheck = checkAccess(userRole, requestedResource, classification, action, {
    user_id: req.user?.id,
    tenant_id: req.user?.tenant_id,
    resource_owner_id: resourceMetadata?.owner_id,
  });
  
  if (!accessCheck.allowed) {
    // 记录审计日志
    await logPrivacyViolation({
      timestamp: new Date().toISOString(),
      user_id: req.user?.id,
      role: userRole,
      resource: requestedResource,
      action,
      classification,
      reason: accessCheck.reason,
      source_ip: req.ip,
    });
    
    // 返回 403 Forbidden
    return res.status(403).json({
      error: "Forbidden",
      message: accessCheck.reason,
    });
  }
  
  // 如果需要脱敏,应用脱敏规则
  if (resourceMetadata?.masking_rule && action === "read") {
    req.shouldMask = true;
    req.maskingRule = resourceMetadata.masking_rule;
  }
  
  next();
}

// 使用
app.use("/api/data/*", privacyBoundaryMiddleware);

Proxy(代理)

在数据库或 API 前面部署 Proxy,自动拦截和修改请求。

示例架构

┌─────────────┐     ┌──────────────┐     ┌─────────────┐
│  AI Agents  │────▶│ Privacy      │────▶│  Database / │
│             │     │ Proxy        │     │  API        │
└─────────────┘     │ (Enforces    │     └─────────────┘
                    │  Boundaries) │
                    └──────────────┘
                           │
                           ▼
                    ┌──────────────┐
                    │  Audit Log   │
                    └──────────────┘

实现方式

  • Database Proxy:使用 ProxySQL、PgBouncer 等,自动添加 WHERE tenant_id = ? 条件。
  • API Gateway:使用 Kong、Apigee 等,自动检查和脱敏响应数据。
  • Service Mesh:使用 Istio、Linkerd 等,在微服务间强制实施隐私策略。

Data Masking(数据脱敏)

在返回数据前自动脱敏敏感字段。

function applyDataMasking(data: any, maskingRule: string): any {
  switch (maskingRule) {
    case "show_first_3_chars":
      // 如 "john@example.com" → "joh***@example.com"
      if (typeof data === "string" && data.includes("@")) {
        const [localPart, domain] = data.split("@");
        return `${localPart.slice(0, 3)}***@${domain}`;
      }
      return data;
    
    case "show_last_4_digits":
      // 如 "123-456-7890" → "***-***-7890"
      if (typeof data === "string" && data.length >= 4) {
        return "***-***-" + data.slice(-4);
      }
      return data;
    
    case "show_only_year":
      // 如 "1990-05-15" → "1990"
      if (typeof data === "string" && data.match(/^\d{4}-\d{2}-\d{2}$/)) {
        return data.slice(0, 4);
      }
      return data;
    
    case "full_mask":
      // 完全隐藏
      return "***";
    
    default:
      return data;
  }
}

// 使用
const email = "john.doe@example.com";
console.log(applyDataMasking(email, "show_first_3_chars"));
// Output: "joh***@example.com"

const phone = "123-456-7890";
console.log(applyDataMasking(phone, "show_last_4_digits"));
// Output: "***-***-7890"

租户隔离设计

Logical Isolation(逻辑隔离)

所有租户的数据存储在同一个数据库中,通过 tenant_id 字段区分。

优点

  • 成本低:只需维护一个数据库实例。
  • 易于扩展:添加新租户无需 provisioning 新数据库。
  • 易于备份:只需备份一个数据库。

缺点

  • 安全风险高:如果代码中漏掉 WHERE tenant_id = ?,会导致跨租户数据泄露。
  • 性能问题:大租户可能影响小租户的性能。
  • 合规困难:某些法规要求物理隔离(如金融、医疗行业)。

实现方式

-- 所有表都包含 tenant_id 字段
CREATE TABLE users (
  id UUID PRIMARY KEY,
  tenant_id UUID NOT NULL,
  email VARCHAR(255) NOT NULL,
  -- 其他字段...
  FOREIGN KEY (tenant_id) REFERENCES tenants(id)
);

-- 查询时必须包含 tenant_id 过滤
SELECT * FROM users WHERE tenant_id = 'tenant_123' AND id = 'user_456';

最佳实践

  • Row-Level Security (RLS):在数据库层面强制实施租户隔离,即使应用代码漏掉 WHERE 条件也不会泄露数据。
-- PostgreSQL RLS 示例
ALTER TABLE users ENABLE ROW LEVEL SECURITY;

CREATE POLICY tenant_isolation_policy ON users
  USING (tenant_id = current_setting('app.current_tenant_id')::uuid);

-- 设置当前租户 ID
SET app.current_tenant_id = 'tenant_123';

-- 现在所有查询都会自动应用租户隔离
SELECT * FROM users WHERE id = 'user_456';
-- 实际执行:SELECT * FROM users WHERE id = 'user_456' AND tenant_id = 'tenant_123'

Physical Isolation(物理隔离)

每个租户有独立的数据库实例或 schema。

优点

  • 安全性高:物理上隔离,不可能跨租户访问。
  • 性能隔离:大租户不会影响小租户。
  • 合规友好:满足严格的合规要求(如金融、医疗)。

缺点

  • 成本高:需要维护多个数据库实例。
  • 运维复杂:备份、升级、监控都需要针对每个租户单独进行。
  • 扩展困难:添加新租户需要 provisioning 新数据库。

实现方式

# 每个租户一个独立的数据库
tenants:
  tenant_123:
    database: "prod_tenant_123"
    host: "db-tenant-123.example.com"
  
  tenant_456:
    database: "prod_tenant_456"
    host: "db-tenant-456.example.com"

适用场景

  • 大型企业客户,愿意为独立数据库支付额外费用。
  • 合规要求严格的行业(金融、医疗、政府)。
  • 对性能和安全性要求极高的场景。

Hybrid(混合隔离)

结合逻辑隔离和物理隔离的优点。

实现方式

  • 小租户:共享数据库(逻辑隔离),降低成本。
  • 大租户/高价值租户:独立数据库(物理隔离),提高安全性和性能。
  • 敏感数据:无论租户大小,PII/PHI/PCI 数据始终物理隔离存储。
async function getTenantDatabase(tenantId: string): Promise<DatabaseConnection> {
  const tenant = await getTenantMetadata(tenantId);
  
  if (tenant.isolation_mode === "physical") {
    // 物理隔离:返回独立的数据库连接
    return getPhysicalDatabaseConnection(tenant.database_host);
  } else {
    // 逻辑隔离:返回共享数据库连接,但设置 tenant_id
    const conn = getSharedDatabaseConnection();
    await conn.execute(`SET app.current_tenant_id = '${tenantId}'`);
    return conn;
  }
}

监控与告警

违规访问检测

实时监控并检测异常的访问模式。

interface PrivacyViolation {
  timestamp: string;
  user_id: string;
  role: string;
  resource: string;
  action: string;
  classification: DataClassification;
  reason: string;
  source_ip: string;
  severity: "low" | "medium" | "high" | "critical";
}

async function detectPrivacyViolations(): Promise<void> {
  const violations = await queryRecentViolations();
  
  for (const violation of violations) {
    // 判断严重程度
    let severity: "low" | "medium" | "high" | "critical" = "low";
    
    if (violation.classification === DataClassification.PII || 
        violation.classification === DataClassification.PHI) {
      severity = "high";
    }
    
    if (violation.role === "admin" && violation.action === "delete") {
      severity = "critical";
    }
    
    // 发送告警
    await sendAlert({
      severity,
      title: `Privacy Boundary Violation Detected`,
      message: `User ${violation.user_id} (${violation.role}) attempted to ${violation.action} ${violation.resource} (${violation.classification})`,
      channel: "#security-alerts",
      pagerduty: severity === "critical",
    });
  }
}

// 每分钟检查一次
setInterval(detectPrivacyViolations, 60 * 1000);

异常行为分析

使用机器学习检测异常的访问模式。

示例

  • 突然增加的 PII 访问:某个用户平时每天访问 10 次 PII 数据,今天突然访问了 1,000 次。
  • 非工作时间访问:某个用户通常在 9:00-18:00 访问数据,今天在凌晨 3:00 访问了大量敏感数据。
  • 跨租户访问尝试:某个 support agent 尝试访问多个不同租户的数据。
  • 批量导出:某个分析师导出了超过正常阈值的聚合数据。

实现方式

import { AnomalyDetector } from "@company/anomaly-detection-sdk";

const detector = new AnomalyDetector({
  model: "isolation_forest",
  training_period_days: 30,
  sensitivity: 0.95,
});

async function analyzeAccessPattern(userId: string): Promise<AnomalyScore> {
  // 获取过去 24 小时的访问记录
  const accessLogs = await getAccessLogs(userId, "24h");
  
  // 提取特征
  const features = {
    total_accesses: accessLogs.length,
    pii_accesses: accessLogs.filter(log => log.classification === DataClassification.PII).length,
    unique_resources: new Set(accessLogs.map(log => log.resource)).size,
    access_hours: accessLogs.map(log => new Date(log.timestamp).getHours()),
    cross_tenant_attempts: countCrossTenantAttempts(accessLogs),
  };
  
  // 检测异常
  const score = await detector.score(features);
  
  if (score.anomaly_score > 0.9) {
    // 高度异常,立即告警
    await sendAlert({
      severity: "critical",
      title: "Anomalous Access Pattern Detected",
      message: `User ${userId} shows highly anomalous access pattern (score: ${score.anomaly_score})`,
      channel: "#security-alerts",
      pagerduty: true,
    });
  }
  
  return score;
}

FAQ

Q1: 如何自动识别 PII 数据?

A: 结合多种方法:

  • 正则表达式:识别已知格式的 PII(如邮箱、电话、社保号)。
  • ML 分类器:识别非结构化文本中的 PII(如姓名、地址)。
  • 元数据分析:根据字段名、注释、schema 推断是否为 PII(如字段名为 "email"、"ssn")。
  • 人工标注:数据所有者手动标记 PII 字段,作为训练数据或验证集。

Q2: Privacy Boundary 和 Access Control 有什么区别?

A:

  • Access Control:决定谁可以访问什么资源(如 "support_agent 可以读取用户资料")。
  • Privacy Boundary:决定如何保护敏感数据,即使在授权访问的情况下(如 "support_agent 可以看到用户资料,但 SSN 字段必须脱敏")。

简单来说,Access Control 是"能不能访问",Privacy Boundary 是"能看到什么"。

Q3: 如何实现细粒度的数据脱敏?

A:

  • 字段级脱敏:根据数据分类和访问者角色,对不同字段应用不同的脱敏规则。
  • 动态脱敏:在查询时实时脱敏,而不是预先脱敏存储。
  • 上下文感知:根据访问上下文(如时间、地点、目的)动态调整脱敏策略。
  • 可逆脱敏:对于需要后续处理的场景,使用可逆的加密或 tokenization,而不是不可逆的掩码。

Q4: 租户隔离有哪些方案?

A:

  • 逻辑隔离:所有租户共享数据库,通过 tenant_id 区分。成本低,但安全风险高。
  • 物理隔离:每个租户独立数据库。安全性高,但成本高。
  • 混合隔离:小租户逻辑隔离,大租户物理隔离。平衡成本和安全性。

选择时考虑:

  • 合规要求:某些行业要求物理隔离。
  • 客户规模:大客户通常要求物理隔离。
  • 成本预算:物理隔离成本高。
  • 运维能力:物理隔离运维复杂度高。

Q5: 如何处理跨租户的数据共享需求?

A:

  • 显式授权:租户 A 明确授权租户 B 访问某些数据。
  • 数据池:将需要共享的数据复制到独立的"共享数据池",租户从中读取。
  • 联邦查询:使用联邦查询技术,在不复制数据的情况下实现跨租户查询。
  • 审计追踪:所有跨租户访问都必须记录审计日志,便于追溯。

注意:跨租户数据共享需要谨慎设计,确保不会无意中泄露其他租户的数据。

Q6: GDPR 要求的"被遗忘权"如何实现?

A:

  1. 识别所有 PII:找到该用户的所有 PII 数据(主数据库、备份、日志、缓存等)。
  2. 删除或匿名化:删除 PII 数据,或用匿名标识符替换。
  3. 通知下游系统:告知所有使用了该用户数据的下游系统进行删除。
  4. 验证删除:再次扫描系统,确认所有 PII 已被删除。
  5. 记录证据:将删除过程和验证结果记录到审计日志中。

挑战

  • 备份数据:备份中的 PII 可能需要等到备份过期后自然删除,或在备份中标记为"待删除"。
  • 第三方系统:如果数据已分享给第三方,需要通知第三方删除,但无法强制。
  • 法律例外:某些数据可能因法律要求必须保留(如财务记录、审计日志),需要在隐私政策中说明。

Q7: 如何验证 Privacy Boundary 有效?

A:

  • 渗透测试:定期进行渗透测试,尝试绕过隐私边界。
  • 自动化扫描:使用工具扫描代码和配置,发现潜在的隐私漏洞。
  • 红队演练:组织红队模拟攻击,测试隐私边界的鲁棒性。
  • 合规审计:聘请第三方机构进行合规审计,验证隐私保护措施的有效性。
  • 监控告警:实时监控隐私违规行为,及时发现和修复问题。

Q8: 隐私策略变更后如何快速生效?

A:

  • 集中管理:将隐私策略集中存储在策略引擎(如 OPA、AuthZ)中,而不是硬编码在应用中。
  • 热重载:支持策略的热重载,无需重启服务即可生效。
  • 版本控制:对策略进行版本控制,便于回溯和审计。
  • 灰度发布:先在少量租户或用户上测试新策略,验证无误后再全量发布。
  • 回滚机制:如果新策略导致问题,可以快速回滚到旧版本。

延伸阅读

Checklist

在实施 Privacy Boundary Enforcement 之前,请确认以下事项:

  • 已完成 PII 数据盘点,列出所有包含 PII 的数据源
  • 已定义数据分类标准(Public、Internal、Confidential、PII、PHI、PCI)
  • 已实现 PII 自动识别机制(正则表达式、ML 分类器、规则引擎)
  • 已制定访问控制策略(RBAC/ABAC),遵循最小权限原则
  • 已实现运行时拦截机制(Middleware、Proxy、Data Masking)
  • 已设计租户隔离方案(Logical/Physical/Hybrid)
  • 已配置监控和告警,检测隐私违规行为
  • 已实现异常行为分析,发现潜在的隐私风险
  • 已制定 GDPR"被遗忘权"的实现流程
  • 已安排定期的隐私边界有效性验证(渗透测试、合规审计)

下一步行动:阅读 AI agent Consent Management,了解用户同意撤回后,如何处理历史会话、缓存和索引中的数据。