Airflow quick start

Database and Ruby, Python, History


What’s Airflow

Airflow is a platform to programmatically author, schedule and monitor workflows.

DAGs

Directed Acyclic Graph is a collection of all the tasks you want to run, organized in a way that reflects their relationships and dependencies.

Run a DAG

Source code of example_bash_operator

Click the code tab and you will see the source code.

"""Example DAG demonstrating the usage of the BashOperator."""

import datetime

import pendulum

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.empty import EmptyOperator

with DAG(
    dag_id='example_bash_operator',
    schedule_interval='0 0 * * *',
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    catchup=False,
    dagrun_timeout=datetime.timedelta(minutes=60),
    tags=['example', 'example2'],
    params={"example_key": "example_value"},
) as dag:
    run_this_last = EmptyOperator(
        task_id='run_this_last',
    )

    # [START howto_operator_bash]
    run_this = BashOperator(
        task_id='run_after_loop',
        bash_command='echo 1',
    )
    # [END howto_operator_bash]

    run_this >> run_this_last

    for i in range(3):
        task = BashOperator(
            task_id='runme_' + str(i),
            bash_command='echo "" && sleep 1',
        )
        task >> run_this

    # [START howto_operator_bash_template]
    also_run_this = BashOperator(
        task_id='also_run_this',
        bash_command='echo "run_id= | dag_run="',
    )
    # [END howto_operator_bash_template]
    also_run_this >> run_this_last

# [START howto_operator_bash_skip]
this_will_skip = BashOperator(
    task_id='this_will_skip',
    bash_command='echo "hello world"; exit 99;',
    dag=dag,
)
# [END howto_operator_bash_skip]
this_will_skip >> run_this_last

Check the log

Click the Grid and you will see the execution history of your DAGs as well as the tasks there. Click one task and click the log link on the right, you will see the logs.

Install Airflow

export AIRFLOW_HOME=~/airflow

# Install Airflow using the constraints file
AIRFLOW_VERSION=2.3.3
PYTHON_VERSION="$(python --version | cut -d " " -f 2 | cut -d "." -f 1-2)"
# For example: 3.7
CONSTRAINT_URL="https://raw.githubusercontent.com/apache/airflow/constraints-${AIRFLOW_VERSION}/constraints-${PYTHON_VERSION}.txt"
# For example: https://raw.githubusercontent.com/apache/airflow/constraints-2.3.3/constraints-3.7.txt
pip install "apache-airflow==${AIRFLOW_VERSION}" --constraint "${CONSTRAINT_URL}"

Initialize Database

This will create the database with sqlite3 by default. Later we will switch it to Postgresql.

airflow db init

Create admin

airflow users create \
    --username admin \
    --firstname Peter \
    --lastname Parker \
    --role Admin \
    --email spiderman@superhero.org

Startup webserver and scheduler

airflow webserver --port 8080
airflow scheduler

Switch the database to Postgresql

Create the airflow_db database and airflow_user user.

CREATE DATABASE airflow_db;
CREATE USER airflow_user WITH PASSWORD 'airflow_pass';
GRANT ALL PRIVILEGES ON DATABASE airflow_db TO airflow_user;

Update the airflow.cfg file.

sql_alchemy_conn = postgresql+psycopg2://airflow_user:airflow_pass@127.0.0.1/airflow_db

Create the admin again.

```bash
airflow users create \
    --username admin \
    --firstname Peter \
    --lastname Parker \
    --role Admin \
    --email spiderman@superhero.org

Create a connection

You could create connection in Airflow, then you could use the connection in your Python code.

Add a variable

You could also create variables and read it later in you Python code.

Put things together

You could get the variable via tmpl_search_path = Variable.get("sql_path"). You could use the database connection via pg_hook = PostgresHook(postgres_conn_id=conn_id).

Here is the example.


def get_sql_results(sql, conn_id):
    pg_hook = PostgresHook(
      postgres_conn_id=conn_id
    )
    conn = pg_hook.get_conn()
    cursor = conn.cursor()
    cursor.execute(sql)

    desc = cursor.description
    column_names = [col[0] for col in desc]
    results = [dict(zip(column_names, row)) for row in cursor.fetchall()]
    print(results)
    return results


args = {
    'owner': 'airflow',
    'start_date': airflow.utils.dates.days_ago(0),
    'provide_context': True
}

tmpl_search_path = Variable.get("sql_path")

with DAG(
    'etl_demo',
    schedule_interval='*/10 * * * *',
    dagrun_timeout=timedelta(minutes=180),
    template_searchpath=tmpl_search_path,
    default_args=args,
    max_active_runs=1) as dag:

    jobs = get_sql_results('select * from public.airflow_etl_jobs where is_active is TRUE', 'pzhong_local')
    end_task = DummyOperator(task_id="end")

    for job in jobs:
        transfer = PostgresToPostgresOperator(
            sql=job['sql_path'],
            pg_table='.'.join([job['table_schema'], job['table_name']]),
            primary_key=job['primary_key'],
            src_postgres_conn_id=job['src_conn_id'],
            dest_postgres_conn_id=job['dest_conn_id'],
            pg_preoperator=job['preoperator'],
            pg_postoperator=job['postoperator'],
            parameters={ 'proceeded_to': str(job['proceeded_to']) },
            task_id=f"extract_{job['table_name']}",
            dag=dag
        )


        transfer >> end_task

Create the tasks by configuration in database

Create one table as below.

create table airflow_etl_jobs(
	id serial,
	name varchar,
	src_conn_id varchar,
	dest_conn_id varchar,
	table_schema varchar,
	table_name varchar,
	preoperator varchar,
	postoperator varchar,
	sql_path varchar,
	parameters jsonb,
	primary_key varchar,
	cdc_col varchar,
	proceeded_to varchar,
	src_cnt int8,
	dest_cnt int8,
	is_active BOOLEAN
);

and the data will look like this.

Then you could add job via inserting record in database and monitor the job status by following query.

SELECT
	table_schema,
	table_name,
	proceeded_to,
	src_cnt,
	dest_cnt,
	ROUND(dest_cnt * 100.0 / src_cnt, 4)
FROM
	airflow_etl_jobs
ORDER BY
	ROUND(dest_cnt * 100.0 / src_cnt, 4);

ETL

Initial data load

We need to consider how to implement the initial data load with ETL. You could ask DBA to load the data, it’s suitable for large tables. You could also use ETL tool to migrate the data step by step if you’re ok with the potential slowness.

Delta data

Basically, we used the column modified_at to identify the delta data. It means you need to note down the max modified_at you migrated, and seek for records with lager modified_at to migrate.

Do not do fully load because it’s slow!

Merge operation

If the records are only inserted and never updated, you could ignore this. However, it’s not true in real world. You need to take upsert into consideration. In SQL Server, you could use merge to achieve this, in MySQL and Postgresql, you could use insert into ... on conflict(id) do update ....

Bad data

Sometimes, the modified_at could be null. You may need to manually load them.

Also, it’s possible that many records have the same modified_at, and the record size is larger than the batch size. So you may see your ETL job stuck at processing one day again and again.