跳至主要内容

S3 数据湖

本页指导您设置 S3 数据湖目标连接器。

此连接器是 Airbyte 官方支持在 S3 上使用 Iceberg 协议的方式。它使用受支持的 Iceberg 目录将 Iceberg 表格式写入 S3 或兼容 S3 的存储后端。

先决条件

S3 数据湖连接器需要两样东西。

  1. 一个 S3 存储桶或兼容 S3 的存储后端。

  2. 一个受支持的 Iceberg 目录。目前,该连接器支持以下目录

    • REST
    • AWS Glue
    • Nessie
    • Polaris

设置指南

请按照以下步骤设置您的 S3 存储和 Iceberg 目录权限。

S3 设置和权限

S3 设置包括创建存储桶策略和进行身份验证。

创建存储桶策略

创建存储桶策略。

  1. 打开 IAM 控制台

  2. 在 IAM 仪表板中,选择 策略 > 创建策略

  3. 选择 JSON 选项卡,并将以下 JSON 粘贴到策略编辑器中。请在突出显示的行中替换您自己的存储桶名称。

    {
    "Version": "2012-10-17",
    "Statement": [
    {
    "Effect": "Allow",
    "Action": [
    "s3:ListAllMyBuckets",
    "s3:GetObject*",
    "s3:PutObject",
    "s3:PutObjectAcl",
    "s3:DeleteObject",
    "s3:ListBucket*"
    ],
    "Resource": [
    "arn:aws:s3:::YOUR_BUCKET_NAME/*",
    "arn:aws:s3:::YOUR_BUCKET_NAME"
    ]
    }
    ]
    }
    注意

    仅对象级权限不足以进行身份验证。请包含 存储桶级 权限,如前述示例所示。

  4. 单击 下一步,为您的策略指定一个描述性名称,然后单击 创建策略

身份验证

在大多数情况下,您使用 IAM 用户进行身份验证。如果您使用 Airbyte Cloud 和 Glue 目录,则可以使用 IAM 角色进行身份验证。

使用 IAM 用户进行身份验证(自托管或 Cloud,任何目录)

使用现有或新的 访问密钥 ID 和秘密访问密钥

  1. 在 IAM 仪表板中,单击 用户

  2. 如果您正在使用现有的 IAM 用户,请选择该用户,然后单击 添加权限 > 添加权限。如果您正在创建一个新用户,请单击 添加用户

  3. 单击 直接附加策略,然后选中您的策略的复选框。单击 下一步 > 添加权限

  4. 单击 安全凭证 选项卡 > 创建访问密钥。AWS 控制台会提示您选择用例并为您的访问密钥添加可选标签。

  5. 单击 创建访问密钥。请记下您的密钥。

  6. 在 Airbyte 中,将这些密钥输入到 Airbyte 连接器的 AWS 访问密钥 IDAWS 秘密访问密钥 字段中。

使用 IAM 角色进行身份验证(仅限 Cloud 和 Glue 目录)
注意

要使用 IAM 角色进行 S3 身份验证,Airbyte 团队成员必须启用它。如果您想使用此功能,请 联系销售团队

  1. 在 IAM 仪表板中,单击角色,然后创建角色

  2. 选择AWS 账户受信任实体类型。

  3. 设置角色的信任关系。这允许 Airbyte 实例的 AWS 帐户承担此角色。您还需要指定外部 ID,这是信任服务(Airbyte)和受信任角色(您正在创建的角色)都知道的秘密密钥。此 ID 可防止“困惑的代理”问题。外部 ID 应为您的 Airbyte 工作区 ID,您可以在工作区页面的 URL 中找到它。编辑信任关系策略以包含外部 ID

    {
    "Version": "2012-10-17",
    "Statement": [
    {
    "Effect": "Allow",
    "Principal": {
    "AWS": "arn:aws:iam::094410056844:user/delegated_access_user"
    },
    "Action": "sts:AssumeRole",
    "Condition": {
    "StringEquals": {
    "sts:ExternalId": "{your-airbyte-workspace-id}"
    }
    }
    }
    ]
    }
  4. 完成角色创建并保存角色 ARN 以供以后使用。

  5. 选择直接附加权限,然后找到并选中您的新策略。单击下一步,然后单击添加权限

  6. 在 Airbyte 中,选择 Glue 作为目录,并将角色 ARN 输入到 角色 ARN 字段中。

