
Is It Possible For Airflow Scheduler To First Finish The Previous Day's Cycle Before Starting The Next?

Right now, nodes in my DAG proceed to the next day's tasks before the rest of that DAG's nodes finish. Is there a way to make it wait for the rest of the DAG to finish before starting the next day's run?

Solution 1:

Might be a bit late with this answer, but I ran into the same issue, and I resolved it by adding two extra tasks to each DAG: "Previous" at the start and "Complete" at the end. The Previous task is an ExternalTaskSensor that monitors the previous run; Complete is just a DummyOperator. Let's say the DAG runs every 30 minutes, so it would look like this:

from datetime import timedelta

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.dummy import DummyOperator
from airflow.sensors.external_task import ExternalTaskSensor

# default_args is assumed to be defined elsewhere in the DAG file
dag = DAG(dag_id='TEST_DAG', default_args=default_args, schedule_interval=timedelta(minutes=30))

PREVIOUS = ExternalTaskSensor(
    task_id='Previous_Run',
    external_dag_id='TEST_DAG',
    external_task_id='All_Tasks_Completed',
    allowed_states=['success'],
    execution_delta=timedelta(minutes=30),  # look one schedule interval back
    dag=dag  # was dag=DAG, which passes the class instead of the DAG instance
)

T1 = BashOperator(
    task_id='TASK_01',
    bash_command='echo "Hello World from Task 1"',
    dag=dag
)

COMPLETE = DummyOperator(
    task_id='All_Tasks_Completed',
    dag=dag  # was dag=DAG
)

PREVIOUS >> T1 >> COMPLETE

So the next DAG run, even though it will be queued, will not let its tasks run until PREVIOUS has completed.

Solution 2:

What ended up working for me is a combination of

  1. Adding the task-level arguments wait_for_downstream=True and depends_on_past=True
  2. Adding max_active_runs=1 when creating the DAG. I did try to pass max_active_runs as a default argument, but that did not work.
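A minimal sketch of this combination, assuming Airflow 2.x (the DAG id, task ids, and schedule below are placeholders, not from the original answer):

```python
from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator

default_args = {
    'depends_on_past': True,      # each task waits for its own previous-run instance to succeed
    'wait_for_downstream': True,  # ...and for that instance's immediate downstream tasks
}

with DAG(
    dag_id='serial_daily_dag',    # placeholder name
    start_date=datetime(2023, 1, 1),
    schedule_interval='@daily',
    default_args=default_args,
    max_active_runs=1,            # set on the DAG itself, not in default_args
) as dag:
    extract = BashOperator(task_id='extract', bash_command='echo extract')
    load = BashOperator(task_id='load', bash_command='echo load')
    extract >> load
```

Note that max_active_runs is a DAG-level keyword, which is why putting it in default_args (which only feeds task-level arguments) has no effect.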

Solution 3:

If you want to run just one DAG instance at a time, try setting max_active_runs=1.
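For example, assuming Airflow 2.x (the DAG id and schedule here are placeholders):

```python
from datetime import datetime

from airflow import DAG

# Only one DagRun of this DAG may be active at any moment;
# further scheduled runs stay queued until the current one finishes.
dag = DAG(
    dag_id='one_at_a_time',  # placeholder
    start_date=datetime(2023, 1, 1),
    schedule_interval='@daily',
    max_active_runs=1,
)
```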

Solution 4:

You can achieve the same without any additional tasks. The trick is to add a dependency from the first task to the last task and set wait_for_downstream=True on the first task.

In that case, the first task of the next DAG run will wait until the last task of the current run is complete, so future DAG runs will be on hold. You can then set max_active_runs=1 to limit how many such on-hold runs are scheduled.

A detailed example is given here: airflow-dag-past-run-sense

Look at the dependencies.

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator


def load_final_table_status():
    # placeholder for the actual load logic
    print('loading final table')


# default_args is assumed to be defined elsewhere in the DAG file
with DAG('dag_past_run_sense',
         schedule_interval='*/2 * * * *',
         default_args=default_args,
         max_active_runs=1) as dag:

    read_incr_data = BashOperator(
        task_id='read_incr_data',
        wait_for_downstream=True,
        bash_command='date'
    )

    prepare_scd2 = BashOperator(
        task_id='prepare_scd2',
        bash_command='date'
    )

    load_final_table = PythonOperator(
        task_id="load_final_table",
        python_callable=load_final_table_status
    )

    read_incr_data >> prepare_scd2 >> load_final_table
    read_incr_data >> load_final_table
