Is It Possible For Airflow Scheduler To First Finish The Previous Day's Cycle Before Starting The Next?
Solution 1:
Might be a bit late for this answer, but I ran into the same issue and resolved it by adding two extra tasks to each DAG: "Previous" at the start and "Complete" at the end. The "Previous" task is an ExternalTaskSensor that monitors the previous run of the same DAG; "Complete" is just a DummyOperator. Say the DAG runs every 30 minutes; it would look like this:
from datetime import timedelta

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.sensors.external_task_sensor import ExternalTaskSensor

dag = DAG(dag_id='TEST_DAG', default_args=default_args,
          schedule_interval=timedelta(minutes=30))

# Sensor that waits for this same DAG's previous run to finish
PREVIOUS = ExternalTaskSensor(
    task_id='Previous_Run',
    external_dag_id='TEST_DAG',
    external_task_id='All_Tasks_Completed',
    allowed_states=['success'],
    execution_delta=timedelta(minutes=30),  # must match the schedule interval
    dag=dag  # was dag=DAG, which would fail at parse time
)

T1 = BashOperator(
    task_id='TASK_01',
    bash_command='echo "Hello World from Task 1"',
    dag=dag
)

COMPLETE = DummyOperator(
    task_id='All_Tasks_Completed',
    dag=dag
)

PREVIOUS >> T1 >> COMPLETE
So even though the next DAG run will be queued, its tasks will not start until PREVIOUS succeeds.
Solution 2:
What ended up working for me is a combination of
- Adding task dependency settings: wait_for_downstream=True, depends_on_past=True
- Adding max_active_runs=1 when creating the DAG. I did try passing max_active_runs as a default argument, but that did not work (it is a DAG-level parameter, not a task-level one).
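A minimal sketch of this combination (names and schedule are illustrative, not from the answer; the DAG construction is guarded so the settings are visible even without Airflow installed):

```python
from datetime import datetime, timedelta

# Per-task flags, applied to every task via default_args
default_args = {
    "owner": "airflow",
    "start_date": datetime(2023, 1, 1),
    "depends_on_past": True,      # each task waits for its own previous run
    "wait_for_downstream": True,  # ...and for that run's immediate downstream tasks
}

# max_active_runs goes to the DAG constructor, NOT into default_args
dag_kwargs = dict(
    dag_id="serialised_dag",      # hypothetical dag_id
    default_args=default_args,
    schedule_interval=timedelta(minutes=30),
    max_active_runs=1,            # at most one DAG run in flight at a time
)

try:
    from airflow import DAG
    from airflow.operators.bash_operator import BashOperator

    with DAG(**dag_kwargs) as dag:
        t1 = BashOperator(task_id="step_1", bash_command="date")
        t2 = BashOperator(task_id="step_2", bash_command="date")
        t1 >> t2
except ImportError:
    pass  # Airflow not installed; the kwargs above still document the settings
```

The key point is the split: depends_on_past and wait_for_downstream are task-level and belong in default_args, while max_active_runs only takes effect as a DAG constructor argument.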
Solution 3:
If you just want one DAG run at a time, try setting max_active_runs=1 on the DAG.
Solution 4:
You can achieve the same without any additional tasks. The trick is to add a dependency from the first task to the last task and set wait_for_downstream=True on the first task. In that case, the first task of the next DAG run will wait until the last task of the current run is complete, so future DAG runs will be held back. You can then set max_active_runs=1 to limit how many such on-hold runs queue up.
A detailed example is given here: airflow-dag-past-run-sense. Look at the dependencies:
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator

with DAG('dag_past_run_sense',
         schedule_interval='*/2 * * * *',
         default_args=default_args,
         max_active_runs=1) as dag:

    read_incr_data = BashOperator(
        task_id='read_incr_data',
        wait_for_downstream=True,  # next run's first task waits on this run
        bash_command='date'
    )

    prepare_scd2 = BashOperator(
        task_id='prepare_scd2',
        bash_command='date'
    )

    load_final_table = PythonOperator(
        task_id="load_final_table",
        # load_final_table_status is defined in the linked example
        python_callable=load_final_table_status
    )

    # The extra edge from the first task to the last makes
    # wait_for_downstream cover the entire run
    read_incr_data >> prepare_scd2 >> load_final_table
    read_incr_data >> load_final_table