Google Cloud Functions

你的团队在 Google Cloud 生态中深耕多年:数据存储用 BigQuery,机器学习用 Vertex AI,消息队列用 Pub/Sub。当需要 Serverless 计算时,Google Cloud Functions 几乎是必然的选择。

「Cloud Functions 不是 GCP 唯一的 Serverless 选择,但可能是最简单的一个。」 理解它的定位和边界,才能做出正确的架构决策。

Cloud Functions 的演进

Google Cloud Functions 有两代版本:

特性第一代 (1st gen)第二代 (2nd gen)
底层架构Cloud Functions 专用Cloud Run(容器)
冷启动~100ms-1s~100ms-2s
最大超时9 分钟(60-9 分钟可配置)60 分钟
并发请求1 请求/实例多请求/实例
VPC 支持有限完整
最小实例不支持支持
HTTP 压缩不支持支持

触发器类型

HTTP 触发器

index.js
// Node.js HTTP 函数
const functions = require('@google-cloud/functions-framework');

functions.http('helloHttp', (req, res) => {
  const name = req.query.name || req.body.name || 'World';

  res
    .status(200)
    .set('Content-Type', 'application/json')
    .send(JSON.stringify({
      message: `Hello, ${name}!`,
      timestamp: new Date().toISOString()
    }));
});
main.py
# Python HTTP 函数
import functions_framework

@functions_framework.http
def hello_http(request):
    request_json = request.get_json(silent=True)
    name = request_json.get('name') if request_json else 'World'

    return {
        'message': f'Hello, {name}!',
        'timestamp': datetime.now(timezone.utc).isoformat()
    }

Cloud Storage 触发器

storage-trigger.js
const functions = require('@google-cloud/functions-framework');
const { Storage } = require('@google-cloud/storage');

const storage = new Storage();

functions.cloudEvent('processFile', async (cloudEvent) => {
  const file = cloudEvent.data;
  const bucketName = file.bucket;
  const fileName = file.name;

  console.log(`Processing file: ${fileName} in bucket: ${bucketName}`);

  // 获取文件内容
  const bucket = storage.bucket(bucketName);
  const fileRef = bucket.file(fileName);

  const [exists] = await fileRef.exists();
  if (!exists) {
    console.log('File does not exist');
    return;
  }

  // 根据文件类型处理
  if (fileName.endsWith('.csv')) {
    await processCSV(fileRef);
  } else if (fileName.endsWith('.json')) {
    await processJSON(fileRef);
  }

  console.log(`Completed processing: ${fileName}`);
});

async function processCSV(fileRef) {
  const [contents] = await fileRef.download();
  const rows = contents.toString().split('\n');

  console.log(`CSV has ${rows.length} rows`);

  // 处理完成后移动到 processed 文件夹
  const destFileName = fileRef.name.replace('uploads/', 'processed/');
  await fileRef.move(destFileName);
}

Pub/Sub 触发器

pubsub-trigger.js
functions.cloudEvent('processMessage', async (cloudEvent) => {
  const message = cloudEvent.data;
  const data = Buffer.from(message.data, 'base64').toString();

  console.log('Received message:', data);

  try {
    const payload = JSON.parse(data);

    switch (payload.type) {
      case 'user.created':
        await handleUserCreated(payload);
        break;
      case 'order.completed':
        await handleOrderCompleted(payload);
        break;
      default:
        console.log(`Unknown message type: ${payload.type}`);
    }
  } catch (error) {
    console.error('Error processing message:', error);
    throw error; // 导致消息重试
  }
});

async function handleUserCreated(payload) {
  console.log(`Creating user: ${payload.userId}`);
  // 用户创建逻辑
}

async function handleOrderCompleted(payload) {
  console.log(`Processing order: ${payload.orderId}`);
  // 订单处理逻辑
}

Firebase 触发器

firebase-trigger.js)
const functions = require('firebase-functions');
const admin = require('firebase-admin');

admin.initializeApp();

exports.onUserCreate = functions.firestore
  .document('users/{userId}')
  .onCreate(async (snap, context) => {
    const user = snap.data();
    const userId = context.params.userId;

    console.log(`New user created: ${userId}`);

    // 创建用户资料
    await admin.firestore().collection('profiles').doc(userId).set({
      createdAt: admin.firestore.FieldValue.serverTimestamp(),
      displayName: user.displayName || 'Anonymous',
      email: user.email || '',
    });

    // 发送欢迎邮件
    if (user.email) {
      await sendWelcomeEmail(user.email, user.displayName);
    }

    // 更新统计
    await incrementUserCount();
  });

