S3 数据湖
本页指导您设置 S3 数据湖目标连接器。
此连接器是 Airbyte 官方支持在 S3 上使用 Iceberg 协议的方式。它使用受支持的 Iceberg 目录将 Iceberg 表格式写入 S3 或兼容 S3 的存储后端。
先决条件
S3 数据湖连接器需要两样东西。
-
一个 S3 存储桶或兼容 S3 的存储后端。
-
一个受支持的 Iceberg 目录。目前,该连接器支持以下目录
- REST
- AWS Glue
- Nessie
- Polaris
设置指南
请按照以下步骤设置您的 S3 存储和 Iceberg 目录权限。
S3 设置和权限
S3 设置包括创建存储桶策略和进行身份验证。
创建存储桶策略
创建存储桶策略。
-
打开 IAM 控制台。
-
在 IAM 仪表板中,选择 策略 > 创建策略。
-
选择 JSON 选项卡,并将以下 JSON 粘贴到策略编辑器中。请在突出显示的行中替换您自己的存储桶名称。
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"s3:ListAllMyBuckets",
"s3:GetObject*",
"s3:PutObject",
"s3:PutObjectAcl",
"s3:DeleteObject",
"s3:ListBucket*"
],
"Resource": [
"arn:aws:s3:::YOUR_BUCKET_NAME/*",
"arn:aws:s3:::YOUR_BUCKET_NAME"
]
}
]
}注意仅对象级权限不足以进行身份验证。请包含 存储桶级 权限,如前述示例所示。
-
单击 下一步,为您的策略指定一个描述性名称,然后单击 创建策略。
身份验证
在大多数情况下,您使用 IAM 用户进行身份验证。如果您使用 Airbyte Cloud 和 Glue 目录,则可以使用 IAM 角色进行身份验证。
使用 IAM 用户进行身份验证(自托管或 Cloud,任何目录)
使用现有或新的 访问密钥 ID 和秘密访问密钥。
-
在 IAM 仪表板中,单击 用户。
-
如果您正在使用现有的 IAM 用户,请选择该用户,然后单击 添加权限 > 添加权限。如果您正在创建一个新用户,请单击 添加用户。
-
单击 直接附加策略,然后选中您的策略的复选框。单击 下一步 > 添加权限。
-
单击 安全凭证 选项卡 > 创建访问密钥。AWS 控制台会提示您选择用例并为您的访问密钥添加可选标签。
-
单击 创建访问密钥。请记下您的密钥。
-
在 Airbyte 中,将这些密钥输入到 Airbyte 连接器的 AWS 访问密钥 ID 和 AWS 秘密访问密钥 字段中。
使用 IAM 角色进行身份验证(仅限 Cloud 和 Glue 目录)
要使用 IAM 角色进行 S3 身份验证,Airbyte 团队成员必须启用它。如果您想使用此功能,请 联系销售团队。
-
在 IAM 仪表板中,单击角色,然后创建角色。
-
选择AWS 账户受信任实体类型。
-
设置角色的信任关系。这允许 Airbyte 实例的 AWS 帐户承担此角色。您还需要指定外部 ID,这是信任服务(Airbyte)和受信任角色(您正在创建的角色)都知道的秘密密钥。此 ID 可防止“困惑的代理”问题。外部 ID 应为您的 Airbyte 工作区 ID,您可以在工作区页面的 URL 中找到它。编辑信任关系策略以包含外部 ID
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Principal": {
"AWS": "arn:aws:iam::094410056844:user/delegated_access_user"
},
"Action": "sts:AssumeRole",
"Condition": {
"StringEquals": {
"sts:ExternalId": "{your-airbyte-workspace-id}"
}
}
}
]
} -
完成角色创建并保存角色 ARN 以供以后使用。
-
选择直接附加权限,然后找到并选中您的新策略。单击下一步,然后单击添加权限。
-
在 Airbyte 中,选择 Glue 作为目录,并将角色 ARN 输入到 角色 ARN 字段中。
Iceberg 目录设置和权限
设置过程的其余部分取决于您使用的目录。
REST
输入您的 REST 目录的 URI。您可能还需要输入默认命名空间。
AWS Glue
-
更新您之前创建的 S3 策略,以授予这些 Glue 权限。
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"s3:ListAllMyBuckets",
"s3:GetObject*",
"s3:PutObject",
"s3:PutObjectAcl",
"s3:DeleteObject",
"s3:ListBucket*",
"glue:TagResource",
"glue:UnTagResource",
"glue:BatchCreatePartition",
"glue:BatchDeletePartition",
"glue:BatchDeleteTable",
"glue:BatchGetPartition",
"glue:CreateDatabase",
"glue:CreateTable",
"glue:CreatePartition",
"glue:DeletePartition",
"glue:DeleteTable",
"glue:GetDatabase",
"glue:GetPartition",
"glue:GetPartitions",
"glue:GetTable",
"glue:GetTables",
"glue:UpdateDatabase",
"glue:UpdatePartition",
"glue:UpdateTable"
],
"Resource": [
"arn:aws:s3:::YOUR_BUCKET_NAME/*",
"arn:aws:s3:::YOUR_BUCKET_NAME"
]
}
]
} -
将 仓库位置 选项设置为
s3://<存储桶名称>/存储桶内的路径。 -
如果您使用的是 Airbyte Cloud 并且使用 IAM 角色进行身份验证,请将 角色 ARN 选项设置为您在 设置 S3 上的身份验证 时记下的值。
-
如果您有一个现有的 Glue 表,并且想要用 Airbyte 管理的 Iceberg 表替换该表,请删除 Glue 表。否则,您将遇到错误
输入 Glue 表不是 iceberg 表:<您的表名>。从控制台删除 Glue 表 可能不会立即删除它们。请等待 AWS 完成其后台处理,或使用 AWS API 删除所有表版本。
-
如果您正在使用 AWS Lake Formation,您必须通过 Lake Formation 授予一些权限
- 您必须授予 S3 路径上的
Data location_access。 - 如果您希望连接器代表您创建数据库,您还必须授予目录上的
Create database。 - (高级选项) 如果您想手动创建数据库,并且只将连接器写入这些特定的数据库,那么您必须授予数据库上的
Create table, Describe。
- 您必须授予 S3 路径上的
Nessie
要使用 Nessie 进行身份验证,请执行以下两件事。
-
设置您的 Nessie 目录的 URI 和访问令牌以进行身份验证到该目录。
-
将 仓库位置 选项设置为
s3://<存储桶名称>/存储桶内的路径。
Polaris
要使用 Apache Polaris 进行身份验证,请按照以下步骤操作。
-
设置您的 Polaris 目录并创建一个具有必要权限的主体。请参阅 Apache Polaris 文档 以获取详细的设置说明。
-
在 Polaris 中创建主体时,您将收到 OAuth 凭据(客户端 ID 和客户端密钥)。请安全地保存这些凭据。
-
向您的主体目录角色授予所需的权限。您可以
选项 A:授予广泛的
CATALOG_MANAGE_CONTENT权限(推荐以简化操作)- 此单个权限允许连接器管理目录中的表和命名空间
选项 B:授予特定的细粒度权限:
TABLE_LIST- 列出命名空间中的表TABLE_CREATE- 创建新表TABLE_DROP- 删除表TABLE_READ_PROPERTIES- 读取表元数据TABLE_WRITE_PROPERTIES- 更新表元数据TABLE_WRITE_DATA- 将数据写入表NAMESPACE_LIST- 列出命名空间NAMESPACE_CREATE- 创建新命名空间NAMESPACE_READ_PROPERTIES- 读取命名空间元数据
-
在 Airbyte 连接器配置中,提供以下信息
- Polaris 服务器 URI:您的 Polaris 服务器的基本 URL。例如:
https://:8181/api/catalog - Catalog 名称:您在 Polaris 中创建的 catalog 的名称(例如:
quickstart_catalog) - Client ID:创建 principal 时提供的 OAuth Client ID
- Client Secret:创建 principal 时提供的 OAuth Client Secret
- 默认命名空间:当目标命名空间设置为“目标定义”或“源定义”时,用于表标识符的命名空间
- Polaris 服务器 URI:您的 Polaris 服务器的基本 URL。例如:
-
将 仓库位置 选项设置为
s3://<存储桶名称>/存储桶内的路径。 -
确保您的 Polaris catalog 已配置适当的存储凭据以访问您的 S3 bucket。
输出 schema
Airbyte 如何生成 Iceberg schema
在每个 stream 中,Airbyte 将顶层字段映射到 Iceberg 字段。Airbyte 将嵌套字段(对象、数组和联合)映射到字符串列,并将其作为序列化的 JSON 写入。
这是 Airbyte 类型与 Iceberg 类型之间的完整映射。
| Airbyte 类型 | Iceberg 类型 |
|---|---|
| 布尔值 | 布尔值 |
| 日期 | 日期 |
| 整数 | 长整型 |
| 数字 | 双精度浮点数 |
| 字符串 | 字符串 |
| 带时区的日期时间* | 日期时间 |
| 不带时区的日期时间 | 日期时间 |
| 带时区的日期时间戳* | 带时区的日期时间戳 |
| 不带时区的日期时间戳 | 不带时区的日期时间戳 |
| 对象 | 字符串(JSON 序列化的值) |
| 数组 | 字符串(JSON 序列化的值) |
| 联合 | 字符串(JSON 序列化的值) |
*Airbyte 会在写入 Iceberg 文件之前,将 带时区的日期时间 和 带时区的日期时间戳 类型转换为协调世界时 (UTC)。
管理 schema 演化
此连接器从不重写现有的 Iceberg 数据文件。这意味着 Airbyte 只能处理特定的源 schema 更改。
- 添加或删除列
- 拓宽列
- 更改主键
您有以下选项来管理 schema 演化。
-
要按出现的方式处理不受支持的 schema 更改,请等待同步失败,然后采取措施恢复它。您可以
命名
与大多数 Airbyte 目标连接器一样,S3 Data Lake 连接器可能会修改标识符(stream 名称/命名空间、列名称)以与目标兼容。
特别是,当使用 AWS Glue 时,连接器将
去重
此连接器使用合并读取策略来支持去重。
- Airbyte 将 stream 的主键转换为 Iceberg 的 标识符列。
- “更新插入”是对该行主键的基于等值的删除,然后插入新数据。
关于主键的假设
S3 Data Lake 连接器假定以下两种情况之一为真
- 源从不尝试在单个同步中多次发出相同的主键。
- 如果源在单个尝试中多次发出相同的主键,它始终以从旧到新的游标顺序发出这些记录。
如果不满足这些条件,您可能会在 Iceberg 中看到不准确的数据,表现为旧记录优先于新记录。如果发生这种情况,请将追加或覆盖用作您的 同步模式。
有未知数量的 API 源具有不满足这些条件的 stream。Airbyte 知道 Stripe 和 Monday 不满足这些条件,但可能还有其他。
分支和数据可用性
Iceberg 支持对您的数据进行 类似 Git 的语义。此连接器利用这些语义来提供具有弹性的同步。
-
在每次同步中,每个微批次都会创建一个新的快照。
-
在截断同步期间,连接器会将刷新后的数据写入
airbyte_staging分支,并在同步结束时用airbyte_staging替换main分支。由于大多数查询引擎以main分支为目标,因此人们可以在截断同步结束之前查询您的数据,此时它会原子地交换到新版本。
分支替换
在 stream 同步结束时,我们将用我们正在处理的 airbyte_staging 分支替换当前的 main 分支。我们故意避免快速前进以更好地处理潜在的压缩问题。重要警告:在同步开始后对 main 分支所做的任何更改都将在该过程中丢失。
压缩
请勿在截断刷新同步期间运行压缩以防止数据丢失。 在截断刷新同步期间,系统会删除不属于最新生成的所有文件。这包括
- 没有生成 ID 的文件(压缩文件)
- 来自先前生成的的文件
如果压缩与同步同时运行,它将删除当前生成中的文件,导致数据丢失。系统通过解析文件名中的生成 ID 来识别生成。
注意事项和限制
本节记录了有关此 Iceberg 目标如何与其他产品交互的已知注意事项和限制。
Snowflake
Airbyte 使用 Iceberg 行级别删除来标记旧记录版本为过时。如果您将 Iceberg 表用于 Snowflake,Snowflake 不会识别 Iceberg 表的本机 Iceberg 行级别删除,对于使用 Glue 等外部目录的 Iceberg 表 (请参阅 Snowflake 的文档)。因此,您的查询结果将返回记录的所有版本。
例如,下表包含 'Alice' 记录的三个版本。
id | name | updated_at | _airbyte_extracted_at |
|---|---|---|---|
| 1 | Alice | 2024-03-01 10:00 | 2024-03-01 10:10 |
| 1 | Alice | 2024-03-02 12:00 | 2024-03-02 12:10 |
| 1 | Alice | 2024-03-03 14:00 | 2024-03-03 14:10 |
为了缓解此问题,请生成一个标志以检测过时的记录。Airbyte 生成一个 airbyte_extracted_at 元数据字段,该字段有助于实现此目的。
row_number() over (partition by {primary_key} order by {cursor}, _airbyte_extracted_at)) != 1 OR _ab_cdc_deleted_at IS NOT NULL as is_outdated;
现在,您可以查询 is_outdated 是否为 false,以识别 'Alice' 记录的最新版本。
id | name | updated_at | _airbyte_extracted_at | row_number | is_outdated |
|---|---|---|---|---|---|
| 1 | Alice | 2024-03-01 10:00 | 2024-03-01 10:10 | 3 | True |
| 1 | Alice | 2024-03-02 12:00 | 2024-03-02 12:10 | 2 | True |
| 1 | Alice | 2024-03-03 14:00 | 2024-03-03 14:10 | 1 | False |
参考
配置字段参考
变更日志
展开以查看
| 版本 | 日期 | 拉取请求 | 主题 |
|---|---|---|---|
| 0.3.42 | 2026-01-12 | 70205 | 实现对 Polaris catalog 的 scope 和 OAuth 服务器 URI 属性的支持 |
| 0.3.41 | 2025-11-07 | 69232 | 升级到 Bulk CDK 0.1.69。更改以处理提交模式的变化 |
| 0.3.40 | 2025-11-05 | 69133 | 升级到 Bulk CDK 0.1.61。 |
| 0.3.39 | 2025-10-15 | 68108 | 实现 Polaris 支持 |
| 0.3.38 | 2025-10-07 | 67005 | 修复:将空字符串 role_arn 视为 null,以防止产生误导性的配置错误 |
| 0.3.37 | 2025-10-07 | 67150 | 修复检查操作以使用唯一的表名,防止与过时元数据和并发操作冲突 |
| 0.3.36 | 2025-09-26 | 66711 | CHECK 操作使用配置的默认数据集,而不是 airbyte_test_namespace |
| 0.3.35 | 2025-07-23 | 63746 | 删除表中的不必要属性 |
| 0.3.34 | 2025-07-11 | 62952 | 更新 CDK 版本 |
| 0.3.33 | 2025-07-09 | 62888 | 更新 CDK 版本以处理截断刷新场景中删除文件时的压缩问题 |
| 0.3.32 | 2025-07-08 | 62852 | 修复元数据(撤销意外存档) |
| 0.3.31 | 2025-07-07 | 62835 | 固定到最新的 CDK 版本 0.522 |
| 0.3.30 | 2025-06-26 | 62105 | 从 main 到 staging 的 ReplaceBranch,而不是快速前进 |
| 0.3.29 | 2025-06-13 | 61588 | |
| 0.3.28 | 2025-05-07 | 59710 | CDK 反压错误修复 |
| 0.3.27 | 2025-04-21 | 58146 | 升级到最新 CDK |
| 0.3.26 | 2025-04-17 | 58104 | 杂项:现在传递一个字符串来表示区域 |
| 0.3.25 | 2025-04-16 | 58085 | 内部重构 |
| 0.3.24 | 2025-03-27 | 56435 | 错误修复:正确处理非正数。 |
| 0.3.23 | 2025-03-25 | 56395 | 错误修复:正确地强制转换嵌套数组中的值。 |
| 0.3.22 | 2025-03-24 | 56355 | 升级到 airbyte/java-connector-base:2.0.1 以兼容 M4。 |
| 0.3.21 | 2025-03-22 | #56347 | 错误修复:stream start 并不总是等待 iceberg 设置 |
| 0.3.20 | 2025-03-24 | #55849 | 内部重构 |
| 0.3.19 | 2025-03-19 | #55798 | CDK:类型改进 |
| 0.3.18 | 2025-03-18 | #55811 | CDK:传递 DestinationStream 而不是 Descriptor |
| 0.3.17 | 2025-03-13 | #55737 | CDK:传递 DestinationRecordRaw 而不是 DestinationRecordAirbyteValue |
| 0.3.16 | 2025-03-13 | #55755 | 从标识符字段中排除数字字段 |
| 0.3.15 | 2025-02-28 | #54724 | 认证连接器 |
| 0.3.14 | 2025-02-14 | #53241 | 新的 CDK 接口;性能改进,跳过初始记录暂存 |
| 0.3.13 | 2025-02-14 | #53697 | 内部重构 |
| 0.3.12 | 2025-02-12 | #53170 | 改进文档,调整无效模式演化的错误处理 |
| 0.3.11 | 2025-02-12 | #53216 | 支持在覆盖/截断刷新/清除同步中进行任意模式更改 |
| 0.3.10 | 2025-02-11 | #53622 | 启用 Nessie 集成测试 |
| 0.3.9 | 2025-02-10 | #53165 | 非常基础的可用性改进和文档 |
| 0.3.8 | 2025-02-10 | #52666 | 将块大小更改为 1.5Gb |
| 0.3.7 | 2025-02-07 | #53141 | 添加围绕 Rest catalog 的集成测试 |
| 0.3.6 | 2025-02-06 | #53172 | 内部重构 |
| 0.3.5 | 2025-02-06 | #53164 | 改进在去重模式下空主键的错误消息 |
| 0.3.4 | 2025-02-05 | #53173 | 调整规范措辞 |
| 0.3.3 | 2025-02-05 | #53176 | 修复 time_with_timezone 处理 (值现在调整为 UTC) |
| 0.3.2 | 2025-02-04 | #52690 | 在使用 AWS Glue 时处理流名称/命名空间中的特殊字符 |
| 0.3.1 | 2025-02-03 | #52633 | 修复去重 |
| 0.3.0 | 2025-01-31 | #52639 | 使数据库/命名空间成为必填字段 |
| 0.2.23 | 2025-01-27 | #51600 | 内部重构 |
| 0.2.22 | 2025-01-22 | #52081 | 实现对 REST catalog 的支持 |
| 0.2.21 | 2025-01-27 | #52564 | 修复流中 0 条记录时的崩溃 |
| 0.2.20 | 2025-01-23 | #52068 | 添加对默认命名空间(/数据库名称)的支持 |
| 0.2.19 | 2025-01-16 | #51595 | 连接器配置选项的澄清 |
| 0.2.18 | 2025-01-15 | #51042 | 将结构体作为 JSON 字符串写入,而不是 Iceberg 结构体。 |
| 0.2.17 | 2025-01-14 | #51542 | 新的标识符字段应标记为必需。 |
| 0.2.16 | 2025-01-14 | #51538 | 如果传入字段与现有字段不同,则更新标识符字段 |
| 0.2.15 | 2025-01-14 | #51530 | 为 nessie catalog 设置 S3 存储桶的 AWS 区域 |
| 0.2.14 | 2025-01-14 | #50413 | 根据传入模式更新现有表模式 |
| 0.2.13 | 2025-01-14 | #50412 | 实现确定 iceberg 类型之间超类型的逻辑 |
| 0.2.12 | 2025-01-10 | #50876 | 添加对 AWS 实例配置文件身份验证的支持 |
| 0.2.11 | 2025-01-10 | #50971 | AWS 身份验证流程中的内部重构 |
| 0.2.10 | 2025-01-09 | #50400 | 添加 S3DataLakeTypesComparator |
| 0.2.9 | 2025-01-09 | #51022 | 重命名 Iceberg V2 中的所有类和文件 |
| 0.2.8 | 2025-01-09 | #51012 | 重命名/清理 Iceberg V2 中的包 |
| 0.2.7 | 2025-01-09 | #50957 | 添加对 GLUE RBAC (Assume role) 的支持 |
| 0.2.6 | 2025-01-08 | #50991 | 首次公开发布。 |