SerializedDagNotFound: DAG not found in serialized_dag table


Apache Airflow version

2.1.4 (latest released)

Operating System

Linux 5.4.149-73.259.amzn2.x86_64

Versions of Apache Airflow Providers

No response

Deployment

Other 3rd-party Helm chart

Deployment details

AWS EKS with our own Helm chart

What happened

The issue from 2.0.x (#13504) is back. Each time the scheduler is restarted, it deletes all DAGs from the serialized_dag table and tries to serialize them again from scratch. Afterwards, the scheduler pod fails with this error:

[2021-10-08 20:19:40,683] {kubernetes_executor.py:761} INFO - Shutting down Kubernetes executor
[2021-10-08 20:19:41,705] {process_utils.py:100} INFO - Sending Signals.SIGTERM to GPID 32
[2021-10-08 20:19:42,207] {process_utils.py:207} INFO - Waiting up to 5 seconds for processes to exit...
[2021-10-08 20:19:42,223] {process_utils.py:66} INFO - Process psutil.Process(pid=32, status='terminated', exitcode=0, started='20:19:40') (32) terminated with exit code 0
[2021-10-08 20:19:42,225] {process_utils.py:66} INFO - Process psutil.Process(pid=40, status='terminated', started='20:19:40') (40) terminated with exit code None
[2021-10-08 20:19:42,226] {process_utils.py:66} INFO - Process psutil.Process(pid=36, status='terminated', started='20:19:40') (36) terminated with exit code None
[2021-10-08 20:19:42,226] {scheduler_job.py:722} INFO - Exited execute loop
Traceback (most recent call last):
  File "/home/airflow/.local/bin/airflow", line 8, in <module>
    sys.exit(main())
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/__main__.py", line 40, in main
    args.func(args)
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/cli/cli_parser.py", line 48, in command
    return func(*args, **kwargs)
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/utils/cli.py", line 92, in wrapper
    return f(*args, **kwargs)
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/cli/commands/scheduler_command.py", line 70, in scheduler
    job.run()
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/jobs/base_job.py", line 245, in run
    self._execute()
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/jobs/scheduler_job.py", line 695, in _execute
    self._run_scheduler_loop()
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/jobs/scheduler_job.py", line 788, in _run_scheduler_loop
    num_queued_tis = self._do_scheduling(session)
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/jobs/scheduler_job.py", line 927, in _do_scheduling
    num_queued_tis = self._critical_section_execute_task_instances(session=session)
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/jobs/scheduler_job.py", line 551, in _critical_section_execute_task_instances
    queued_tis = self._executable_task_instances_to_queued(max_tis, session=session)
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/utils/session.py", line 67, in wrapper
    return func(*args, **kwargs)
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/jobs/scheduler_job.py", line 431, in _executable_task_instances_to_queued
    serialized_dag = self.dagbag.get_dag(dag_id, session=session)
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/utils/session.py", line 67, in wrapper
    return func(*args, **kwargs)
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/models/dagbag.py", line 186, in get_dag
    self._add_dag_from_db(dag_id=dag_id, session=session)
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/models/dagbag.py", line 258, in _add_dag_from_db
    raise SerializedDagNotFound(f"DAG '{dag_id}' not found in serialized_dag table")
airflow.exceptions.SerializedDagNotFound: DAG 'aws_transforms_player_hourly' not found in serialized_dag table

The restart leaves all DAGs absent from the serialized_dag table, so opening a DAG in the webserver fails as well:

Python version: 3.9.7
Airflow version: 2.1.4
Node: airflow-webserver-7b45758f99-rk8dg
-------------------------------------------------------------------------------
Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.9/site-packages/flask/app.py", line 2447, in wsgi_app
    response = self.full_dispatch_request()
  File "/home/airflow/.local/lib/python3.9/site-packages/flask/app.py", line 1952, in full_dispatch_request
    rv = self.handle_user_exception(e)
  File "/home/airflow/.local/lib/python3.9/site-packages/flask/app.py", line 1821, in handle_user_exception
    reraise(exc_type, exc_value, tb)
  File "/home/airflow/.local/lib/python3.9/site-packages/flask/_compat.py", line 39, in reraise
    raise value
  File "/home/airflow/.local/lib/python3.9/site-packages/flask/app.py", line 1950, in full_dispatch_request
    rv = self.dispatch_request()
  File "/home/airflow/.local/lib/python3.9/site-packages/flask/app.py", line 1936, in dispatch_request
    return self.view_functions[rule.endpoint](**req.view_args)
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/www/auth.py", line 49, in decorated
    return func(*args, **kwargs)
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/www/decorators.py", line 97, in view_func
    return f(*args, **kwargs)
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/www/decorators.py", line 60, in wrapper
    return f(*args, **kwargs)
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/www/views.py", line 2027, in tree
    dag = current_app.dag_bag.get_dag(dag_id)
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/utils/session.py", line 70, in wrapper
    return func(*args, session=session, **kwargs)
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/models/dagbag.py", line 186, in get_dag
    self._add_dag_from_db(dag_id=dag_id, session=session)
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/models/dagbag.py", line 258, in _add_dag_from_db
    raise SerializedDagNotFound(f"DAG '{dag_id}' not found in serialized_dag table")
airflow.exceptions.SerializedDagNotFound: DAG 'canary_dag' not found in serialized_dag table
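Both tracebacks end in the same path: get_dag() misses its cache, calls _add_dag_from_db(), finds no row and raises SerializedDagNotFound. A minimal in-memory sketch of that lookup, using hypothetical stand-in classes (FakeSerializedDagTable and DbBackedDagBag are invented here, and Airflow's real DagBag has extra caching and refresh logic that is omitted):

```python
class SerializedDagNotFound(Exception):
    """Hypothetical stand-in for airflow.exceptions.SerializedDagNotFound."""


class FakeSerializedDagTable:
    """In-memory stand-in for the serialized_dag table (dag_id -> payload)."""

    def __init__(self):
        self.rows = {}

    def get(self, dag_id):
        # Returns None when there is no row, like a missing DB record.
        return self.rows.get(dag_id)


class DbBackedDagBag:
    """Simplified mimic of the get_dag -> _add_dag_from_db path in the tracebacks."""

    def __init__(self, table):
        self.table = table
        self.dags = {}  # DAGs already loaded from the table

    def get_dag(self, dag_id):
        # Only consult the table on a cache miss.
        if dag_id not in self.dags:
            self._add_dag_from_db(dag_id)
        return self.dags[dag_id]

    def _add_dag_from_db(self, dag_id):
        row = self.table.get(dag_id)
        if row is None:
            # Nothing to deserialize: the error both processes hit.
            raise SerializedDagNotFound(f"DAG '{dag_id}' not found in serialized_dag table")
        self.dags[dag_id] = row


table = FakeSerializedDagTable()
table.rows["canary_dag"] = {"dag_id": "canary_dag"}
print(DbBackedDagBag(table).get_dag("canary_dag"))

table.rows.clear()  # simulate the restart wiping the table
try:
    DbBackedDagBag(table).get_dag("canary_dag")  # a fresh process has no cache
except SerializedDagNotFound as exc:
    print(exc)
```

In this sketch a bag that already cached a DAG keeps serving it after the table is wiped; only a process starting with an empty cache, such as a restarted scheduler, hits the error.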

What you expected to happen

The scheduler should not fail.

How to reproduce

Restart the scheduler pod and observe it fail, then open a DAG in the webserver and observe the error.

Anything else

The issue temporarily goes away when I run the following “serialize” script from the webserver pod, but it comes back after the next scheduler restart:

from airflow.models import DagBag
from airflow.models.serialized_dag import SerializedDagModel

# Parse the DAG files in the configured dags_folder
dag_bag = DagBag()

# Check the DB for missing serialized DAGs and write any that are missing
for dag_id in dag_bag.dag_ids:
    if not SerializedDagModel.get(dag_id):
        dag = dag_bag.get_dag(dag_id)
        SerializedDagModel.write_dag(dag)
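The core of the script is a set difference followed by writes. A minimal sketch of that logic, with plain dicts standing in for DagBag().dags and the serialized_dag table (backfill_missing is a hypothetical name, not an Airflow function), makes it visible that the workaround is idempotent and can report how many DAGs a restart wiped:

```python
from typing import Dict, List


def backfill_missing(parsed_dags: Dict[str, object], serialized_rows: Dict[str, object]) -> List[str]:
    """Write a row for every parsed DAG that has none; return the ids written.

    parsed_dags stands in for DagBag().dags and serialized_rows for the
    serialized_dag table. Both are plain dicts in this sketch.
    """
    # Only ids that were parsed but have no serialized row need work.
    missing = sorted(set(parsed_dags) - set(serialized_rows))
    for dag_id in missing:
        # The real script calls SerializedDagModel.write_dag(dag) here.
        serialized_rows[dag_id] = parsed_dags[dag_id]
    return missing


parsed = {"canary_dag": "dag-a", "aws_transforms_player_hourly": "dag-b"}
rows = {"canary_dag": "dag-a"}
print(backfill_missing(parsed, rows))  # ['aws_transforms_player_hourly']
print(backfill_missing(parsed, rows))  # []
```

Running it a second time writes nothing, so it is safe to repeat after every scheduler restart until a fixed release is deployed.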

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct

Issue Analytics

  • State: closed
  • Created 2 years ago
  • Reactions: 1
  • Comments: 12 (9 by maintainers)

Top GitHub Comments

3 reactions
easontm commented, Oct 10, 2021

I’m experiencing a similar issue. If a task is in the scheduled state and the DAG code is temporarily removed (e.g. as part of a DAG CI/CD deploy), the DAG processor deletes the associated row from serialized_dag while the task still exists. If the scheduler then tries to move the task from scheduled to queued before the new code is serialized, the scheduler crashes. On restart, one of the first things the scheduler does is adopt orphaned tasks, and adoption is attempted before DAG serialization, resulting in a crash loop.
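The sequence in this comment can be modelled without Airflow. The sketch uses hypothetical names (TaskInstance here is a tiny dataclass, and queue_scheduled and tolerate_missing are invented for illustration); it is not the scheduler's real code or the fix from #18554, and only contrasts crashing on a missing serialized DAG with leaving the task scheduled until the row reappears:

```python
from dataclasses import dataclass
from typing import Dict, List


class SerializedDagNotFound(Exception):
    """Hypothetical stand-in for airflow.exceptions.SerializedDagNotFound."""


@dataclass
class TaskInstance:
    dag_id: str
    task_id: str
    state: str = "scheduled"


def queue_scheduled(tis: List[TaskInstance], serialized: Dict[str, dict],
                    tolerate_missing: bool) -> List[TaskInstance]:
    """Move scheduled task instances to queued, looking each DAG up first.

    tolerate_missing=False raises on a missing DAG, mirroring the reported crash.
    tolerate_missing=True leaves the task scheduled so a later loop can retry it.
    """
    queued = []
    for ti in tis:
        if ti.state != "scheduled":
            continue
        if ti.dag_id not in serialized:
            if not tolerate_missing:
                raise SerializedDagNotFound(f"DAG '{ti.dag_id}' not found in serialized_dag table")
            continue  # skip for now instead of killing the scheduler loop
        ti.state = "queued"
        queued.append(ti)
    return queued


# A task is scheduled, then a deploy briefly removes the DAG file and the
# DAG processor deletes its serialized row.
serialized = {"etl": {"dag_id": "etl"}}
tis = [TaskInstance("etl", "extract")]
del serialized["etl"]

try:
    queue_scheduled(tis, serialized, tolerate_missing=False)
except SerializedDagNotFound as exc:
    print("scheduler would crash:", exc)

print(queue_scheduled(tis, serialized, tolerate_missing=True))  # []

serialized["etl"] = {"dag_id": "etl"}  # the new code gets serialized
print([ti.state for ti in queue_scheduled(tis, serialized, tolerate_missing=True)])  # ['queued']
```

Skipping rather than raising keeps one missing DAG from taking down the whole scheduler loop; the trade-off is that such tasks wait silently, so a real implementation would presumably log them.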

1 reaction
uranusjr commented, Oct 16, 2021

I believe we already fixed this in #18554 (therefore in 2.2.0). cc @ephraimbuddy
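If the fix did land in 2.2.0, as this comment suggests, a deployment can be checked by comparing its version string. A small sketch, assuming 2.2.0 is the first fixed release (not verified here); release_tuple and likely_has_fix are hypothetical helpers, and in a live environment the string would come from airflow.__version__:

```python
import re
from typing import Tuple

# Assumption: first release containing the fix, per the comment above.
FIXED_IN = (2, 2, 0)


def release_tuple(version: str) -> Tuple[int, ...]:
    """Extract (major, minor, patch) from strings like '2.1.4' or '2.2.0rc1'."""
    match = re.match(r"(\d+)\.(\d+)\.(\d+)", version)
    if match is None:
        raise ValueError(f"unrecognised version string: {version!r}")
    return tuple(int(part) for part in match.groups())


def likely_has_fix(version: str) -> bool:
    # Pre-release suffixes such as 'rc1' are ignored, so '2.2.0rc1' counts as 2.2.0.
    return release_tuple(version) >= FIXED_IN


print(likely_has_fix("2.1.4"))  # False
```

Comparing integer tuples avoids the string-comparison trap where "2.10.0" would sort before "2.2.0".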
