Brief Introduction to Apache Airflow — Part 2

Continuing on from my first blog on Apache Airflow, the next sections of the course covered sensors, executors and templates, and then put everything together to build a production-quality workflow.

Sensors are operators that wait for a specific condition to be true, such as a file being created or a record being uploaded to a database. In addition to the standard operator attributes, sensors have their own attributes that control how the condition is checked. The mode attribute determines how the sensor waits. The default mode is ‘poke’, in which the sensor keeps its worker slot and checks repeatedly until the condition is met. A poke_interval sets how long the sensor waits between checks, and a timeout sets how long it waits overall before the sensor task fails. The other mode is ‘reschedule’, in which the sensor gives up its worker slot between checks and tries again later.
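To make the roles of poke_interval and timeout concrete, here is a minimal sketch of the idea behind ‘poke’ mode using only the Python standard library. It is not Airflow's implementation, and the wait_for function and its arguments are names I made up for illustration.

```python
import time


def wait_for(condition, poke_interval=60.0, timeout=3600.0):
    """Call `condition` repeatedly until it returns True.

    Sleeps `poke_interval` seconds between checks and raises TimeoutError
    once `timeout` seconds have passed. Returns the number of checks made.
    """
    deadline = time.monotonic() + timeout
    pokes = 0
    while True:
        pokes += 1
        if condition():
            return pokes
        if time.monotonic() >= deadline:
            raise TimeoutError(f"condition not met after {pokes} checks")
        # In 'poke' mode the sensor keeps its worker slot while it waits here.
        # In 'reschedule' mode it would release the slot and be run again later.
        time.sleep(poke_interval)


# Toy condition that becomes true on the third check.
attempts = iter([False, False, True])
pokes = wait_for(lambda: next(attempts), poke_interval=0.01, timeout=5.0)
print(pokes)  # 3
```

A condition that never becomes true ends in a TimeoutError, which corresponds to the sensor task failing once its timeout is reached.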

One example of a sensor is FileSensor, which checks whether a file exists at a specific location in a file system. Other sensors include ExternalTaskSensor, which waits for a task in another DAG to complete, and HttpSensor, which checks for content at a provided web URL.

Sample code for a FileSensor task. Setting filepath to ‘salesdata.csv’ means the sensor checks for a file with this name before continuing. The file sensor task runs after init_sales_cleanup but before generate_report.

Sensors are useful when you want to check that a condition is true but are unsure exactly when it will be met. Because you can set how often the condition is checked, a sensor can keep checking until it is true or until it times out. A sensor is also a good fit when we do not want to fail a DAG immediately because a condition is not yet satisfied.

Executors are what run the tasks defined in your workflows. Different executors are available depending on your needs, and you can even create your own. At the time of the course, the default Airflow executor was the appropriately named SequentialExecutor, which runs one task at a time. Because it runs tasks one at a time, it can be useful for debugging workflows that are not yet working. For the same reason it is not recommended for large-scale workflows, since running tasks one after another limits both run time and resource use.

LocalExecutor runs all tasks on a single system. Each task runs as its own process, and the executor can run as many tasks concurrently as the user allows or the system’s resources permit. This makes it a good fit for a production setup on a single machine, since all of that machine’s resources can be used to execute tasks. CeleryExecutor uses Celery, a distributed task queue written in Python, to set up multiple systems as a cluster. This is more suitable for large workflow configurations that may require further scaling.
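As a rough analogy only, the difference between running one task at a time and running several at once can be sketched with Python's standard library. This does not use Airflow, and it substitutes threads for the separate processes that LocalExecutor uses. The run_tasks function, its parameters and the task names are made up for illustration.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor


def run_tasks(task_ids, max_workers, task_seconds=0.05):
    """Run toy tasks with at most `max_workers` active at once.

    Returns the finished task ids (in submission order) and the highest
    number of tasks that were observed running at the same time.
    """
    lock = threading.Lock()
    running = 0
    peak = 0

    def run_one(task_id):
        nonlocal running, peak
        with lock:
            running += 1
            peak = max(peak, running)
        time.sleep(task_seconds)  # stand-in for real work
        with lock:
            running -= 1
        return task_id

    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        finished = list(pool.map(run_one, task_ids))
    return finished, peak


tasks = ["extract", "transform", "load"]

# One worker: tasks run strictly one after another.
finished, peak = run_tasks(tasks, max_workers=1)
print(finished, peak)  # ['extract', 'transform', 'load'] 1

# Three workers: tasks may overlap, up to the configured limit.
finished_parallel, peak_parallel = run_tasks(tasks, max_workers=3)
print(peak_parallel)  # at most 3, depending on thread timing
```

The max_workers limit plays the role of the concurrency the user allots: with a limit of one the tasks can never overlap, while a higher limit lets them share the machine.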

To report the status of tasks, Airflow can send an email alert when, for instance, a task has succeeded. The amount of time a task or DAG is expected to take is referred to as its SLA, or Service Level Agreement. An SLA miss means that a task or DAG took longer than expected to complete. The miss is recorded through an email alert and a note in the system logs, and SLA misses can also be viewed in the Airflow web UI. An SLA is set either through the ‘sla’ argument on an individual task or through an ‘sla’ entry in the default_args dictionary that applies to all tasks in a DAG, in both cases using a timedelta object from the datetime library.

Defining a task SLA as 3 hours through the sla argument
Code for creating an EmailOperator task to send an email once generate_report is completed

A useful feature when creating multiple tasks is the ability to use templates within Airflow. Templates allow data to be substituted into a task definition when the task runs. They are especially useful for creating multiple, repetitive tasks. For example, to echo a list of files we could define a separate task for each file, but that gets repetitive when there are many files. Instead, we can set the bash command to a previously defined templated command and pass in a dictionary of parameters for the template to use. We can then define a filename string, pass it to the command, and the tasks will run as expected. Because Airflow uses the Jinja templating language, you can also create more advanced templates, such as one that iterates over a list. Macros are variables that Airflow provides for passing objects into templates, such as a datetime or timedelta object.

Comparison of code for defining multiple tasks to echo a list of files: no templated command (left), templated command (middle), templated command iterating over a list (right)
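The substitution itself can be tried outside Airflow with the jinja2 package, since Jinja is the templating language Airflow uses. The sketch below renders a single-file command and a looping command; the file names and variable names are made up for illustration, and the params dictionary is passed in by hand here, whereas in a DAG it would be supplied through the task's params argument.

```python
from jinja2 import Template

# Templated command for a single file. In a DAG this string would be passed
# as bash_command, with params={"filename": "file1.txt"} on the task.
templated_command = 'echo "Reading {{ params.filename }}"'

rendered_single = Template(templated_command).render(
    params={"filename": "file1.txt"}
)
print(rendered_single)  # echo "Reading file1.txt"

# Templated command that iterates over a list, so a single task can cover
# every file instead of needing one task per file.
looping_command = (
    "{% for filename in params.filenames %}"
    'echo "Reading {{ filename }}"\n'
    "{% endfor %}"
)

rendered_loop = Template(looping_command).render(
    params={"filenames": ["file1.txt", "file2.txt"]}
)
print(rendered_loop)
# echo "Reading file1.txt"
# echo "Reading file2.txt"
```

With the single-file template, one task is still defined per file and only the params value changes; with the looping template, one task echoes the whole list.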

The final sections of the course involved using all the knowledge gained towards implementing a workflow into a production pipeline. By the end of this course, I felt as though I had gained a respectable amount of introductory information on Airflow. Getting to see how all these parts work together and help make workflows automated was fascinating. I understand now how powerful a tool like this can be and why it is widely used. I would like to dig deeper into some of Airflow’s documentation and look into potentially setting up my own environment to tinker around with. Thank you for reading!

Data Science student and aspiring Data Analyst
