
Workflow with Airflow

Published Nov 24, 2017 · Last updated May 22, 2018

Airflow is an open source project started at Airbnb. It is a tool for dynamically orchestrating the desired flow of your application, and it scales readily thanks to its modular architecture and message-queuing mechanism.

It can also be understood as an advanced cron application that executes tasks once their dependencies are fulfilled, and can even retry a failed task a configured number of times.
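For example, a retry policy can be declared once in a DAG's default arguments and inherited by every task in it. Here is a minimal sketch, assuming Airflow 1.x and a hypothetical flaky_job.py script:

from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.bash_operator import BashOperator

# retry a failed task up to 3 times, waiting 5 minutes between attempts
default_args = {
    'owner': 'owner',
    'start_date': datetime(2017, 11, 24),
    'retries': 3,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG(
    dag_id='retry_example',
    default_args=default_args,
    schedule_interval=timedelta(days=1)
)

flaky_task = BashOperator(
    task_id='flaky_task',
    bash_command='python /opt/jobs/flaky_job.py',  # hypothetical script
    dag=dag
)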

This is what an Airflow task pipeline looks like:

(Image: an example Airflow task pipeline)

In the example above, each block represents a task, and some tasks are connected to others, reflecting their dependencies and relationships.

Let’s say you need to develop an application that helps your customers find products commonly available on a few selected e-commerce platforms, then generates a report and sends it to them. For this purpose you can design a workflow where one task collects data from an e-commerce platform, another task categorises the data based on its type, and so on.

These tasks are defined in a Python file called a DAG (Directed Acyclic Graph) file. A DAG can contain an arbitrary number of tasks, and one DAG represents a single logical workflow.

DAG example:

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta

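# default arguments applied to every task in this DAG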
args = {
    'owner': 'owner',
    'start_date': datetime(2017, 11, 24)  # a fixed date; Airflow recommends against dynamic values like datetime.today()
}

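# the DAG object ties the tasks together into one workflow, scheduled daily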
dag = DAG(
    dag_id='common_products',
    default_args=args,
    schedule_interval=timedelta(days=1)
)

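# each BashOperator below runs one script as a single task in the workflow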
init = BashOperator(
    task_id='init',
    bash_command='python /opt/jobs/init.py',
    dag=dag
)

data_config = BashOperator(
    task_id='data_config',
    bash_command='python /opt/jobs/data_config.py',
    dag=dag
)

platform_a = BashOperator(
    task_id='platform_a',
    bash_command='python /opt/jobs/collect_data.py platform_a',
    dag=dag
)

platform_b = BashOperator(
    task_id='platform_b',
    bash_command='python /opt/jobs/collect_data.py platform_b',
    dag=dag
)

platform_c = BashOperator(
    task_id='platform_c',
    bash_command='python /opt/jobs/collect_data.py platform_c',
    dag=dag
)

platform_d = BashOperator(
    task_id='platform_d',
    bash_command='python /opt/jobs/collect_data.py platform_d',
    dag=dag
)

categorise_data = BashOperator(
    task_id='categorise_data',
    bash_command='python /opt/jobs/categorise_data.py',
    dag=dag
)

find_most_common = BashOperator(
    task_id='find_most_common',
    bash_command='python /opt/jobs/find_most_common.py',
    dag=dag
)

compare_price = BashOperator(
    task_id='compare_price',
    bash_command='python /opt/jobs/compare_price.py',
    dag=dag
)

generate_report = BashOperator(
    task_id='generate_report',
    bash_command='/usr/bin/python /opt/jobs/generate_report.py',
    dag=dag
)

# set up the logical flow between the tasks
data_config.set_upstream(init)
platform_a.set_upstream(data_config)
platform_b.set_upstream(data_config)
platform_c.set_upstream(data_config)
platform_d.set_upstream(data_config)
categorise_data.set_upstream(platform_a)
categorise_data.set_upstream(platform_b)
categorise_data.set_upstream(platform_c)
categorise_data.set_upstream(platform_d)
find_most_common.set_upstream(categorise_data)
compare_price.set_upstream(find_most_common)
generate_report.set_upstream(compare_price)
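
Equivalently, the same dependencies can be wired with Airflow's bitshift operators (available in Airflow 1.8 and later), where a >> b means that b runs after a:

init >> data_config
data_config >> platform_a >> categorise_data
data_config >> platform_b >> categorise_data
data_config >> platform_c >> categorise_data
data_config >> platform_d >> categorise_data
categorise_data >> find_most_common >> compare_price >> generate_report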

The DAG file can be saved in Airflow's default DAG directory, ~/airflow/dags. In the DAG configuration, the line schedule_interval=timedelta(days=1) tells the Airflow scheduler to execute this flow once every day.

This is what this DAG looks like in the Airflow UI:

(Image: the common_products DAG in the Airflow graph view)

Airflow has a very elegant interface for monitoring workflows and viewing the logs of individual tasks.

This is a very basic example of how Airflow can be used; it can be applied to design workflows of any complexity.