exports.onOrderUpdate = functions.firestore
  .document('orders/{orderId}')
  .onUpdate(async (change, context) => {
    const before = change.before.data();
    const after = change.after.data();
    const orderId = context.params.orderId;

    // 检测状态变更
    if (before.status !== 'completed' && after.status === 'completed') {
      console.log(`Order ${orderId} completed`);

      // 更新库存
      await updateInventory(after.items);

      // 触发后续流程
      await triggerPostOrderWorkflow(orderId);
    }
  });

第二代函数的 Cloud Run 特性

并发处理

concurrent-handler.js)
// 2nd gen 支持并发处理
functions.http('concurrentHandler', (req, res) => {
  // 每个实例可以同时处理多个请求
  // 默认并发数基于内存:1 实例 / 256MB

  const processing = req.body.map(item => processItem(item));

  Promise.all(processing)
    .then(results => {
      res.status(200).json({ results });
    })
    .catch(error => {
      res.status(500).json({ error: error.message });
    });
});

async function processItem(item) {
  // 模拟处理
  await new Promise(resolve => setTimeout(resolve, 100));
  return { ...item, processed: true };
}

最小实例

main.py
import functions_framework
from flask import Flask

# 创建 Flask 应用
app = Flask(__name__)

@app.route('/')
def hello():
    return 'Hello, World!'

# 注册函数
functions_framework(app=app, target='app')
gcloud-config.yaml
# deployment.yaml
runtime: python311
entrypoint: gunicorn
instance_size: SMALL
min_instances: 2  # 始终保持 2 个热实例
max_instances: 100
timeout: 60
service_account: my-function@my-project.iam.gserviceaccount.com

vpc_connector: my-vpc-connector
vpc_egress_settings: private-ranges-only

environment_variables:
  ENV: production

环境配置

环境变量

env-vars.yaml
# 部署时设置环境变量
gcloud functions deploy my-function \
  --set-env-vars "ENV=production,REGION=us-central1" \
  --runtime python311
access-env.js)
// 访问环境变量
const dbHost = process.env['DB_HOST'];
const dbPassword = process.env['DB_PASSWORD']; // 建议使用 Secret Manager

// 访问 Secret Manager
const { SecretManagerServiceClient } = require('@google-cloud/secret-manager');

async function accessSecret(secretName) {
  const client = new SecretManagerServiceClient();
  const [version] = await client.accessSecretVersion({ name: secretName });
  return version.payload.data.toString();
}

Secret Manager 集成

secret-config.yaml
# 使用 Secret Manager 存储敏感信息
gcloud functions deploy my-function \
  --set-secrets "DB_PASSWORD=db-password:latest,API_KEY=api-key:latest" \
  --runtime nodejs18
use-secret.js)
const { SecretManagerServiceClient } = require('@google-cloud/secret-manager');

const client = new SecretManagerServiceClient();

async function getDbPassword() {
  const [accessResponse] = await client.accessSecretVersion({
    name: 'projects/my-project/secrets/db-password/versions/latest'
  });

  return accessResponse.payload.data.toString();
}

监控与日志

Cloud Logging 集成

structured-logging.js)
const functions = require('@google-cloud/functions-framework');
const { Logging } = require('@google-cloud/logging');

const logging = new Logging();

functions.http('loggedFunction', async (req, res) => {
  const logger = logging.log('my-function');

  // 结构化日志
  logger.info({
    message: 'Processing request',
    requestId: req.headers['x-request-id'],
    userId: req.body.userId,
    timestamp: new Date().toISOString()
  });

  try {
    const result = await processBusinessLogic(req.body);

    logger.info({
      message: 'Request processed successfully',
      result: result
    });

    res.status(200).json(result);
  } catch (error) {
    logger.error({
      message: 'Request failed',
      error: error.message,
      stack: error.stack
    });

    res.status(500).json({ error: 'Internal error' });
  }
});

Cloud Monitoring 指标

custom-metrics.js)
const { MetricServiceClient } = require('@google-cloud/monitoring');

// 创建自定义指标
const metricClient = new MetricServiceClient();

async function recordLatency(durationMs) {
  const metric = metricClient.metricPath(myProjectId, 'custom.googleapis.com', 'function_latency');

  const dataPoint = {
    interval: {
      endTime: {
        seconds: Date.now() / 1000,
      },
    },
    value: {
      doubleValue: durationMs,
    },
  };

  const timeSeries = {
    metric: {
      type: metric,
    },
    resource: {
      type: 'cloud_function',
      labels: {
        function_name: 'my-function',
        region: 'us-central1',
      },
    },
    points: [dataPoint],
  };

  await metricClient.createTimeSeries({ name: metric, timeSeries: [timeSeries] });
}