Iceberg 目录设置和权限

设置过程的其余部分取决于您使用的目录。

REST

输入您的 REST 目录的 URI。您可能还需要输入默认命名空间。

AWS Glue

  1. 更新您之前创建的 S3 策略,以授予这些 Glue 权限。

    {
    "Version": "2012-10-17",
    "Statement": [
    {
    "Effect": "Allow",
    "Action": [
    "s3:ListAllMyBuckets",
    "s3:GetObject*",
    "s3:PutObject",
    "s3:PutObjectAcl",
    "s3:DeleteObject",
    "s3:ListBucket*",
    "glue:TagResource",
    "glue:UnTagResource",
    "glue:BatchCreatePartition",
    "glue:BatchDeletePartition",
    "glue:BatchDeleteTable",
    "glue:BatchGetPartition",
    "glue:CreateDatabase",
    "glue:CreateTable",
    "glue:CreatePartition",
    "glue:DeletePartition",
    "glue:DeleteTable",
    "glue:GetDatabase",
    "glue:GetPartition",
    "glue:GetPartitions",
    "glue:GetTable",
    "glue:GetTables",
    "glue:UpdateDatabase",
    "glue:UpdatePartition",
    "glue:UpdateTable"
    ],
    "Resource": [
    "arn:aws:s3:::YOUR_BUCKET_NAME/*",
    "arn:aws:s3:::YOUR_BUCKET_NAME"
    ]
    }
    ]
    }
  2. 仓库位置 选项设置为 s3://<存储桶名称>/存储桶内的路径

  3. 如果您使用的是 Airbyte Cloud 并且使用 IAM 角色进行身份验证,请将 角色 ARN 选项设置为您在 设置 S3 上的身份验证 时记下的值。

  4. 如果您有一个现有的 Glue 表,并且想要用 Airbyte 管理的 Iceberg 表替换该表,请删除 Glue 表。否则,您将遇到错误 输入 Glue 表不是 iceberg 表:<您的表名>

    从控制台删除 Glue 表 可能不会立即删除它们。请等待 AWS 完成其后台处理,或使用 AWS API 删除所有表版本。

  5. 如果您正在使用 AWS Lake Formation,您必须通过 Lake Formation 授予一些权限

    1. 您必须授予 S3 路径上的 Data location_access
    2. 如果您希望连接器代表您创建数据库,您还必须授予目录上的 Create database
    3. (高级选项) 如果您想手动创建数据库,并且只将连接器写入这些特定的数据库,那么您必须授予数据库上的 Create table, Describe

Nessie

要使用 Nessie 进行身份验证,请执行以下两件事。

  1. 设置您的 Nessie 目录的 URI 和访问令牌以进行身份验证到该目录。

  2. 仓库位置 选项设置为 s3://<存储桶名称>/存储桶内的路径

Polaris

