Airflow scheduling only 1 dag at a time leaving other dags in a queued state

Apache Airflow version

2.1.3 (latest released)

Operating System

Linux

Versions of Apache Airflow Providers

I don’t think it is relevant but can provide them upon request.

Deployment

Other Docker-based deployment

Deployment details

No response

What happened

Completed an update to Airflow 2.1.3 on Thursday and received an error report over the weekend that jobs were not being run. Upon investigating I discovered that only one DAG was running, with everything else stuck in the queued state. The one running DAG was a long-running backfill (30 days, ETA 2 weeks left, ~180 DAG runs) that has max_active_runs=1.

The logs on the Airflow worker were normal; however, the scheduler logs were repeatedly showing the error raise OSError("handle is closed") (see below for the complete logs).

Restarting the scheduler and workers did nothing. However, upon turning off this long-running DAG (another one just like it started after turning that one off, and had to be disabled too), all of the DAGs began scheduling normally.

What you expected to happen

Airflow should be able to continue scheduling other DAGs normally regardless of the existence of this DAG that is slowly catching up.

Note this may be related to one of the following: https://github.com/apache/airflow/issues/17975 https://github.com/apache/airflow/issues/13542 https://github.com/apache/airflow/pull/17945

EDIT: Upon reading https://github.com/apache/airflow/pull/17945 again, it seems that change will resolve our issue. I'll mark this as closed once I can verify the fix (i.e. once a new version is released).

How to reproduce

Steps to attempt to reproduce (apologies for not going this far myself):

  1. Create a DAG that needs a large number of DAG runs to catch up to the present (catchup=True) — probably a simple DAG with one task that sleeps for an hour.
  2. Set max_active_runs=1 for this DAG.
  3. Give the DAG a start date two years in the past and a daily schedule.
  4. Enable the DAG and check whether the scheduler can still schedule other DAGs while it is catching up.
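The reproduction DAG described in the steps above might look something like this. This is a minimal sketch, not the reporter's actual DAG: the dag_id, task_id, and exact start date are illustrative assumptions; the parts that matter for the repro are catchup=True, max_active_runs=1, an old start_date, and a long-running task.

```python
# Hypothetical reproduction DAG: a long daily backfill that holds its
# single max_active_runs slot for an hour per run. All names here
# (dag_id, task_id) are illustrative assumptions.
from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator

with DAG(
    dag_id="slow_backfill_repro",      # assumed name
    start_date=datetime(2019, 9, 1),   # ~2 years before the report
    schedule_interval="@daily",        # one run per missed day
    catchup=True,                      # backfill every missed interval
    max_active_runs=1,                 # only one run may be active at a time
) as dag:
    # Each run occupies the single slot for an hour, so the backfill
    # progresses very slowly, matching the scenario in the report.
    BashOperator(task_id="sleep_an_hour", bash_command="sleep 3600")
```

With roughly 730 missed daily intervals and one hour per run, this DAG stays in a catching-up state for weeks, which is the condition under which the scheduler stall was observed.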

Anything else

Airflow Scheduler Logs

Logs from after the issue started and after restarting the scheduler.

  ____________       _____________
 ____    |__( )_________  __/__  /________      __
____  /| |_  /__  ___/_  /_ __  /_  __ \_ | /| / /
___  ___ |  / _  /   _  __/ _  / / /_/ /_ |/ |/ /
 _/_/  |_/_/  /_/    /_/    /_/  \____/____/|__/
[2021-09-07 02:55:51,944] {scheduler_job.py:661} INFO - Starting the scheduler
[2021-09-07 02:55:51,944] {scheduler_job.py:666} INFO - Processing each file at most -1 times
[2021-09-07 02:55:52,055] {manager.py:254} INFO - Launched DagFileProcessorManager with pid: 11
[2021-09-07 02:55:52,058] {scheduler_job.py:1197} INFO - Resetting orphaned tasks for active dag runs
[2021-09-07 02:55:52,062] {settings.py:51} INFO - Configured default timezone Timezone('UTC')
[2021-09-07 02:59:34,363] {processor.py:243} WARNING - Killing DAGFileProcessorProcess (PID=612)
Process ForkProcess-1:
Traceback (most recent call last):
  File "/usr/local/lib/python3.8/multiprocessing/process.py", line 315, in _bootstrap
    self.run()
  File "/usr/local/lib/python3.8/multiprocessing/process.py", line 108, in run
    self._target(*self._args, **self._kwargs)
  File "/usr/local/lib/python3.8/site-packages/airflow/dag_processing/manager.py", line 370, in _run_processor_manager
    processor_manager.start()
  File "/usr/local/lib/python3.8/site-packages/airflow/dag_processing/manager.py", line 610, in start
    return self._run_parsing_loop()
  File "/usr/local/lib/python3.8/site-packages/airflow/dag_processing/manager.py", line 671, in _run_parsing_loop
    self._collect_results_from_processor(processor)
  File "/usr/local/lib/python3.8/site-packages/airflow/dag_processing/manager.py", line 981, in _collect_results_from_processor
    if processor.result is not None:
  File "/usr/local/lib/python3.8/site-packages/airflow/dag_processing/processor.py", line 321, in result
    if not self.done:
  File "/usr/local/lib/python3.8/site-packages/airflow/dag_processing/processor.py", line 286, in done
    if self._parent_channel.poll():
  File "/usr/local/lib/python3.8/multiprocessing/connection.py", line 255, in poll
    self._check_closed()
  File "/usr/local/lib/python3.8/multiprocessing/connection.py", line 136, in _check_closed
    raise OSError("handle is closed")
OSError: handle is closed
[2021-09-07 02:59:35,405] {manager.py:401} WARNING - DagFileProcessorManager (PID=11) exited with exit code 1 - re-launching
[2021-09-07 02:59:35,410] {manager.py:254} INFO - Launched DagFileProcessorManager with pid: 819
[2021-09-07 02:59:35,418] {settings.py:51} INFO - Configured default timezone Timezone('UTC')
[2021-09-07 03:00:52,433] {scheduler_job.py:1197} INFO - Resetting orphaned tasks for active dag runs
[2021-09-07 03:00:52,441] {scheduler_job.py:1219} INFO - Marked 1 SchedulerJob instances as failed
[2021-09-07 03:00:52,889] {celery_executor.py:483} INFO - Adopted the following 1 tasks from a dead executor
	<TaskInstance: queue_backfill.f_queue_time_cdc 2020-12-24 05:00:00+00:00 [running]> in state STARTED
[2021-09-07 03:03:22,623] {processor.py:243} WARNING - Killing DAGFileProcessorProcess (PID=1626)
[2021-09-07 03:03:22,624] {processor.py:243} WARNING - Killing DAGFileProcessorProcess (PID=1626)
Process ForkProcess-2:
Traceback (most recent call last):
  File "/usr/local/lib/python3.8/multiprocessing/process.py", line 315, in _bootstrap
    self.run()
  File "/usr/local/lib/python3.8/multiprocessing/process.py", line 108, in run
    self._target(*self._args, **self._kwargs)
  File "/usr/local/lib/python3.8/site-packages/airflow/dag_processing/manager.py", line 370, in _run_processor_manager
    processor_manager.start()
  File "/usr/local/lib/python3.8/site-packages/airflow/dag_processing/manager.py", line 610, in start
    return self._run_parsing_loop()
  File "/usr/local/lib/python3.8/site-packages/airflow/dag_processing/manager.py", line 671, in _run_parsing_loop
    self._collect_results_from_processor(processor)
  File "/usr/local/lib/python3.8/site-packages/airflow/dag_processing/manager.py", line 981, in _collect_results_from_processor
    if processor.result is not None:
  File "/usr/local/lib/python3.8/site-packages/airflow/dag_processing/processor.py", line 321, in result
    if not self.done:
  File "/usr/local/lib/python3.8/site-packages/airflow/dag_processing/processor.py", line 286, in done
    if self._parent_channel.poll():
  File "/usr/local/lib/python3.8/multiprocessing/connection.py", line 255, in poll
    self._check_closed()
  File "/usr/local/lib/python3.8/multiprocessing/connection.py", line 136, in _check_closed
    raise OSError("handle is closed")
OSError: handle is closed
[2021-09-07 03:03:23,543] {manager.py:401} WARNING - DagFileProcessorManager (PID=819) exited with exit code 1 - re-launching
[2021-09-07 03:03:23,548] {manager.py:254} INFO - Launched DagFileProcessorManager with pid: 1806
[2021-09-07 03:03:23,557] {settings.py:51} INFO - Configured default timezone Timezone('UTC')
[2021-09-07 03:05:54,184] {scheduler_job.py:1197} INFO - Resetting orphaned tasks for active dag runs
[2021-09-07 03:07:42,551] {processor.py:243} WARNING - Killing DAGFileProcessorProcess (PID=2711)
Process ForkProcess-3:
Traceback (most recent call last):
  File "/usr/local/lib/python3.8/multiprocessing/process.py", line 315, in _bootstrap
    self.run()
  File "/usr/local/lib/python3.8/multiprocessing/process.py", line 108, in run
    self._target(*self._args, **self._kwargs)
  File "/usr/local/lib/python3.8/site-packages/airflow/dag_processing/manager.py", line 370, in _run_processor_manager
    processor_manager.start()
  File "/usr/local/lib/python3.8/site-packages/airflow/dag_processing/manager.py", line 610, in start
    return self._run_parsing_loop()
  File "/usr/local/lib/python3.8/site-packages/airflow/dag_processing/manager.py", line 671, in _run_parsing_loop
    self._collect_results_from_processor(processor)
  File "/usr/local/lib/python3.8/site-packages/airflow/dag_processing/manager.py", line 981, in _collect_results_from_processor
    if processor.result is not None:
  File "/usr/local/lib/python3.8/site-packages/airflow/dag_processing/processor.py", line 321, in result
    if not self.done:
  File "/usr/local/lib/python3.8/site-packages/airflow/dag_processing/processor.py", line 286, in done
    if self._parent_channel.poll():
  File "/usr/local/lib/python3.8/multiprocessing/connection.py", line 255, in poll
    self._check_closed()
  File "/usr/local/lib/python3.8/multiprocessing/connection.py", line 136, in _check_closed
    raise OSError("handle is closed")
OSError: handle is closed
[2021-09-07 03:07:42,784] {manager.py:401} WARNING - DagFileProcessorManager (PID=1806) exited with exit code 1 - re-launching

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct

Issue Analytics

  • State: closed
  • Created: 2 years ago
  • Comments: 6 (5 by maintainers)

Top GitHub Comments

r-richmond commented, Apr 13, 2022 (2 reactions)

@pcolladosoto after reading the linked issue I think you may be right. Thank you so much for your write-up and fix. It definitely looks like you found the root cause of the OSError I saw in the logs.

Now I/we wait for 2.3 😃

pcolladosoto commented, Apr 13, 2022 (2 reactions)

Hi! After reading through the issue I think this might be related to #22191. Should this be the case, it has been fixed in #22685 and should make its way to Airflow 2.3.0 🎉
