By Staff Engineer, IAS
At Integral Ad Science, we process high volumes of partner data through our platform every day, so that we can quickly provide meaningful insights to customers. Our partners control when the data is made available and sometimes there are delays due to API outages or planned downtime.
This two-part series will share an overview of our data intake and processing pipelines, focusing on how we built automatic monitoring and tools to handle late arriving data most effectively.
This post addresses how we designed our architecture to accommodate late arriving data, while the next outlines specific steps in the pipeline that handle reprocessing late arriving data.
Designing the Pipeline
Our pipelines use Airflow and various AWS services, including S3, EMR, and DynamoDB. Four key requirements influenced the pipeline architecture we built to handle late arriving data:
- It is critical that the entire pipeline design supports re-running against inputs that have already been processed.
- Automatically processing late arriving data is essential. By contrast, manually monitoring for late data, kicking off scripts, and verifying the results is costly, error-prone, and tedious.
- Thresholds for reprocessing data need to be configurable per data source and set so that reprocessing does not incur unnecessary infrastructure costs.
- For compliance and tracking purposes, real-time alerts are required to indicate when late arriving data is processed and the impact on overall results.
When collecting partner data, some partners push data to the IAS platform via an API, while others require data to be pulled in batches throughout the day. We created two separate components to manage these different data sources: a Receiver Pipeline and a Processing Pipeline. The structure of these data sources is similar, so we generalized both components to work for all partners with minimal code duplication.
Receiver Pipeline
The receiver pipeline collects data from various sources and partitions the data into buckets based on timestamps. Partitions are created for each source, date, and hour of data stored in S3.
For partners that push data to IAS through an API, we stream the data through several lightweight steps to clean, enrich, and partition it.
For partners that require multiple daily data pulls, the IAS platform downloads the data in batches based on a set schedule. Our platform pulls data by downloading files rather than querying individual records from an API. When downloading files, we use DynamoDB as a cache to track files that have already been processed.
With both intake processes, partitioned data lands in an S3 bucket. We create matching partitions for the raw data in an AWS Glue database. For each partner, a Glue table is partitioned by date and hour, which is used in the backend processing pipeline to query the data from S3.
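To make the layout concrete, here is a minimal sketch of how a source/date/hour key could be derived from an event timestamp; the bucket name, source name, and file name are hypothetical and only illustrate the partitioning convention described above.

from datetime import datetime

# Hypothetical raw-data bucket, for illustration only.
RAW_BUCKET = "ias-raw-partner-data"

def partitioned_key(source: str, event_time: datetime, filename: str) -> str:
    """Build a Hive-style key partitioned by source, date, and hour."""
    return f"{source}/date={event_time:%Y-%m-%d}/hour={event_time:%H}/{filename}"

# s3://ias-raw-partner-data/partner_a/date=2021-02-27/hour=13/events_001.json
key = partitioned_key("partner_a", datetime(2021, 2, 27, 13), "events_001.json")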
Backend Processing Pipeline
Once data is available in S3 and Glue, it is ready to process. We perform several steps to aggregate the data and generate business metrics. Regardless of how the receiving pipeline fetches raw data (push vs. pull), the backend pipeline processes it in batches. Leaving a few hours of buffer between receiving the last hour of data and starting processing minimizes the amount of late arriving data, except for data that arrives several hours or days late.
We use Airflow to build this part of the pipeline because it provides:
- Flexible scheduling and orchestration: Airflow makes it easy to schedule our jobs to run on regular intervals using cron syntax. It also provides powerful syntax for orchestrating separate tasks within the pipeline to work together and run in parallel.
- Replayability: This is a critical feature that allows us to rerun our pipeline at any point for any batch that was already processed. For example, when late data arrives for a previous date, we can kick off the Airflow job for that exact date again.
- Monitoring, error handling, and resilience: Being able to monitor progress throughout the day, track errors, and retry failed steps is critical.
We set up Airflow to integrate with various AWS services. Given the scale of data IAS processes, we use AWS EMR with Apache Spark.
The Airflow pipeline includes a sequence of tasks that orchestrate Spark jobs to process raw data. Each Spark job reads data from S3 using Glue tables created by the receiver pipeline. Once processing is complete, the final output is written to an S3 bucket and loaded into the IAS data warehouse.
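To illustrate how this orchestration might look, the sketch below submits a Spark step to an existing EMR cluster and waits for it to finish, using the EMR operators available in Airflow 1.10.x. The DAG name, schedule, cluster ID, and script path are hypothetical, not the actual pipeline configuration.

from datetime import datetime

from airflow import DAG
from airflow.contrib.operators.emr_add_steps_operator import EmrAddStepsOperator
from airflow.contrib.sensors.emr_step_sensor import EmrStepSensor

# Hypothetical cluster ID and script location, shown only to illustrate the flow.
EMR_CLUSTER_ID = "j-XXXXXXXXXXXXX"
SPARK_SCRIPT = "s3://ias-pipeline-code/jobs/aggregate_partner_data.py"

SPARK_STEP = [{
    "Name": "aggregate_partner_data",
    "ActionOnFailure": "CONTINUE",
    "HadoopJarStep": {
        "Jar": "command-runner.jar",
        # Pass the batch date so the job reads only that partition from Glue/S3.
        "Args": ["spark-submit", SPARK_SCRIPT, "--date", "{{ ds }}"],
    },
}]

with DAG(
    dag_id="processing_dag",
    start_date=datetime(2021, 1, 1),
    schedule_interval="0 6 * * *",  # daily, leaving a buffer after the last hour of data
    catchup=False,
) as dag:
    add_step = EmrAddStepsOperator(
        task_id="add_spark_step",
        job_flow_id=EMR_CLUSTER_ID,
        steps=SPARK_STEP,
    )
    wait_for_step = EmrStepSensor(
        task_id="wait_for_spark_step",
        job_flow_id=EMR_CLUSTER_ID,
        step_id="{{ task_instance.xcom_pull(task_ids='add_spark_step', key='return_value')[0] }}",
    )
    add_step >> wait_for_step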
We designed a series of pipelines that work together to gather data from our partners, aggregate it, and load it into a data warehouse for consumption. By using Airflow and partitioning the data, we designed the architecture to allow for late arriving data. In the next part of this blog, I will go into detail about the steps required to reprocess the late arriving data.
How to Process Late Arriving Data
Our architecture consists of two parts: a Receiver Pipeline and a Processing Pipeline. This allows us to ingest and process late data separately.
Late Data in the Receiver
We need to ensure that all available data is ingested and partitioned correctly, regardless of how late it is. This offloads the bulk of the reprocessing complexity to the backend pipeline.
For partners that push data to our API, we process all events regardless of when we receive them. Our API handles events in real time and partitions them based on a timestamp field in the payload.
For partners whose data we pull in batches, we implement additional steps to ensure everything is ingested. When we encounter new data that has not been ingested, we cache its metadata (e.g., filename and ingestion date) in a DynamoDB table. When we poll for new batches of data, we use a rolling lookback window of several days to see whether any data was added late. Any data in that window that is not already cached in DynamoDB is downloaded and partitioned in S3.
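The sketch below illustrates the idea under assumed names: a hypothetical processed_files DynamoDB table keyed by filename, and list_batch_files / download_and_partition helpers standing in for the partner-specific listing and ingestion logic.

from datetime import datetime, timedelta

import boto3

# Hypothetical table keyed by filename; the real schema may differ.
processed_files = boto3.resource("dynamodb").Table("processed_files")
LOOKBACK_DAYS = 3  # rolling lookback window (assumed value)

def ingest_new_files(list_batch_files, download_and_partition):
    """Download and partition any file in the lookback window that is not yet cached."""
    window_start = datetime.utcnow() - timedelta(days=LOOKBACK_DAYS)
    for filename in list_batch_files(since=window_start):
        if "Item" in processed_files.get_item(Key={"filename": filename}):
            continue  # already ingested on a previous poll
        download_and_partition(filename)
        # Record the file so future polls skip it.
        processed_files.put_item(
            Item={"filename": filename, "ingestion_date": datetime.utcnow().isoformat()}
        )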
Late Data in the Backend
We use Airflow to orchestrate our backend processing pipeline. Individual Airflow pipelines are called DAGs (directed acyclic graphs), which are composed of independent tasks connected together to form a dependency graph.
The pipeline consists of two separate Airflow DAGs:
- Processing DAG: Processes a single day’s worth of data. This involves reading input data from S3, running various aggregations, and writing results to the data warehouse.
- Monitoring DAG: Monitors for late arriving data in S3. When this DAG detects enough late arriving data for a previous day, it retriggers the processing DAG for that day.
Processing DAG
In order to process late arriving data, it is critical that the processing DAG is idempotent. Each DAG run is executed with a specific date range for processing data. When we rerun the pipeline to process late data, we ensure that there is no data duplication in the output.
To achieve this, each step in the pipeline overwrites the output of the previous run. The output of each step is written to S3 and partitioned by the DAG’s execution date, so overwriting a previous run is as simple as deleting that date’s directory in S3 before writing the new output. The final step in the pipeline loads data from S3 into our data warehouse, Snowflake. The data in the warehouse is also partitioned by date, so the old partition is deleted before the new output is inserted.
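As a rough sketch of this overwrite step, the snippet below deletes everything under a partition’s S3 prefix with boto3 before new output is written; the bucket, prefix, and Snowflake table/stage names are hypothetical, and the production pipeline may express the same logic inside Spark and the warehouse loader.

import boto3

s3 = boto3.client("s3")

def clear_partition(bucket: str, prefix: str) -> None:
    """Delete every object under the partition's prefix so the rerun starts clean."""
    paginator = s3.get_paginator("list_objects_v2")
    for page in paginator.paginate(Bucket=bucket, Prefix=prefix):
        objects = [{"Key": obj["Key"]} for obj in page.get("Contents", [])]
        if objects:
            s3.delete_objects(Bucket=bucket, Delete={"Objects": objects})

# Example: clear the output of a previous run before rewriting it.
clear_partition("ias-aggregated-output", "daily_metrics/date=2021-02-27/")

# The Snowflake load follows the same delete-then-insert pattern (illustrative SQL):
#   DELETE FROM partner_metrics WHERE processing_date = '2021-02-27';
#   COPY INTO partner_metrics FROM @aggregated_output_stage/date=2021-02-27/;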
Lastly, we track how much input data was processed by each execution of the DAG; the monitoring DAG uses this to determine whether late data has arrived. To do this, as soon as the pipeline begins processing data, it takes a snapshot of the size of the input partition in S3 in bytes. The date and size are stored in a DynamoDB table.
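A possible implementation of that snapshot with boto3 is sketched below; the bucket, prefix convention, and the partition_snapshots DynamoDB table are assumptions made for illustration.

from datetime import datetime

import boto3

s3 = boto3.client("s3")
# Hypothetical DynamoDB table keyed by processing date.
snapshots = boto3.resource("dynamodb").Table("partition_snapshots")

def snapshot_partition_size(bucket: str, prefix: str, processing_date: str) -> int:
    """Sum the size of all raw objects under the partition and record it in DynamoDB."""
    total_bytes = 0
    paginator = s3.get_paginator("list_objects_v2")
    for page in paginator.paginate(Bucket=bucket, Prefix=prefix):
        total_bytes += sum(obj["Size"] for obj in page.get("Contents", []))
    snapshots.put_item(
        Item={
            "processing_date": processing_date,
            "size_in_bytes": total_bytes,
            "captured_at": datetime.utcnow().isoformat(),
        }
    )
    return total_bytes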
Monitoring DAG
The monitoring DAG is a separate pipeline. It runs daily to check for late arriving data in S3, excluding the current day, which may still be processing.
We decide to reprocess data for a given day if its raw input grows in size by a certain percentage. This percentage is configurable for each data source and is chosen based on the source’s characteristics and potential business impact.
In some cases the AWS infrastructure cost of reprocessing outweighs the benefit, so we skip those days. We determine the exact threshold based on historical data sizes and the average cost of processing a day’s worth of data.
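One simple way to make the threshold configurable per source is a mapping like the sketch below; the source names and values are hypothetical, apart from the 5% figure used in the example that follows.

# Hypothetical per-source reprocessing thresholds (percent growth in raw input size).
# Expensive-to-reprocess sources can be given a higher bar.
REPROCESS_THRESHOLDS = {
    "partner_a": 5,
    "partner_b": 15,
}
DEFAULT_THRESHOLD_PERCENT = 10

def threshold_for(source: str) -> int:
    return REPROCESS_THRESHOLDS.get(source, DEFAULT_THRESHOLD_PERCENT)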
In this example, let’s assume that we configured the percentage threshold to 5%. The difference between the snapshots in DynamoDB and current partition sizes in S3 falls under 5% for 2/29 and 2/28, but for 2/27 it is about 10% due to late arrivals in S3. In this case, the monitoring DAG would retrigger the processing DAG for 2/27 only.
After rerunning the DAG for 2/27, all 110GB of data is processed through the backend pipeline. The snapshot in DynamoDB is updated to reflect the most recent run.
When we reprocess data, the monitoring DAG automatically kicks off the processing DAG for the given date and waits for it to finish. In Airflow terms, this is called “clearing” a DAG run. Here is an example code snippet to accomplish this. Note that as of writing, we are using Airflow version 1.10.10:
from datetime import datetime

from airflow import AirflowException, DAG
from airflow.exceptions import AirflowSkipException
from airflow.models import BaseOperator, DagRun


class CheckRawDataStateOperator(BaseOperator):
    """This custom operator checks for late arriving data."""

    # Allow Airflow to render the Jinja template passed as 'execution_date'.
    template_fields = ("execution_date",)

    def __init__(self, execution_date: datetime, threshold_percent: int, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.execution_date = execution_date
        self.threshold_percent = threshold_percent

    def execute(self, context):
        # Pull data size from S3 and DynamoDB using 'self.execution_date'
        size_in_s3 = ...
        size_in_dynamodb = ...
        percentage_change = (size_in_s3 - size_in_dynamodb) / size_in_dynamodb * 100
        if percentage_change < self.threshold_percent:
            # Raising an 'AirflowSkipException' tells Airflow to skip the remaining tasks in the pipeline
            raise AirflowSkipException()


class ClearDagRunOperator(BaseOperator):
    """This custom operator clears the execution of a DAG for a specific date."""

    template_fields = ("execution_date",)

    def __init__(self, dag_to_clear: DAG, execution_date: datetime, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.dag_to_clear = dag_to_clear
        self.execution_date = execution_date

    def execute(self, context):
        # Note: a templated 'execution_date' is rendered to a string; convert it back
        # to a datetime if needed before querying. 'DagRun.find' returns a list of runs.
        dag_runs = DagRun.find(
            dag_id=self.dag_to_clear.dag_id,
            execution_date=self.execution_date,
        )
        if not dag_runs:
            raise AirflowException(f"DagRun not found for {self.dag_to_clear.dag_id} on {self.execution_date}")
        # Clearing the run resets its task instances so the scheduler re-executes them.
        self.dag_to_clear.clear(
            start_date=dag_runs[0].execution_date,
            end_date=dag_runs[0].execution_date,
        )


# We need a reference to the DAG that is going to be cleared.
processing_dag: DAG = ...

# The number of days to look in the past to check for late arriving data.
# Each day runs an independent chain of tasks in the pipeline.
lookback_days = 7

# The tasks below belong to the monitoring DAG (its definition is omitted for brevity).
for days_ago in range(1, lookback_days + 1):
    # Jinja template that Airflow renders at run time to the date 'days_ago' days
    # before this run's execution date.
    target_date = "{{ execution_date - macros.timedelta(days=%d) }}" % days_ago

    # Task to check if data in S3 is larger than data in DynamoDB.
    # Raises an 'AirflowSkipException' if the growth is below the threshold,
    # which skips the remaining tasks for this day.
    check_state_task = CheckRawDataStateOperator(
        task_id=f"check_state_{days_ago}_days_ago",
        execution_date=target_date,
        threshold_percent=5,  # configurable per data source
    )
    clear_dag_task = ClearDagRunOperator(
        task_id=f"clear_dag_run_{days_ago}",
        dag_to_clear=processing_dag,
        execution_date=target_date,
    )
    # Another task can be added here to monitor the output of the Processing DAG.

    # Task dependency chain
    check_state_task >> clear_dag_task >> ...
Once the processing DAG finishes running, the monitoring DAG compares the outputs and notifies several teams of the results via email.
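In Airflow 1.10.x, one way to send such a notification is the built-in EmailOperator; the recipients, subject, and wording below are placeholders for illustration, and the task would live inside the monitoring DAG.

from airflow.operators.email_operator import EmailOperator

# Attached to the monitoring DAG (DAG context omitted for brevity).
notify_teams = EmailOperator(
    task_id="notify_reprocessing_results",
    to=["data-team@example.com"],  # placeholder recipients
    subject="Late arriving data reprocessed for {{ ds }}",
    html_content=(
        "The processing DAG was re-run for {{ ds }}. "
        "Output metrics were compared against the previous run."
    ),
)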
Results
With our new system, we monitor for late data 24/7. We also reprocess late arriving data for multiple days at a time in parallel because each pipeline runs independently in Airflow and AWS. The new pipelines save us valuable engineering time that was previously spent running scripts or checking output, decrease reprocessing time, and provide results as soon as late data arrives.
Ideas for Improvement
This solution works well for our current needs, but we have ideas for design improvements:
- More sophisticated tracking of late data arrivals. We only track changes in overall file size. Depending on the type of input, we could track the total number of files or the upload dates in S3 to determine if files were overwritten.
- Versioning data that is reprocessed. With our design we overwrite reprocessed data, but we could save a history of changes. This would allow us to do a more in-depth analysis of the data before and after reprocessing.
- Processing data in real-time instead of batches. We only need to support processing data on the daily level, but this could change in the future.
By leveraging the tools provided by Airflow, we built our pipelines to meet all of our requirements for processing late arriving data. We created a receiver component that is lightweight and only deals with ingesting the data made available to us. This allows for the complexity of processing late arriving data to be offloaded to the backend. We then use Airflow’s built-in features for creating pipelines that can be replayed and continuously monitored. This process means we continue to deliver on our customers’ goals with the speed and efficiency our business demands.
Article originally published on July 16, 2021.