databricks/iceberg-kafka-connect
Java
Language: Java
License: Apache-2.0
Stars: 284
Forks: 67
Open issues: 107
Created: 2023-04-07T02:56:28Z
Pushed: 2025-07-03T17:11:42Z
Default branch: main
Fork: no
Archived: no
README:
THIS REPOSITORY IS NOT MAINTAINED AS THE CODE HAS BEEN DONATED TO THE UPSTREAM APACHE ICEBERG PROJECT
Please see the Apache Iceberg documentation, or find the currently maintained source code in the upstream Apache Iceberg project.
Apache Iceberg Sink Connector
The Apache Iceberg Sink Connector for Kafka Connect is a sink connector for writing data from Kafka into Iceberg tables.
Features
- Commit coordination for centralized Iceberg commits
- Exactly-once delivery semantics
- Multi-table fan-out
- Row mutations (update/delete rows), upsert mode
- Automatic table creation and schema evolution
- Field name mapping via Iceberg’s column mapping functionality
Installation
The Apache Iceberg Sink Connector is under active development, with early access builds available under Releases. You can build the connector zip archive yourself by running:
./gradlew -xtest clean build
The zip archive will be found under ./kafka-connect-runtime/build/distributions.
Configuration
| Property | Description |
|----------|-------------|
| iceberg.tables | Comma-separated list of destination tables |
| iceberg.tables.dynamic-enabled | Set to true to route to a table specified in routeField instead of using routeRegex, default is false |
| iceberg.tables.route-field | For multi-table fan-out, the name of the field used to route records to tables |
| iceberg.tables.default-commit-branch | Default branch for commits, main is used if not specified |
| iceberg.tables.default-id-columns | Default comma-separated list of columns that identify a row in tables (primary key) |
| iceberg.tables.default-partition-by | Default comma-separated list of partition fields to use when creating tables |
| iceberg.tables.cdc-field | Name of the field containing the CDC operation, I, U, or D, default is none |
| iceberg.tables.upsert-mode-enabled | Set to true to enable upsert mode, default is false |
| iceberg.tables.auto-create-enabled | Set to true to automatically create destination tables, default is false |
| iceberg.tables.evolve-schema-enabled | Set to true to add any missing record fields to the table schema, default is false |
| iceberg.tables.schema-force-optional | Set to true to set columns as optional during table create and evolution, default is false to respect schema |
| iceberg.tables.schema-case-insensitive | Set to true to look up table columns by case-insensitive name, default is false for case-sensitive |
| iceberg.tables.auto-create-props.* | Properties set on new tables during auto-create |
| iceberg.tables.write-props.* | Properties passed through to Iceberg writer initialization, these take precedence |
| iceberg.table.\<table name\>.commit-branch | Table-specific branch for commits, use iceberg.tables.default-commit-branch if not specified |
| iceberg.table.\<table name\>.id-columns | Comma-separated list of columns that identify a row in the table (primary key) |
| iceberg.table.\<table name\>.partition-by | Comma-separated list of partition fields to use when creating the table |
| iceberg.table.\<table name\>.route-regex | The regex used to match a record's routeField to a table |
| iceberg.control.topic | Name of the control topic, default is control-iceberg |
| iceberg.control.group-id | Name of the consumer group to store offsets, default is cg-control-\<connector name\> |
| iceberg.control.commit.interval-ms | Commit interval in msec, default is 300,000 (5 min) |
| iceberg.control.commit.timeout-ms | Commit timeout interval in msec, default is 30,000 (30 sec) |
| iceberg.control.commit.threads | Number of threads to use for commits, default is (cores * 2) |
| iceberg.catalog | Name of the catalog, default is iceberg |
| iceberg.catalog.* | Properties passed through to Iceberg catalog initialization |
| iceberg.hadoop-conf-dir | If specified, Hadoop config files in this directory will be loaded |
| iceberg.hadoop.* | Properties passed through to the Hadoop configuration |
| iceberg.kafka.* | Properties passed through to control topic Kafka client initialization |
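The route-field and route-regex properties in the table describe multi-table fan-out: the value of one record field is compared against a per-table regex. The following is a minimal sketch of that idea, not the connector's actual routing code. The class name, the table names, and the patterns are hypothetical, and it assumes the regex must match the whole field value.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;

// Illustrative only: a hypothetical helper, not part of the connector.
class RouteSketch {

    // Returns every table whose route-regex matches the record's route field value.
    // Assumption: the regex must match the entire value (Matcher.matches()).
    static List<String> tablesFor(String routeValue, Map<String, Pattern> routeRegexByTable) {
        List<String> matched = new ArrayList<>();
        for (Map.Entry<String, Pattern> entry : routeRegexByTable.entrySet()) {
            if (entry.getValue().matcher(routeValue).matches()) {
                matched.add(entry.getKey());
            }
        }
        return matched;
    }

    // Hypothetical tables, standing in for iceberg.table.<table name>.route-regex entries.
    static Map<String, Pattern> exampleRoutes() {
        Map<String, Pattern> routes = new LinkedHashMap<>();
        routes.put("default.events_list", Pattern.compile("list"));
        routes.put("default.events_create", Pattern.compile("create"));
        return routes;
    }

    public static void main(String[] args) {
        // A record whose route field holds "list" goes to default.events_list.
        System.out.println(tablesFor("list", exampleRoutes()));
        // A value that matches no regex is routed nowhere in this sketch.
        System.out.println(tablesFor("delete", exampleRoutes()));
    }
}
```

Because every table is checked, a single record can fan out to more than one table when several patterns match.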
If iceberg.tables.dynamic-enabled is false (the default), you must specify iceberg.tables. If iceberg.tables.dynamic-enabled is true, you must specify iceberg.tables.route-field, which names the record field that contains the destination table name. Enabling iceberg.tables.upsert-mode-enabled causes every append to be preceded by an equality delete. Both CDC and upsert mode require an Iceberg V2 table with identity fields defined.
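The requirement above (static routing needs iceberg.tables, dynamic routing needs iceberg.tables.route-field) can be checked before a connector config is submitted. This is a minimal sketch under that reading of the rule. The class and method names are hypothetical, and the connector performs its own validation that may differ.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Illustrative only: a hypothetical pre-flight check, not the connector's validator.
class ConfigCheckSketch {

    // Returns a list of human-readable problems; an empty list means the rule is satisfied.
    static List<String> validate(Map<String, String> props) {
        List<String> errors = new ArrayList<>();
        // dynamic-enabled defaults to false, as stated in the property table.
        boolean dynamic = Boolean.parseBoolean(
                props.getOrDefault("iceberg.tables.dynamic-enabled", "false"));
        if (dynamic) {
            if (isBlank(props.get("iceberg.tables.route-field"))) {
                errors.add("iceberg.tables.route-field is required when dynamic routing is enabled");
            }
        } else {
            if (isBlank(props.get("iceberg.tables"))) {
                errors.add("iceberg.tables is required when dynamic routing is disabled");
            }
        }
        return errors;
    }

    static boolean isBlank(String value) {
        return value == null || value.trim().isEmpty();
    }

    public static void main(String[] args) {
        // "db.events" is a hypothetical table name.
        System.out.println(validate(Map.of("iceberg.tables", "db.events")));
        System.out.println(validate(Map.of("iceberg.tables.dynamic-enabled", "true")));
    }
}
```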
Kafka configuration
By default the connector will attempt to use Kafka client config from the worker properties for connecting to the control topic. If that config cannot be read for some reason, Kafka client settings can be set explicitly using iceberg.kafka.* properties.
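To illustrate what "passed through" means for the iceberg.kafka.* properties, the sketch below selects the keys under a prefix and hands back the remainder of each key. It assumes the prefix is stripped before the settings reach the Kafka client. The class name, method name, and broker address are hypothetical and are not taken from the connector.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

// Illustrative only: a hypothetical helper showing prefix-based pass-through.
class PrefixPassThroughSketch {

    // Keeps only keys that start with the prefix and removes the prefix from each key.
    static Map<String, String> stripPrefix(Map<String, String> props, String prefix) {
        Map<String, String> out = new TreeMap<>();
        for (Map.Entry<String, String> entry : props.entrySet()) {
            String key = entry.getKey();
            if (key.startsWith(prefix) && key.length() > prefix.length()) {
                out.put(key.substring(prefix.length()), entry.getValue());
            }
        }
        return out;
    }

    // Hypothetical connector properties; the broker address is a placeholder.
    static Map<String, String> exampleConnectorProps() {
        Map<String, String> props = new HashMap<>();
        props.put("iceberg.tables", "db.events");
        props.put("iceberg.control.topic", "control-iceberg");
        props.put("iceberg.kafka.bootstrap.servers", "broker-1:9092");
        props.put("iceberg.kafka.security.protocol", "SASL_SSL");
        return props;
    }

    public static void main(String[] args) {
        // Only the two iceberg.kafka.* entries survive, with the prefix removed.
        System.out.println(stripPrefix(exampleConnectorProps(), "iceberg.kafka."));
    }
}
```

The same selection pattern would apply to the other wildcard properties, such as iceberg.catalog.* and iceberg.hadoop.*, under the same assumption.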
Source topic offsets
Source topic offsets are stored in two different consumer groups. The first is the sink-managed consumer group defined by the iceberg.control.group-id property. The second is the Kafka Connect managed consumer group, which is named connect-<connector name> by default. The sink-managed consumer group is used by the sink to achieve exactly-once processing. The Kafka Connect consumer group is only used as a fallback if the sink-managed consumer group is missing. To reset the offsets, both consumer groups need to be reset.
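The two-group arrangement can be summarized in a short sketch: one helper lists the groups that must both be reset, and another shows the fallback order. It assumes both default group names end in the connector name (Kafka Connect names sink groups connect-<connector name>; the cg-control- suffix is assumed to follow the same pattern). All class and method names here are hypothetical.

```java
import java.util.List;
import java.util.OptionalLong;

// Illustrative only: hypothetical helpers, not the connector's offset handling.
class OffsetGroupsSketch {

    // Both groups must be reset to replay a source topic.
    // Assumption: the default sink-managed group is "cg-control-" + connector name.
    static List<String> groupsToReset(String connectorName, String controlGroupId) {
        String sinkGroup = (controlGroupId == null || controlGroupId.isEmpty())
                ? "cg-control-" + connectorName
                : controlGroupId;
        String connectGroup = "connect-" + connectorName;
        return List.of(sinkGroup, connectGroup);
    }

    // Sink-managed offsets win; the Kafka Connect group is used only when they are missing.
    static OptionalLong startingOffset(OptionalLong sinkManaged, OptionalLong connectManaged) {
        return sinkManaged.isPresent() ? sinkManaged : connectManaged;
    }

    public static void main(String[] args) {
        // "events-sink" is a hypothetical connector name.
        System.out.println(groupsToReset("events-sink", null));
        System.out.println(startingOffset(OptionalLong.empty(), OptionalLong.of(42L)));
    }
}
```

Resetting only one of the two groups leaves the other as a source of stale offsets, which is why the list always contains both names.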
Message format
Messages should be converted to a struct or map using the appropriate Kafka Connect converter.
Catalog configuration
The iceberg.catalog.* properties are required for connecting to the Iceberg catalog. The core catalog types are included in the default distribution, including REST, Glue, DynamoDB, Hadoop, Nessie, JDBC, and Hive. JDBC drivers are not included in the default distribution, so you will need to include those if needed. When using a Hive catalog, you can use the distribution that includes the Hive metastore client, otherwise you will need to include that yourself.
To set the catalog type, you can set iceberg.catalog.type to rest, hive, or hadoop. For other catalog…