要使用 Apache Polaris 进行身份验证,请按照以下步骤操作。

  1. 设置您的 Polaris 目录并创建一个具有必要权限的主体。请参阅 Apache Polaris 文档 以获取详细的设置说明。

  2. 在 Polaris 中创建主体时,您将收到 OAuth 凭据(客户端 ID 和客户端密钥)。请安全地保存这些凭据。

  3. 向您的主体目录角色授予所需的权限。您可以

    选项 A:授予广泛的 CATALOG_MANAGE_CONTENT 权限(推荐以简化操作)

    • 此单个权限允许连接器管理目录中的表和命名空间

    选项 B:授予特定的细粒度权限:

    • TABLE_LIST - 列出命名空间中的表
    • TABLE_CREATE - 创建新表
    • TABLE_DROP - 删除表
    • TABLE_READ_PROPERTIES - 读取表元数据
    • TABLE_WRITE_PROPERTIES - 更新表元数据
    • TABLE_WRITE_DATA - 将数据写入表
    • NAMESPACE_LIST - 列出命名空间
    • NAMESPACE_CREATE - 创建新命名空间
    • NAMESPACE_READ_PROPERTIES - 读取命名空间元数据
  4. 在 Airbyte 连接器配置中,提供以下信息

    • Polaris 服务器 URI:您的 Polaris 服务器的基本 URL。例如:https://:8181/api/catalog
    • Catalog 名称:您在 Polaris 中创建的 catalog 的名称(例如:quickstart_catalog
    • Client ID:创建 principal 时提供的 OAuth Client ID
    • Client Secret:创建 principal 时提供的 OAuth Client Secret
    • 默认命名空间:当目标命名空间设置为“目标定义”或“源定义”时,用于表标识符的命名空间
  5. 仓库位置 选项设置为 s3://<存储桶名称>/存储桶内的路径

  6. 确保您的 Polaris catalog 已配置适当的存储凭据以访问您的 S3 bucket。

输出 schema

Airbyte 如何生成 Iceberg schema

在每个 stream 中,Airbyte 将顶层字段映射到 Iceberg 字段。Airbyte 将嵌套字段(对象、数组和联合)映射到字符串列,并将其作为序列化的 JSON 写入。

这是 Airbyte 类型与 Iceberg 类型之间的完整映射。

Airbyte 类型Iceberg 类型
布尔值布尔值
日期日期
整数长整型
数字双精度浮点数
字符串字符串
带时区的日期时间*日期时间
不带时区的日期时间日期时间
带时区的日期时间戳*带时区的日期时间戳
不带时区的日期时间戳不带时区的日期时间戳
对象字符串(JSON 序列化的值)
数组字符串(JSON 序列化的值)
联合字符串(JSON 序列化的值)

*Airbyte 会在写入 Iceberg 文件之前,将 带时区的日期时间带时区的日期时间戳 类型转换为协调世界时 (UTC)。

管理 schema 演化

此连接器从不重写现有的 Iceberg 数据文件。这意味着 Airbyte 只能处理特定的源 schema 更改。

  • 添加或删除列
  • 拓宽列
  • 更改主键

您有以下选项来管理 schema 演化。

  • 要自动处理不受支持的 schema 更改,请将 完全刷新 - 覆盖 用作您的 同步模式

  • 要按出现的方式处理不受支持的 schema 更改,请等待同步失败,然后采取措施恢复它。您可以

    • 手动编辑 Iceberg 中的表 schema。
    • 刷新 Airbyte 中的连接。
    • 清除 Airbyte 中的连接。

命名

与大多数 Airbyte 目标连接器一样,S3 Data Lake 连接器可能会修改标识符(stream 名称/命名空间、列名称)以与目标兼容。

特别是,当使用 AWS Glue 时,连接器将

  • 将所有 stream 表名和命名空间 转换为小写
  • 将表名/命名空间中的任何非字母数字字符更改为 下划线 以与 Athena 兼容

去重

此连接器使用合并读取策略来支持去重。

  • Airbyte 将 stream 的主键转换为 Iceberg 的 标识符列
  • “更新插入”是对该行主键的基于等值的删除,然后插入新数据。

关于主键的假设

S3 Data Lake 连接器假定以下两种情况之一为真

  • 源从不尝试在单个同步中多次发出相同的主键。
  • 如果源在单个尝试中多次发出相同的主键,它始终以从旧到新的游标顺序发出这些记录。

如果不满足这些条件,您可能会在 Iceberg 中看到不准确的数据,表现为旧记录优先于新记录。如果发生这种情况,请将追加或覆盖用作您的 同步模式

有未知数量的 API 源具有不满足这些条件的 stream。Airbyte 知道 StripeMonday 不满足这些条件,但可能还有其他。

分支和数据可用性

Iceberg 支持对您的数据进行 类似 Git 的语义。此连接器利用这些语义来提供具有弹性的同步。

  • 在每次同步中,每个微批次都会创建一个新的快照。

  • 在截断同步期间,连接器会将刷新后的数据写入 airbyte_staging 分支,并在同步结束时用 airbyte_staging 替换 main 分支。由于大多数查询引擎以 main 分支为目标,因此人们可以在截断同步结束之前查询您的数据,此时它会原子地交换到新版本。

分支替换

在 stream 同步结束时,我们将用我们正在处理的 airbyte_staging 分支替换当前的 main 分支。我们故意避免快速前进以更好地处理潜在的压缩问题。重要警告:在同步开始后对 main 分支所做的任何更改都将在该过程中丢失。

压缩

警告

请勿在截断刷新同步期间运行压缩以防止数据丢失。 在截断刷新同步期间,系统会删除不属于最新生成的所有文件。这包括

  • 没有生成 ID 的文件(压缩文件)
  • 来自先前生成的的文件

如果压缩与同步同时运行,它将删除当前生成中的文件,导致数据丢失。系统通过解析文件名中的生成 ID 来识别生成。

注意事项和限制

本节记录了有关此 Iceberg 目标如何与其他产品交互的已知注意事项和限制。

Snowflake

Airbyte 使用 Iceberg 行级别删除来标记旧记录版本为过时。如果您将 Iceberg 表用于 Snowflake,Snowflake 不会识别 Iceberg 表的本机 Iceberg 行级别删除,对于使用 Glue 等外部目录的 Iceberg 表 (请参阅 Snowflake 的文档)。因此,您的查询结果将返回记录的所有版本。

例如,下表包含 'Alice' 记录的三个版本。

idnameupdated_at_airbyte_extracted_at
1Alice2024-03-01 10:002024-03-01 10:10
1Alice2024-03-02 12:002024-03-02 12:10
1Alice2024-03-03 14:002024-03-03 14:10

为了缓解此问题,请生成一个标志以检测过时的记录。Airbyte 生成一个 airbyte_extracted_at 元数据字段,该字段有助于实现此目的。

row_number() over (partition by {primary_key} order by {cursor}, _airbyte_extracted_at)) != 1 OR _ab_cdc_deleted_at IS NOT NULL as is_outdated;

