Cannot use the output of the task decorator as input for external_task_ids of ExternalTaskSensor
Apache Airflow version
2.4.3, 2.5.0
What happened
When the output of a @task-decorated function is passed as the external_task_ids parameter of ExternalTaskSensor, the DAG fails to import with this log:
Broken DAG: [+++++/airflow/dags/TEST_NEW_PIPELINE.py] Traceback (most recent call last):
  File "+++++/env3.10.5/lib/python3.10/site-packages/airflow/models/baseoperator.py", line 408, in apply_defaults
    result = func(self, **kwargs, default_args=default_args)
  File "+++++/env3.10.5/lib/python3.10/site-packages/airflow/sensors/external_task.py", line 164, in __init__
    if external_task_ids and len(external_task_ids) > len(set(external_task_ids)):
TypeError: object of type 'PlainXComArg' has no len()
Note: +++++ masks irrelevant parts of the paths.
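The failure itself is plain Python: `len()` is called on an object that only stands in for the task's future output. The minimal sketch below reproduces it outside Airflow. The `PlainXComArg` class here is a hypothetical stand-in, not Airflow's real class, and `has_duplicates` is a hypothetical helper that copies the expression from line 164 of `external_task.py` shown in the traceback.

```python
class PlainXComArg:
    """Hypothetical stand-in for a reference to a task's future output.

    Like the object the sensor receives at DAG-parse time, it defines no __len__.
    """

    def __init__(self, task_id):
        self.task_id = task_id


def has_duplicates(external_task_ids):
    # Same expression as the duplicate check in ExternalTaskSensor.__init__ (per the traceback).
    return bool(external_task_ids) and len(external_task_ids) > len(set(external_task_ids))


# With a concrete list, the check works as intended.
print(has_duplicates(["a", "b"]))  # False
print(has_duplicates(["a", "a"]))  # True

# With a placeholder object, len() fails just like in the traceback.
try:
    has_duplicates(PlainXComArg("preprocess_dependency"))
except TypeError as exc:
    print(exc)  # object of type 'PlainXComArg' has no len()
```

The check is only meaningful once the value is a real list, which is not the case when the DAG file is parsed.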
What you think should happen instead
The TaskFlow tutorial (https://airflow.apache.org/docs/apache-airflow/stable/tutorial/taskflow.html) suggests that a decorated task's output can be used this way, and it carries no warning or note about this case.
A related problem is reported in https://github.com/apache/airflow/issues/27328. All the checks in __init__ should be moved into the poke method.
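The proposed fix can be illustrated without Airflow. In the sketch below, `Placeholder`, `validate_task_ids`, `EagerSensor`, `DeferredSensor` and the `resolve` callback passed to `poke` are all hypothetical and only model the idea: keep the raw argument in `__init__` and validate it in `poke`, once the real value is known. The real sensor's `poke` has a different signature.

```python
class Placeholder:
    """Hypothetical stand-in for an unresolved task output (defines no __len__)."""

    def __init__(self, task_id):
        self.task_id = task_id


def validate_task_ids(task_ids):
    # The duplicate check from ExternalTaskSensor.__init__, applied to whatever it is given.
    if task_ids and len(task_ids) > len(set(task_ids)):
        raise ValueError("duplicate task ids in external_task_ids")


class EagerSensor:
    """Validates in __init__, i.e. at DAG-parse time (current behaviour)."""

    def __init__(self, external_task_ids):
        validate_task_ids(external_task_ids)  # raises TypeError for a Placeholder
        self.external_task_ids = external_task_ids


class DeferredSensor:
    """Stores the raw argument and validates only when poke() runs."""

    def __init__(self, external_task_ids):
        self.external_task_ids = external_task_ids  # no len() at parse time

    def poke(self, resolve):
        # `resolve` turns the placeholder into its real value; here it is assumed
        # to run at execution time, before the check is applied.
        task_ids = resolve(self.external_task_ids)
        validate_task_ids(task_ids)
        return task_ids


ref = Placeholder("preprocess_dependency")
try:
    EagerSensor(ref)
except TypeError as exc:
    print("eager:", exc)  # eager: object of type 'Placeholder' has no len()

sensor = DeferredSensor(ref)                        # construction succeeds
print(sensor.poke(lambda _: ["random-task-name"]))  # ['random-task-name']
```

Deferring the check keeps the duplicate validation intact while allowing the argument to be a reference at parse time.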
How to reproduce
from airflow.decorators import dag, task
from airflow.operators.python import get_current_context
from airflow.sensors.external_task import ExternalTaskSensor
from datetime import datetime

configure = {
    "dag_id": "test_new_skeleton",
    "schedule": None,
    "start_date": datetime(2022, 1, 1),
}

@task
def preprocess_dependency() -> list:
    return ["random-task-name"]

@dag(**configure)
def pipeline():
    t_preprocess = preprocess_dependency()
    task_dependency = ExternalTaskSensor(
        task_id="Check_Dependency",
        external_dag_id="random-dag-name-that-exist",
        external_task_ids=t_preprocess,
        poke_interval=60,
        mode="reschedule",
        timeout=172800,
        allowed_states=["success"],
        failed_states=["failed", "skipped"],
        check_existence=True,
    )

dag = pipeline()
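Inside the @dag function, calling preprocess_dependency() does not run it; it returns a reference to its future return value, which in Airflow is only filled in from XCom when a downstream task runs. That is why the sensor's __init__ sees a PlainXComArg instead of a list. The toy decorator below models this behaviour; `lazy_task` and `OutputRef` are hypothetical names and this is not Airflow's implementation.

```python
import functools


class OutputRef:
    """Hypothetical reference to a task's return value, resolved later."""

    def __init__(self, func, args, kwargs):
        self._func, self._args, self._kwargs = func, args, kwargs

    def resolve(self):
        # Runs the wrapped function here; in Airflow the value would come from XCom instead.
        return self._func(*self._args, **self._kwargs)


def lazy_task(func):
    """Toy decorator: calling the function records the call instead of running it."""

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        return OutputRef(func, args, kwargs)

    return wrapper


@lazy_task
def preprocess_dependency() -> list:
    return ["random-task-name"]


ref = preprocess_dependency()   # "parse time": nothing has run yet
print(isinstance(ref, list))    # False, so len(ref) would raise TypeError
print(ref.resolve())            # ['random-task-name'] at "run time"
```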
Operating System
RHEL 7
Versions of Apache Airflow Providers
No response
Deployment
Virtualenv installation
Deployment details
No response
Anything else
No response
Are you willing to submit a PR?
- Yes I am willing to submit a PR!
Code of Conduct
- I agree to follow this project’s Code of Conduct
Ah, OK, thanks. Yep, it seems this is not fixed.
The log from v2.5.0 is below:
Note: paths are masked with ********.