与 Cloud Run 的选择

Cloud Functions 适合:

  • 简单的事件驱动函数:上传文件、接收消息
  • 快速原型:不需要构建容器
  • 单函数独立部署:每个函数独立管理

Cloud Run 适合:

  • 复杂应用:需要多个端点、中间件
  • 已有容器镜像:复用现有 Docker 镜像
  • 长期运行任务:需要更长超时
  • 更细粒度控制:自定义容器配置
flowchart TB
    subgraph "简单场景"
        CF[Cloud Functions\n无容器\n快速部署]
    end

    subgraph "复杂场景"
        CR[Cloud Run\n容器化\n完整 Web 应用]
    end

    subgraph "事件驱动"
        ST[Storage 触发器]
        PS[Pub/Sub 触发器]
        FT[Firestore 触发器]
    end

    subgraph "HTTP 服务"
        API[REST API]
        MID[中间件链]
        AUTH[认证/授权]
    end

    ST & PS & FT --> CF
    API & MID & AUTH --> CR

性能优化

连接池复用

db-pool.js)
// 模块级数据库连接池
const { Pool } = require('pg');

const pool = new Pool({
  host: process.env['DB_HOST'],
  database: process.env['DB_NAME'],
  user: process.env['DB_USER'],
  password: process.env['DB_PASSWORD'],
  max: 10, // 最大连接数
  idleTimeoutMillis: 30000,
});

exports.handler = async (req, res) => {
  const client = await pool.connect();

  try {
    const result = await client.query('SELECT * FROM users WHERE id = $1', [req.body.userId]);
    res.json(result.rows);
  } finally {
    client.release(); // 释放回连接池
  }
};

依赖预加载

ml-model.js)
// 模块级模型加载(只在冷启动时执行)
let model = null;
let modelLoading = null;

async function loadModel() {
  if (model) return model;
  if (modelLoading) return modelLoading;

  modelLoading = loadMyMLModel(); // 返回 Promise
  model = await modelLoading;
  return model;
}

exports.predict = async (req, res) => {
  const mlModel = await loadModel();
  const prediction = mlModel.predict(req.body.features);

  res.json({ prediction });
};

最佳实践

1. 幂等性

idempotent.js)
// 使用幂等键处理重复消息
exports.processOrder = async (event) => {
  const messageId = event.message.messageId;
  const data = JSON.parse(Buffer.from(event.data).toString());

  // 检查是否已处理
  const cache = await getCache();
  const cached = await cache.get(`processed:${messageId}`);

  if (cached) {
    console.log(`Message ${messageId} already processed`);
    return;
  }

  // 业务逻辑
  await processOrderLogic(data);

  // 标记为已处理
  await cache.setex(`processed:${messageId}`, 86400, '1');
};

2. 错误处理与重试

retry-config.yaml
# 配置重试策略
gcloud functions deploy my-function \
  --retry \
  --runtime nodejs18
retry-handler.js)
// 设置最大重试次数
const MAX_RETRIES = 3;

exports.processWithRetry = async (event) => {
  const retryCount = event.publishTime ? 0 : (event.attributes?.['retry-count'] || 0);

  try {
    await processBusinessLogic(event.data);
  } catch (error) {
    if (retryCount < MAX_RETRIES) {
      console.log(`Retrying (${retryCount + 1}/${MAX_RETRIES})`);
      throw error; // 导致重试
    }
    // 超过重试次数,发送到死信队列
    await sendToDeadLetterQueue(event);
  }
};

与 AWS Lambda 对比

维度AWS LambdaGCP Cloud Functions
触发器事件源集成Cloud Events SDK
HTTP 框架API Gateway(额外费用)内置 HTTP
执行时间最大 15 分钟最大 60 分钟(2nd gen)
并发按需扩展最大 1000/实例
最小实例支持(2nd gen)
计费请求数 + 执行时间请求数 + 执行时间 + 入口流量
多云支持AWS 专用GCP 专用

延伸思考

Cloud Functions 的简洁性是它的优势,也是它的局限。当你的函数变得复杂、需要更多控制时,可能需要迁移到 Cloud Run。

Google Cloud 的 Serverless 策略越来越清晰:Cloud Functions 用于简单的事件驱动,Cloud Run 用于复杂的容器化工作负载,Anthos 用于混合/多云场景。

选择哪个,取决于你的具体需求:

  • 需要快速响应 HTTP 请求?→ Cloud Functions
  • 需要处理长时间任务?→ Cloud Run
  • 需要跨云部署?→ Anthos Service Mesh