Cannot use output of task decorator as input for external_task_ids of ExternalTaskSensor

See original GitHub issue

Apache Airflow version

2.4.3, 2.5.0

What happened

When the output of a @task-decorated function is used as the external_task_ids parameter of ExternalTaskSensor, the DAG fails to parse with this log:

Broken DAG: [+++++/airflow/dags/TEST_NEW_PIPELINE.py] Traceback (most recent call last):
  File "+++++/env3.10.5/lib/python3.10/site-packages/airflow/models/baseoperator.py", line 408, in apply_defaults
    result = func(self, **kwargs, default_args=default_args)
  File "+++++/env3.10.5/lib/python3.10/site-packages/airflow/sensors/external_task.py", line 164, in __init__
    if external_task_ids and len(external_task_ids) > len(set(external_task_ids)):
TypeError: object of type 'PlainXComArg' has no len()

Note: +++++ is just a mask for irrelevant path information.
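The failure is easy to reproduce in isolation: at DAG-parse time the decorated task's return value is not a list but a lazy XComArg reference, which defines no `__len__`. A minimal pure-Python sketch (`PlainXComArgStub` is a hypothetical stand-in for Airflow's real `PlainXComArg`, not the actual class) triggers the same TypeError:

```python
class PlainXComArgStub:
    """Hypothetical stand-in for Airflow's PlainXComArg: a lazy
    reference to a task's output, resolved only at runtime."""

    def __init__(self, task_id):
        self.task_id = task_id
    # Deliberately no __len__: the value does not exist yet at parse time.


arg = PlainXComArgStub("preprocess_dependency")
try:
    # Mirrors the duplicate check in ExternalTaskSensor.__init__
    if arg and len(arg) > len(set(arg)):
        pass
except TypeError as exc:
    print(exc)  # object of type 'PlainXComArgStub' has no len()
```

Any object without `__len__` fails the same way, which is why the check cannot run against a lazy value in `__init__`.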

What you think should happen instead

The TaskFlow tutorial (https://airflow.apache.org/docs/apache-airflow/stable/tutorial/taskflow.html) shows that decorated task output can be passed to operator parameters, without any warning or note about this limitation.

A related problem was reported in https://github.com/apache/airflow/issues/27328; the checks in __init__ should be moved into the poke method.
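The suggested fix can be sketched as a helper that runs the duplicate check only once the value is concrete, i.e. from poke() rather than __init__. This is a minimal illustration of the idea, not the actual Airflow patch; `validate_external_task_ids` is a hypothetical name:

```python
def validate_external_task_ids(external_task_ids):
    """Duplicate check deferred to runtime: by the time poke() runs,
    any lazy XComArg has been resolved into a real list of task ids."""
    if external_task_ids and len(external_task_ids) > len(set(external_task_ids)):
        raise ValueError("Duplicate task_ids passed to external_task_ids parameter.")
    return list(external_task_ids or [])


# At parse time the sensor would simply store the (possibly lazy) value;
# at poke time the resolved list either passes or fails validation:
print(validate_external_task_ids(["a", "b"]))  # ['a', 'b']
```

Run against a resolved list this behaves exactly like the existing check; run at parse time it is never reached, so lazy inputs no longer break DAG parsing.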

How to reproduce

from datetime import datetime

from airflow.decorators import dag, task
from airflow.sensors.external_task import ExternalTaskSensor

configure = {
    "dag_id": "test_new_skeleton",
    "schedule": None,
    "start_date": datetime(2022, 1, 1),
}

@task
def preprocess_dependency() -> list:
    return ["random-task-name"]

@dag(**configure)
def pipeline(): 
    t_preprocess = preprocess_dependency()
    task_dependency = ExternalTaskSensor(task_id="Check_Dependency",
                                         external_dag_id='random-dag-name-that-exist',
                                         external_task_ids=t_preprocess,
                                         poke_interval=60,
                                         mode="reschedule",
                                         timeout=172800,
                                         allowed_states=['success'],
                                         failed_states=['failed', 'skipped'],
                                         check_existence=True)


dag = pipeline()
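Until the check moves out of __init__, the pattern for a workaround is to stash the lazy value and bypass the eager duplicate check, re-running it once the value is resolved. The sketch below is pure Python and has not been tested against Airflow; `EagerSensorStub`, `LazySensorStub`, and `LazyValue` are all hypothetical stand-ins, not Airflow classes:

```python
class EagerSensorStub:
    """Stand-in for ExternalTaskSensor: validates in __init__,
    which is what breaks for lazy XComArg inputs."""

    def __init__(self, external_task_ids=None):
        if external_task_ids and len(external_task_ids) > len(set(external_task_ids)):
            raise ValueError("duplicate task ids")
        self.external_task_ids = external_task_ids


class LazySensorStub(EagerSensorStub):
    """Hypothetical workaround: defer validation until the value has
    been resolved (in Airflow this would happen inside poke())."""

    def __init__(self, external_task_ids=None):
        self._pending_ids = external_task_ids
        super().__init__(external_task_ids=None)  # skip the eager check

    def poke(self, resolved_ids):
        # resolved_ids is the concrete list the lazy value yields at runtime
        if len(resolved_ids) > len(set(resolved_ids)):
            raise ValueError("duplicate task ids")
        self.external_task_ids = resolved_ids
        return True


class LazyValue:
    """Like PlainXComArg: no __len__, unusable at parse time."""


sensor = LazySensorStub(external_task_ids=LazyValue())  # no TypeError at "parse" time
sensor.poke(["random-task-name"])
print(sensor.external_task_ids)  # ['random-task-name']
```

The duplicate check still fires, just at the first poke instead of at DAG parse, which is exactly what moving the check into poke() would achieve upstream.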

Operating System

RHEL 7

Versions of Apache Airflow Providers

No response

Deployment

Virtualenv installation

Deployment details

No response

Anything else

No response

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct

Issue Analytics

  • State: open
  • Created: 10 months ago
  • Comments: 9 (5 by maintainers)

Top GitHub Comments

1 reaction
potiuk commented, Dec 9, 2022

Ah ok. Thanks. yep. Not fixed it seems.

0 reactions
MaiHoangViet1809 commented, Dec 9, 2022

the log from v2.5.0 is below:

Broken DAG: [/********/airflow/dags/reproduce_error_for_27952.py] Traceback (most recent call last):
  File "/********/env3.10.5/lib/python3.10/site-packages/airflow/models/baseoperator.py", line 411, in apply_defaults
    result = func(self, **kwargs, default_args=default_args)
  File "/********/env3.10.5/lib/python3.10/site-packages/airflow/sensors/external_task.py", line 165, in __init__
    if external_task_ids and len(external_task_ids) > len(set(external_task_ids)):
TypeError: object of type 'PlainXComArg' has no len()

Note: path masked with ********.
