Migrating to Apache Iceberg
Adobe Experience Platform is an open system for driving real-time personalized experiences. Customers use it to centralize and standardize their data across the enterprise, resulting in a 360-degree view of their data of interest. That view can then be used with intelligent services to drive experiences across multiple devices, run targeted campaigns, classify profiles and other entities into segments, and leverage advanced analytics. At the center of our Data Lake architecture is the underlying storage. Data Lake relies on a Hadoop Distributed File System (HDFS) compatible backend for data storage, which today is the cloud-based storage provided by Azure Data Lake Storage Gen2 (ADLS).
The Problem
Adobe Experience Platform Catalog Service provides a way of listing, searching, and provisioning a DataSet, which is our equivalent of a Table in a Relational Database. It is helpful in providing information such as name, description, schema, applied permissions, and all other metadata recorded on Adobe Experience Platform. As more data is ingested over time, it becomes difficult to query metadata from Catalog. With the introduction of Iceberg, we see a transitional shift in how metadata is captured and recorded: Iceberg records metadata co-located with the data files on the Data Lake. Adobe Experience Platform's single-tenant storage architecture exposes us to some interesting challenges when migrating customers to Iceberg. For each tenant, we can have one of three scenarios:
Tenant is a new customer completely on Apache Iceberg.
Tenant is an existing customer actively building out new integrations, with a hybrid of Iceberg and legacy datasets.
Tenant is an existing customer with only legacy datasets.
Here’s a snapshot of differently sized datasets across all the clients we’ve migrated.
Motivations for Migration
Iceberg migration is the process of moving data from one table format to Iceberg or introducing Iceberg as a table format to the data. While this might seem pretty straightforward, it involves a change in storage and table format. Apart from the benefits that Apache Iceberg provides out-of-the-box, there were several factors that motivated us to migrate customers to Iceberg:
Decoupling dependency on Catalog: With Iceberg, metadata is now co-located with data files and we can reduce our dependency on a metadata service. Iceberg’s metadata is now our source of truth, which we use for querying and filtering data.
Faster data access: Iceberg’s metadata is leveraged for partition pruning and filtering, which makes data access queries performant and cheaper to execute. Customers see their data access queries improving and returning results faster while maintaining accuracy.
Clean-up: Historically, customers have built proofs of concept to vet new features or workflows. These datasets are usually isolated to a feature release. As part of this migration process, we clean up stale datasets and artifacts that are no longer needed.
Future migrations: We realized this was an opportunity to set the stage for future migrations. From this, we learned how we can better support similar types of migrations. For instance, we could build an offline version of a dataset and swap out the older version for it with little to no impact on producers and/or consumers.
Customer Categorizations
When clients interact with Data Lake, they read and write data to DataSets based on their unique GUID and/or name. The metadata for datasets is stored in Catalog and is used to configure partitioning behavior, schema information, policy enforcement, and other behaviors within Adobe Experience Platform. We needed to devise a plan that not only catered to each customer’s downtime and availability constraints but also considered their need for maintaining metadata in Catalog or data on ADLS. Each customer had a different degree of comfort dropping data and/or metadata. Based on our evaluations, we bucketed customers into four categories:
Account Reset: These customers are fine dropping their data from storage and also resetting corresponding metadata. They do not experience any downtime as new datasets are enabled for Iceberg flows.
Metadata Reset: These customers do not care about the metadata as long as all of their existing data is accessible albeit under a new dataset. There is no anticipated downtime as data can be backfilled to a new Iceberg enabled dataset.
Data Reset: These customers do not care about the data but are interested in maintaining the metadata. There is a small scheduled downtime (usually minutes) as we need to drop existing data from the Data Lake and reset the dataset for Iceberg ingestion.
Migrate: These customers want all of their existing metadata and data migrated to Iceberg. These customers are very sensitive to downtime as they have active flows ingesting and egressing data. There is anticipated downtime as we take steps to convert datasets to Iceberg.
Depending on total account activity and volume (size), we prioritized each bucket and devised a migration plan for each customer. For the purposes of this blog, we will focus on how we migrated customers (data and metadata) to Iceberg.
Migration Best Practices
Regardless of the type of data we are migrating, the goal of a migration strategy is to enhance performance and competitiveness. Less successful migrations can result in inaccurate data that contains redundancies or is corrupted in the process. This can happen even when source data is fully usable and adherent to data policies. Furthermore, any issues that did exist in the source data can be amplified when it’s ported over to Iceberg.
A complete data migration strategy prevents a subpar experience that ends up creating more problems than it solves. Aside from missing deadlines and exceeding budgets, incomplete plans can cause migration projects to fail altogether. While planning and strategizing the work, we set foundational criteria to govern the overall migration framework. A strategic data migration plan should include consideration of these critical factors:
Minimal customer downtime: Customer requests to read and write data should be impacted as little as possible. Essentially, the migration process should be transparent to the client and their requests should execute as expected. This also includes maintaining data ingress and egress Service Level Agreements (SLAs).
Metadata lineage: Datasets are prepared by ingesting batches of data over a period of time. We must preserve the batch history (and its lineage) in metadata so clients that might be relying on known conventions can continue to do so even after the dataset is migrated to Iceberg. This lineage is not maintained in the reset scenarios because they involve dropping data, metadata, or both.
Audit trail: A comprehensive migration plan surfaces insights when it encounters an unknown state and tracks the overall migration process. It maintains detailed audit trails and additional diagnostic information, which helps troubleshoot unexpected conditions or failures. We must also audit the data post-migration in order to ensure the accuracy of the migration workflow.
Disaster recovery and rollback: During the planning and design phases, and throughout implementation and maintenance, test the migration plan to make sure it will eventually achieve the desired outcome. Migration plans must be prepared for edge cases and known contingencies. In case we hit any issues, the design of the migration strategy should permit rollbacks and recovery; we should never leave a dataset partially migrated.
Cost-effective: Regardless of the strategy we choose, there is a cost associated with migrating data. The total anticipated savings realized by Iceberg should be greater than the cost of migrating the data and maintaining it in that format. The migration plan must be a robust yet lightweight process to make this transition as seamless as possible. At the same time, a watered-down version might not be able to handle all the complexities one might expect in production environments.
Migration Strategies
There is more than one way to build a migration strategy and most strategies fall into one of two categories: “big bang” or “trickle”. In a big bang migration, the full dataset transfer is completed within a limited window of time and data can be unavailable for the duration of the migration. In contrast, trickle migrations complete the migration process in phases. During implementation, the old system and the new are run in parallel, which eliminates downtime or operational interruptions. In the case of Iceberg migration, we had to be more creative about the migration model we chose because:
A pure big bang approach was not acceptable as customers might see interruptions in reading or writing data operations and the downtime window was not large enough to migrate all datasets at once.
A trickle approach was not possible because it was not technically feasible (in the limited timeframe) to support full table scans on a dataset that has data partially written to Iceberg. As a platform, we did not see the benefit in maintaining such a hybrid model.
Based on our specific business needs and requirements, we explored two strategies to migrate customer datasets to Iceberg. Both strategies are catered to our specific use case, selectively combining elements of the big bang and trickle migration approaches.
In-Place Upgrade
An in-place migration strategy is one where datasets are upgraded to the Iceberg table format without the need for data restatement or reprocessing. This implies data files are not modified during migration and all Iceberg metadata (manifests, manifest lists, and snapshots) is generated outside of the purview of the data. We are essentially recreating all metadata in an isolated environment and co-locating it with the data files. This utility requires that the dataset we are migrating be registered with Spark’s Session Catalog.
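For reference, the open-source in-place path is exposed through Iceberg’s Spark procedures. Below is a minimal sketch, assuming a Parquet table already registered in Spark’s Session Catalog as db.events; the catalog configuration and table name are illustrative, not our production setup.

```python
# Minimal sketch of Iceberg's open-source in-place upgrade, assuming the source
# Parquet table is registered in Spark's Session Catalog as db.events.
# Catalog settings and table names are illustrative only.
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .config("spark.sql.extensions",
            "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
    .config("spark.sql.catalog.spark_catalog",
            "org.apache.iceberg.spark.SparkSessionCatalog")
    .config("spark.sql.catalog.spark_catalog.type", "hive")
    .getOrCreate()
)

# `migrate` converts the existing table to Iceberg in place: it generates manifests,
# manifest lists, and snapshots over the original Parquet files without rewriting them.
spark.sql("CALL spark_catalog.system.migrate('db.events')")
```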
Pros
Avoids full table scans to read data as there is no data restatement.
Iceberg metadata is obtained without reading the actual data files (assuming the source data is in Parquet): min and max statistics are obtained for each column in the schema by peeking into Parquet data file footers (see the sketch after this list), and file path and partition metadata is obtained from Spark’s Session Catalog (in-memory catalog).
We abstract the table format of the dataset in Catalog. This allows us to build Iceberg metadata behind the scenes while the client can continue reading from this dataset in source format. We enable Iceberg for consumption once the metadata is generated for all ingested data.
Disaster rollback is simple. We only need to delete the metadata directory if we observe corruption (duplication or loss) in metadata files.
Batch lineage is maintained as we do not modify metadata stored in Catalog.
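To illustrate the footer-only statistics point above, here is a small sketch using PyArrow; it reads only the Parquet footer, never the row data (the file name is hypothetical).

```python
# Hypothetical sketch: harvest per-column min/max statistics from a Parquet footer
# without scanning any row data. The file name is illustrative.
import pyarrow.parquet as pq

meta = pq.ParquetFile("part-00000.snappy.parquet").metadata
for rg in range(meta.num_row_groups):
    for col in range(meta.num_columns):
        stats = meta.row_group(rg).column(col).statistics
        if stats is not None and stats.has_min_max:
            print(meta.schema.column(col).name, stats.min, stats.max)
```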
Cons
The utility assumes the dataset is not being modified (upserts or deletes). If there are new modifications to the table after we’ve started generating metadata, then we need to start the process again. This process incurs minimal read downtime but high write downtime.
Rollback is the only possible option for disaster recovery. Other recovery methods are not straightforward because metadata files (having GUIDs) are constructed on the fly and restarting the entire process requires starting from a clean slate.
The process is prone to frequent restarts because there is no checkpointing, and the required write downtime is not feasible for some customers.
This process requires table metadata to be stored in Spark’s Session Catalog. We need to modify the open-source utility as Adobe Experience Platform maintains a Catalog outside of Spark’s infrastructure.
In this process, the Iceberg table is created in a big bang approach with a single snapshot. Data reads will be slow initially due to the single snapshot but will gradually improve as more data is ingested (and more snapshots are created).
Due to the rudimentary nature of the workflow, audits are not maintained. The process will error out in case there are failures in migration. We are also unable to track metadata generation for partitions, files, or batches (rows of data).
In case data restatement is required, this workflow will not work as source data files are not modified.
Shadow Migration
In the case of shadow migration, we followed a hydration model: we create a new dataset that shadows the source dataset in terms of batches, and once the shadow has caught up, we flip a switch that swaps the Shadow dataset in for the source. Here’s our migration architecture showcasing the critical pieces of the overall workflow.
Migration Service
The Migration Service (MS) is a stateless, scalable, and tenant-agnostic migration engine. MS is designed for migrating data on the Data Lake that has been collected by touch-points enabled through Adobe’s Digital Marketing Solutions. MS maintains a pool of migration workers and each worker is responsible for migrating an Experience Data Model (XDM) compliant dataset to Iceberg. Each worker maintains an army of dedicated helpers that are tasked to fan out the migration workflow. For instance, we migrate multiple batches of a dataset simultaneously and, at the same time, MS can handle the migration of many such datasets belonging to a single client. A client’s datasets on the Data Lake can be migrated by a single MS instance or a pool of them. A few terms used in describing the migration workflow: the Source is the existing customer dataset being migrated; the Shadow is a new Iceberg-enabled dataset, hidden from the customer, that mirrors the Source batch by batch; and a Batch is a unit of ingested data tracked in Catalog along with its DatasetFiles.
Here’s a step-by-step representation:
We provide a list of datasets that we intend to migrate to Iceberg. If a list is not provided, then MS explores the Data Lake to find datasets to migrate.
We load the metadata of each Source and run a complete audit and pre-flight validations. These checks help us determine whether the dataset can be migrated, is already in Iceberg, or has a migration in progress that we are resuming.
Once we determine the necessary checkpoint, we either create a Shadow or resume migration for an existing one; the Shadow is hidden from the customer’s view of the Data Lake.
Each migration worker is tasked with creating a data ingestion pipeline that would feed in from the Source and write to the Shadow.
Each worker executes a migration workflow, which is a pre-defined set of actions, and uses its helpers to fanout execution of each action.
Each worker helper operates at the Batch level. It creates a Shadow Batch for each Source Batch ingested, maintaining all of its metadata (including tags and external ID) and preserving lineage.
During migration, we leverage optimizations we have enabled on our regular ingestion workflow: Write-Audit-Publish (WAP) and Buffered Writes (see the WAP sketch after this list).
As the Shadow is hydrated, the ingestion workflow will generate necessary Iceberg metadata for each Shadow batch ingested.
After the Shadow has caught up, we run audit checks to verify data parity: row counts must match and schemas must be identical (see the parity sketch after this list).
Once the audit passes, we put the Source in maintenance mode, disabling writes to the dataset. At this critical point, we regenerate Iceberg metadata for the Source using the Shadow’s metadata as an initial blueprint.
We rewrite the Shadow’s Iceberg metadata with new file and manifest paths, substituting the Shadow’s path with that of the Source. All other values are copied verbatim.
Once the Source’s Iceberg metadata is generated, we rename data files to the Source path. Rename is a lightweight metadata operation (even for large datasets).
We also update metadata stored in Catalog. Shadow Batches and DatasetFiles are updated to point to the Source. Old Source Batches and DatasetFiles are deactivated to remove references to stale non-Iceberg data.
We lift the maintenance flag, update the table format metadata, and resume writes to the dataset.
Lastly, we clean up stale artifacts such as the Shadow dataset from Catalog and rename the old Source Parquet data to a temporary scratchpad for later garbage collection.
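The Write-Audit-Publish step referenced above can be staged with Iceberg’s built-in WAP support. The following is a minimal sketch, assuming Spark with the Iceberg extensions enabled; the table name, WAP ID, and snapshot ID are illustrative and would come from our own bookkeeping in practice.

```python
# Minimal Write-Audit-Publish (WAP) sketch, assuming Iceberg's Spark extensions.
# Table names and identifiers are illustrative, not our actual conventions.
spark.sql("ALTER TABLE lake.db.shadow_ds "
          "SET TBLPROPERTIES ('write.wap.enabled' = 'true')")
spark.conf.set("spark.wap.id", "migration-batch-0042")

# With a WAP ID set, writes are staged as snapshots outside the current table state.
batch_df = spark.read.parquet("/stage/source_batch")  # hypothetical restated batch
batch_df.writeTo("lake.db.shadow_ds").append()

# After audits pass, publish the staged snapshot by cherry-picking it.
staged_snapshot_id = 1234567890  # resolved from the table's snapshot metadata
spark.sql(f"CALL lake.system.cherrypick_snapshot('db.shadow_ds', {staged_snapshot_id})")
```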
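The parity audit described above boils down to a row-count and schema comparison between the Source and the Shadow; a minimal sketch with illustrative table names:

```python
# Minimal parity-audit sketch: row counts and schemas must match before the swap.
# Table names are illustrative.
source_df = spark.table("lake.db.source_ds")
shadow_df = spark.table("lake.db.shadow_ds")

assert source_df.count() == shadow_df.count(), "row count mismatch"
assert source_df.schema == shadow_df.schema, "schema mismatch"
```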
Pros
Customer read and write downtimes are small, ranging from a couple of seconds to minutes for GBs and TBs of data, respectively.
We record metrics to track progress, which helps audit and dissect the workflow to determine limitations. We can track the migration of each Source batch via a Shadow batch, external ID, or tag.
Batch lineage is re-created on the Shadow and then ported over to the Source. Although batch identifiers are changed, lineage by external id is preserved.
Disaster rollback and recovery are possible by deleting the Shadow, which drops its related meta from Catalog and data from Data Lake.
It is possible to test different configs when migrating a Source. We can create a new Shadow for each config we want to test and evaluate its impact. Once we’ve finalized a config, we can resume migration for the Source with a specific Shadow.
MS is a stateless service and all migration checkpoints are stored in Catalog. This implies that MS can be restarted and it would resume operation from its last known checkpoint.
Due to pre-validation checks and data restatement, bugs in the existing Source are not carried forward to Iceberg, ensuring a seamless migration.
Finally, data corruption and loss are highly unlikely because:
Source data is restated and written to Shadow dataset path.
Shadow data is validated for row count and schema match.
Shadow data is renamed to a different directory in Source (so it does not collide with old parquet data).
The workflow is table format agnostic and sets the stage for future migrations. If we decide to migrate clients to a different table format, we can re-use parts of this workflow.
Cons
Since clients continue to ingest data into the existing system, it is challenging to keep data in sync between the Source and the Shadow.
Disaster recovery is easy until the workflow is at a point where it’s updating metadata in Catalog. If we need to roll back changes past that point, a surgical fix is needed.
We maintain an additional copy of the Source in Catalog and ADLS when migrating, which should be accounted for.
There is a cost to building plumbing for migration, which needs to be accounted for in the budget and project schedule.
We need to test all anticipated scenarios diligently to understand how the Migration Service might react.
Choosing a Strategy
Weighing our options, we chose Shadow Migration over the In-Place Upgrade strategy. Shadow Migration keeps customer read and write downtime down to seconds or minutes, is checkpointed and resumable because MS stores its state in Catalog, maintains an audit trail and batch lineage, restates data so existing bugs are not carried forward, and allows a clean rollback by simply deleting the Shadow. The trade-offs, a temporary extra copy of each dataset and the cost of building the migration plumbing, were acceptable; the In-Place Upgrade’s lack of checkpointing, high write downtime, and dependence on Spark’s Session Catalog were not.
Lessons Learnt
Migrating datasets to Apache Iceberg with the Shadow Migration strategy definitely had its challenges. Here are a few pitfalls we hit.
Traffic Isolation and Scalability
The design of the Migration Service re-used the existing ingestion architecture to execute its business logic. We repurposed the existing ingestion framework (compute power and plumbing) to materialize this workflow. This meant there was contention between resources used for ingesting migration data and customer-triggered ingestion traffic, whether live data or a periodic backfill. We needed knobs to isolate migration-related traffic and to scale individual pieces of the workflow.
Reprioritizing batches: In the face of limited resources and the need to maintain ingestion SLAs, we reprioritized migration batches to be ingested at a lower priority than customer data (live or backfill), i.e., batches were ingested at a lower throughput.
Isolate compute workspaces: Jobs to ingest migration batches were executed on a dedicated cluster with configurable workers. We controlled how the cluster scaled depending on the availability of a larger pool of shared resources.
Configuring Buffered Writes: Iceberg is a behind-the-scenes table optimization and its migration is not governed by regular ingestion SLAs. This allowed us to tune Buffered Writes to optimize for compaction: buffer for a longer duration and write bigger files to disk (see the sketch below).
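Buffered Writes itself is an internal mechanism, but at the Iceberg table level the analogous knob is the target data-file size. A minimal sketch, assuming a standard Iceberg table property and an illustrative table name:

```python
# Minimal sketch, assuming standard Iceberg table properties; the table name is
# illustrative. Favor fewer, larger data files when buffering for longer.
spark.sql("""
    ALTER TABLE lake.db.shadow_ds SET TBLPROPERTIES (
        'write.target-file-size-bytes' = '536870912'  -- aim for ~512 MB data files
    )
""")
```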
Rewriting Iceberg Metadata
One of the critical design elements of the Shadow migration workflow was our ability to successfully regenerate Iceberg metadata for the Source table based on that of the Shadow. Depending on the nature of the data and our configurations for buffering writes, some tables had a large number of snapshots generated, and the execution of this logic was done on a single node of the cluster (the driver). This exposed us to two challenges:
Metadata correctness: Iceberg’s metadata is used for driving query execution and inaccurate values in metadata can lead to queries returning incorrect results. When rewriting Iceberg metadata at scale, we had to ensure there was no corruption or duplication at Source or Shadow.
Distribute writes: To scale the metadata rewrite process for larger datasets, we leveraged Spark to read all Iceberg metadata as a DataFrame and applied path transformations as a User Defined Function (UDF). This helped distribute writes across the cluster (see the sketch below).
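As a rough illustration of that distributed rewrite (the paths, table, and column names here are hypothetical, not our actual layout):

```python
# Hypothetical sketch of the distributed path rewrite: load the Shadow's data-file
# listing from Iceberg's "files" metadata table and swap the Shadow prefix for the
# Source prefix with a UDF, so the transformation runs across the cluster.
from pyspark.sql import functions as F
from pyspark.sql.types import StringType

SHADOW_PREFIX = "abfss://lake@account.dfs.core.windows.net/shadow/ds_123"
SOURCE_PREFIX = "abfss://lake@account.dfs.core.windows.net/source/ds_456"

rewrite_path = F.udf(lambda p: p.replace(SHADOW_PREFIX, SOURCE_PREFIX), StringType())

files_df = spark.table("lake.db.shadow_ds.files")  # one row per data file
rewritten = files_df.withColumn("file_path", rewrite_path(F.col("file_path")))
# `rewritten` then feeds the step that emits new manifests pointing at Source paths.
```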