现在,您可以查询 is_outdated 是否为 false,以识别 'Alice' 记录的最新版本。

idnameupdated_at_airbyte_extracted_atrow_numberis_outdated
1Alice2024-03-01 10:002024-03-01 10:103True
1Alice2024-03-02 12:002024-03-02 12:102True
1Alice2024-03-03 14:002024-03-03 14:101False

参考

配置字段参考

字段
类型
属性名称
对象
catalog_type
字符串
main_branch_name
字符串
s3_bucket_name
字符串
s3_bucket_region
字符串
warehouse_location
字符串
access_key_id
字符串
s3_endpoint
字符串
secret_access_key

变更日志

展开以查看
版本日期拉取请求主题
0.3.422026-01-1270205实现对 Polaris catalog 的 scope 和 OAuth 服务器 URI 属性的支持
0.3.412025-11-0769232升级到 Bulk CDK 0.1.69。更改以处理提交模式的变化
0.3.402025-11-0569133升级到 Bulk CDK 0.1.61。
0.3.392025-10-1568108实现 Polaris 支持
0.3.382025-10-0767005修复:将空字符串 role_arn 视为 null,以防止产生误导性的配置错误
0.3.372025-10-0767150修复检查操作以使用唯一的表名,防止与过时元数据和并发操作冲突
0.3.362025-09-2666711CHECK 操作使用配置的默认数据集,而不是 airbyte_test_namespace
0.3.352025-07-2363746删除表中的不必要属性
0.3.342025-07-1162952更新 CDK 版本
0.3.332025-07-0962888更新 CDK 版本以处理截断刷新场景中删除文件时的压缩问题
0.3.322025-07-0862852修复元数据(撤销意外存档)
0.3.312025-07-0762835固定到最新的 CDK 版本 0.522
0.3.302025-06-2662105从 main 到 staging 的 ReplaceBranch,而不是快速前进
0.3.292025-06-1361588发布版本以考虑管道中可能发生的重复发布。无操作更改。 警告:这存在一个错误。请勿使用。
0.3.282025-05-0759710CDK 反压错误修复
0.3.272025-04-2158146升级到最新 CDK
0.3.262025-04-1758104杂项:现在传递一个字符串来表示区域
0.3.252025-04-1658085内部重构
0.3.242025-03-2756435错误修复:正确处理非正数。
0.3.232025-03-2556395错误修复:正确地强制转换嵌套数组中的值。
0.3.222025-03-2456355升级到 airbyte/java-connector-base:2.0.1 以兼容 M4。
0.3.212025-03-22#56347错误修复:stream start 并不总是等待 iceberg 设置
0.3.202025-03-24#55849内部重构
0.3.192025-03-19#55798CDK:类型改进
0.3.182025-03-18#55811CDK:传递 DestinationStream 而不是 Descriptor
0.3.172025-03-13#55737CDK:传递 DestinationRecordRaw 而不是 DestinationRecordAirbyteValue
0.3.162025-03-13#55755从标识符字段中排除数字字段
0.3.152025-02-28#54724认证连接器
0.3.142025-02-14#53241新的 CDK 接口;性能改进,跳过初始记录暂存
0.3.132025-02-14#53697内部重构
0.3.122025-02-12#53170改进文档,调整无效模式演化的错误处理
0.3.112025-02-12#53216支持在覆盖/截断刷新/清除同步中进行任意模式更改
0.3.102025-02-11#53622启用 Nessie 集成测试
0.3.92025-02-10#53165非常基础的可用性改进和文档
0.3.82025-02-10#52666将块大小更改为 1.5Gb
0.3.72025-02-07#53141添加围绕 Rest catalog 的集成测试
0.3.62025-02-06#53172内部重构
0.3.52025-02-06#53164改进在去重模式下空主键的错误消息
0.3.42025-02-05#53173调整规范措辞
0.3.32025-02-05#53176修复 time_with_timezone 处理 (值现在调整为 UTC)
0.3.22025-02-04#52690在使用 AWS Glue 时处理流名称/命名空间中的特殊字符
0.3.12025-02-03#52633修复去重
0.3.02025-01-31#52639使数据库/命名空间成为必填字段
0.2.232025-01-27#51600内部重构
0.2.222025-01-22#52081实现对 REST catalog 的支持
0.2.212025-01-27#52564修复流中 0 条记录时的崩溃
0.2.202025-01-23#52068添加对默认命名空间(/数据库名称)的支持
0.2.192025-01-16#51595连接器配置选项的澄清
0.2.182025-01-15#51042将结构体作为 JSON 字符串写入,而不是 Iceberg 结构体。
0.2.172025-01-14#51542新的标识符字段应标记为必需。
0.2.162025-01-14#51538如果传入字段与现有字段不同,则更新标识符字段
0.2.152025-01-14#51530为 nessie catalog 设置 S3 存储桶的 AWS 区域
0.2.142025-01-14#50413根据传入模式更新现有表模式
0.2.132025-01-14#50412实现确定 iceberg 类型之间超类型的逻辑
0.2.122025-01-10#50876添加对 AWS 实例配置文件身份验证的支持
0.2.112025-01-10#50971AWS 身份验证流程中的内部重构
0.2.102025-01-09#50400添加 S3DataLakeTypesComparator
0.2.92025-01-09#51022重命名 Iceberg V2 中的所有类和文件
0.2.82025-01-09#51012重命名/清理 Iceberg V2 中的包
0.2.72025-01-09#50957添加对 GLUE RBAC (Assume role) 的支持
0.2.62025-01-08#50991首次公开发布。