Connect GX Cloud and Airflow
In this quickstart, you'll learn how to use GX Cloud with Apache Airflow. You'll create a simple DAG that runs a Checkpoint that you have already set up in GX Cloud, and then trigger it through a local installation of an Airflow server.
Apache Airflow is an orchestration tool that allows you to schedule and monitor your data pipelines. For more information about Apache Airflow, see the Apache Airflow documentation.
Prerequisites
- You have a GX Cloud account.
- You have installed Apache Airflow and initialized the database (airflow db init).
- You have connected GX Cloud to a Data Asset on a Data Source.
- You have created an Expectation Suite and added Expectations.
- You have added a Checkpoint to your Expectation Suite.
Run Airflow Standalone to create a fresh local Airflow environment
- The airflow standalone command initializes the database, creates a user, and starts all components.

Terminal input
airflow standalone

This command eventually outputs a username and password for the Airflow UI, like this:

standalone | Airflow is ready
standalone | Login with username: admin password: Bpu6RgmPMMaDeeq5
standalone | Airflow Standalone is for development purposes only. Do not use this in production!

- Access the Airflow UI: once the web server is running, open a web browser and go to http://localhost:8080 (by default), then sign in with the username and password from the previous step.
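If you would rather confirm from a script that the web server is up before opening the browser, the following is a minimal sketch. It assumes an Airflow 2.x web server, which exposes a /health endpoint returning a JSON status for each component; the function names are illustrative and not part of Airflow or GX Cloud.

```python
import json
from urllib.request import urlopen


def is_airflow_healthy(health: dict) -> bool:
    """Return True when every component that reports a status is healthy.

    Components whose status is missing or None (for example, an optional
    component that is not running) are ignored.
    """
    statuses = [
        component.get("status")
        for component in health.values()
        if isinstance(component, dict) and component.get("status") is not None
    ]
    return bool(statuses) and all(status == "healthy" for status in statuses)


def fetch_airflow_health(base_url: str = "http://localhost:8080") -> dict:
    """Fetch the health payload from a running Airflow 2.x web server (assumed endpoint)."""
    with urlopen(f"{base_url}/health", timeout=5) as response:
        return json.load(response)
```

With airflow standalone running, calling is_airflow_healthy(fetch_airflow_health()) should return True once the scheduler and metadata database report as healthy. Adjust base_url if your web server does not use the default port.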
Create a DAG file for your GX Cloud Checkpoint
- Open a terminal, browse to the airflow folder in your home directory, and then run the following commands to create a new DAG file named gx_dag.py:

Terminal input
cd ~/airflow
mkdir -p dags
cd dags
touch gx_dag.py

- Open the gx_dag.py DAG file and add the following code:

Python
import os

import great_expectations as gx
import pendulum
from airflow.decorators import dag, task


@dag(
    schedule=None,
    start_date=pendulum.datetime(2023, 8, 9),
    catchup=False,
)
def gx_dag_with_deco():
    # See https://github.com/apache/airflow/discussions/24463
    os.environ["NO_PROXY"] = "*"
    print("Great Expectations DAG Started")

    @task
    def run_checkpoint():
        print("Running Checkpoint")
        # Replace <YOUR_ACCESS_TOKEN>, <YOUR_CLOUD_ORGANIZATION_ID>, and <CHECKPOINT_NAME> with your own values.
        # You can also set GX_CLOUD_ACCESS_TOKEN and GX_CLOUD_ORGANIZATION_ID as environment variables.
        GX_CLOUD_ACCESS_TOKEN = "<YOUR_ACCESS_TOKEN>"
        GX_CLOUD_ORGANIZATION_ID = "<YOUR_CLOUD_ORGANIZATION_ID>"
        # Alternatively, set CHECKPOINT_NAME to be a runtime parameter.
        CHECKPOINT_NAME = "<CHECKPOINT_NAME>"
        context = gx.get_context(
            cloud_access_token=GX_CLOUD_ACCESS_TOKEN,
            cloud_organization_id=GX_CLOUD_ORGANIZATION_ID,
        )
        checkpoint = context.get_checkpoint(name=CHECKPOINT_NAME)
        checkpoint.run()
        return f"Checkpoint ran: {CHECKPOINT_NAME}"

    run_checkpoint()


run_this = gx_dag_with_deco()

- Save your changes and close the gx_dag.py DAG file.
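The comments in the DAG mention that the credentials can come from environment variables instead of being written into the file. One way to do that is sketched below. The helper name load_gx_cloud_credentials is hypothetical; the environment variable names and the keyword arguments are the ones used in the DAG code in this guide.

```python
import os


def load_gx_cloud_credentials(env=None) -> dict:
    """Build the gx.get_context() keyword arguments from environment variables.

    `env` defaults to os.environ; passing a plain dict makes the helper easy to test.
    """
    env = os.environ if env is None else env
    required = ("GX_CLOUD_ACCESS_TOKEN", "GX_CLOUD_ORGANIZATION_ID")
    missing = [name for name in required if not env.get(name)]
    if missing:
        # Fail early with a clear message instead of sending empty credentials.
        raise RuntimeError(f"Missing environment variables: {', '.join(missing)}")
    return {
        "cloud_access_token": env["GX_CLOUD_ACCESS_TOKEN"],
        "cloud_organization_id": env["GX_CLOUD_ORGANIZATION_ID"],
    }
```

Inside run_checkpoint, the context could then be created with gx.get_context(**load_gx_cloud_credentials()), which keeps the access token out of the DAG file. The variables must be set in the environment that the Airflow workers run in, not only in your interactive shell.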
Run the DAG (Manually)
- Restart the Airflow server to pick up the new DAG file.
- Sign in to Airflow using the username and password from the first standalone run.
- In the Actions column, click Trigger DAG for gx_dag_with_deco and confirm your DAG runs as expected. The DAG ID defaults to the name of the function decorated with @dag, not the file name.
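As written, the task succeeds whenever checkpoint.run() returns, even if the data fails validation. If you want the Airflow task to turn red in that case, a small guard like the following sketch may help. It assumes the object returned by checkpoint.run() exposes a boolean success attribute; the helper name ensure_checkpoint_passed is hypothetical.

```python
def ensure_checkpoint_passed(result, checkpoint_name: str) -> str:
    """Raise if the Checkpoint result reports a failed validation.

    Assumes `result` has a boolean `success` attribute; a missing
    attribute is treated as a failure.
    """
    if not getattr(result, "success", False):
        # Any exception raised inside an Airflow task marks that task as failed.
        raise RuntimeError(f"Checkpoint failed validation: {checkpoint_name}")
    return f"Checkpoint passed: {checkpoint_name}"
```

In the DAG, this would replace the last two lines of run_checkpoint with result = checkpoint.run() followed by return ensure_checkpoint_passed(result, CHECKPOINT_NAME).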
Clean up local Airflow environment
- Delete the local files and SQLite database:

Terminal input
rm -rf ~/airflow