IAS Technology Insider is dedicated to discussing computer science, software engineering, and emerging technologies. From deep dives into machine learning algorithms and cloud computing architectures to discussions on cybersecurity trends and data analytics methodologies, our tech experts offer insights and analyses that resonate with enthusiasts and professionals alike.
By Staff Engineer, IAS
In part 1 of this blog series, I discussed the design of the pipelines we built to process late arriving data. In this post, I will detail the steps we added to the pipelines to automatically process late arriving data.
How to Process Late Arriving Data
Our architecture consists of two parts: a Receiver Pipeline and a Processing Pipeline. This allows us to ingest and process late data separately.
Late Data in the Receiver
We need to ensure that all available data is ingested and partitioned correctly, regardless of how late it is. This offloads the bulk of the reprocessing complexity to the backend pipeline.
For partners that push data to our API, we process all events regardless of when we receive them. Our API handles events in realtime and partitions them based on a timestamp field in the payload.
For partners that we pull data from in batches, we implement additional steps to ensure everything is ingested. When we encounter new data that has not been ingested, we cache its metadata in a DynamoDB table (i.e. filename, ingestion date). When we poll for new batches of data, we use a rolling lookback window of several days to see if any data was added late. If that data is not already cached in DynamoDB, it is then downloaded and partitioned in S3.
Late Data in the Backend
We use Airflow to orchestrate our backend processing pipeline. Individual Airflow pipelines are called DAGs (directed acyclic graphs), which are composed of independent tasks connected together to form a dependency graph.
The pipeline consists of two separate Airflow DAGs:
- Processing DAG: Processes a single day’s worth of data. This involves reading input data from S3, running various aggregations, and writing results to the data warehouse.
- Monitoring DAG: Monitors for late arriving data in S3. When this DAG detects enough late arriving data for a previous day, it retriggers the processing DAG for that day.
Processing DAG
In order to process late arriving data, it is critical that the processing DAG is idempotent. Each DAG run is executed with a specific date range for processing data. When we rerun the pipeline to process late data, we ensure that there is no data duplication in the output.
To achieve this, each step in the pipeline overwrites the output of the previous run. The output of each step is written to S3 and partitioned by the DAG’s execution date, so it is as easy as deleting the directory in S3. The final step in the pipeline loads data from S3 to our data warehouse, Snowflake. The data in the data warehouse is also partitioned by date, so it is deleted before inserting the new output.
Lastly, we track how much input data was processed with each execution of the DAG which the monitoring DAG uses to determine whether there is late arriving data. To do this, as soon as the pipeline begins processing data, it takes a snapshot of the size of the partition in S3 in bytes. The date and size are stored in a table in DynamoDB.
Monitoring DAG
The monitoring DAG is a separate pipeline. It runs daily to check for late arriving data in S3 and excludes checking the current day that may still be processing.
We decide to reprocess data for a given day if it increases in size by a certain percentage. This percentage is configurable for each data source and determined based on the input source and potential business impact.
There are occurrences where the infrastructure costs in AWS to reprocess data are not cost effective, so we skip those. We determine the exact threshold based on historical data size and average cost to process a day’s worth of data.
In this example, let’s assume that we configured the percentage threshold to 5%. The difference between the snapshots in DynamoDB and current partition sizes in S3 falls under 5% for 2/29 and 2/28, but for 2/27 it is about 10% due to late arrivals in S3. In this case, the monitoring DAG would retrigger the processing DAG for 2/27 only.
After rerunning the DAG for 2/27, all 110GB of data is processed through the backend pipeline. The snapshot in DynamoDB is updated to reflect the most recent run.
When we reprocess data, the monitoring DAG automatically kicks off the processing DAG for the given date and waits for it to finish. In Airflow terms, this is called “clearing” a DAG run. Here is an example code snippet to accomplish this. Note that as of writing, we are using Airflow version 1.10.10:
from datetime import datetime
from airflow import AirflowException, DAG
from airflow.exceptions import AirflowSkipException
from airflow.models import BaseOperator, DagRun
class CheckRawDataStateOperator(BaseOperator):
“””This custom operator checks for late arriving data.”””
def __init__(self, execution_date: datetime, threshold_percent: int, *args, **kwargs):
BaseOperator.__init__(self, *args, **kwargs)
self.execution_date = execution_date
self.threshold_percent = threshold_percent
def execute(self, context):
# Pull data size from S3 and DynamoDB using ‘self.execution_date’
size_in_s3 = …
size_in_dynamodb = …
percentage_change = (size_in_s3 – size_in_dynamodb) / size_in_dynamodb * 100
if percentage_change < self.threshold_percent:
# Raising an ‘AirflowSkipException’ tell Airflow to skip the remaining tasks in the pipeline
raise AirflowSkipException()
class ClearDagRunOperator(BaseOperator):
“””This custom operator clears the execution of a DAG for a specific date.”””
def __init__(self, dag_to_clear: DAG, execution_date: datetime, *args, **kwargs):
BaseOperator.__init__(self, *args, **kwargs)
self.dag_to_clear = dag_to_clear
self.execution_date = execution_date
def execute(self, context):
dag_run = DagRun.find(
dag_id=self.dag_to_clear.dag_id,
execution_date=self.execution_date,
)
if dag_run is None:
raise AirflowException(f”DagRun not found for {self.dag_to_clear.dag_id} on {self.execution_date}”)
self.dag_to_clear.clear(
start_date=dag_run.execution_date,
end_date=dag_run.execution_date,
)
# We need a reference to the DAG that is going to be cleared.
processing_dag: DAG = …
# The number of days to look in the past to check for late arriving data.
# Each day runs an independent chain of tasks in the pipeline.
lookback_days = 7
for days_ago in range(1, lookback_days + 1):
# Task to check if data in S3 is larger than data in DynamoDB
# Raises an ‘AirflowSkipException’ if
check_state_task = CheckRawDataStateOperator(
task_id=”check_state_{days_ago}_days_ago”.format(days_ago=days_ago),
)
clear_dag_task = ClearDagRunOperator(
task_id=”clear_dag_run_{days_ago}”.format(days_ago=days_ago),
dag_to_clear=processing_dag,
execution_date=”{{ execution_date – macros.timedelta(days={days_ago}) }}”.format(days_ago=days_ago),
)
# Another task can be added here to monitor the output of the Processing DAG.
# Task dependency chain
check_state_task >> clear_dag_task >> …
Once the processing DAG finishes running, the monitoring DAG compares the outputs and notifies several teams of the results via email.
Results
With our new system, we monitor for late data 24/7. We also reprocess late arriving data for multiple days at a time in parallel because each pipeline runs independently in Airflow and AWS. The new pipelines save us valuable engineering time that was previously spent running scripts or checking output, decrease reprocessing time, and provide results as soon as late data arrives.
Ideas for Improvement
This solution works well for our current needs but we have ideas for design improvements:
- More sophisticated tracking of late data arrivals. We only track changes in overall file size. Depending on the type of input, we could track the total number of files or the upload dates in S3 to determine if files were overwritten.
- Versioning data that is reprocessed. With our design we are overwriting reprocessed data, but we could save a history of changes. This would allow us to do a more in depth analysis of the data before and after.
- Processing data in real-time instead of batches. We only need to support processing data on the daily level, but this could change in the future.
By leveraging the tools provided by Airflow, we built our pipelines to meet all of our requirements for processing late arriving data. We created a receiver component that is lightweight and only deals with ingesting the data made available to us. This allows for the complexity of processing late arriving data to be offloaded to the backend. We then use Airflow’s built-in features for creating pipelines that can be replayed and continuously monitored. This process means we continue to deliver on our customers’ goals with the speed and efficiency our business demands.
Article originally published on August 26, 2021.