[Airflow] 특정 에러만 retry 처리하는 방법
예외처리의 목적
Airflow 파이프라인에서는
자원 부족, 일시적인 네트워크 오류 등 재시도로 해결 가능한 오류가 자주 발생합니다.
예외처리의 주요 목적은 다음과 같습니다.
- 일시적인 오류에 대해 자동 재시도를 수행하여 파이프라인 연속성 유지
- 특정 오류 발생 시 파이프라인 흐름을 변경하거나 추가 작업 수행
- 재시도 대상이 아닌 오류에 대해서는 즉시 실패 처리
예외처리 방법
1. PythonOperator를 사용한 처리
PythonOperator 내부에서
원하는 예외만 다시 발생시키는 방식으로 재시도를 제어할 수 있습니다.
retries,retry_delay로 재시도 정책 설정- 특정 에러에 대해서만
raise하여 재시도 트리거 KubernetesPodOperator실행 중 발생하는 예외를 감싸 처리 가능
예시 코드
from airflow import DAG
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator
from airflow.operators.python_operator import PythonOperator
from airflow.utils.dates import days_ago
from kubernetes.client.rest import ApiException
from datetime import timedelta
default_args = {
'owner': 'airflow',
'start_date': days_ago(1),
'retries': 3,
'retry_delay': timedelta(minutes=5),
}
def run_pod():
try:
pod = KubernetesPodOperator(
namespace='default',
image='your-image',
name='your-pod-name',
task_id='your-task-id'
)
except ApiException as e:
if 'UnexpectedAdmissionError' in str(e):
raise Exception('Retrying due to UnexpectedAdmissionError')
else:
print(f'Error occurred: {str(e)}')
with DAG(
'kubernetes_pod_operator_dag',
default_args=default_args,
schedule_interval=timedelta(days=1),
catchup=False,
) as dag:
pod_creation_task = PythonOperator(
task_id='pod_creation_python_task',
python_callable=run_pod,
)
장점
- Python 코드로 세밀한 조건 분기 가능
- 복잡한 예외 처리 시 유연성 높음
단점
- Kubernetes Executor 환경에서는 Pod 내부에서 또 다른 Pod를 실행하는 구조가 될 수 있음
2. on_retry_callback을 사용한 처리
Airflow에서 제공하는
on_retry_callback을 사용하면 재시도 시점에 콜백 함수를 실행할 수 있습니다.
- 재시도 발생 시 자동 호출
context객체를 통해 태스크 상태 및 예외 정보 접근 가능- Operator 자체에 콜백 로직 연결 가능
예시 코드
from airflow import DAG
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator
from airflow.utils.state import State
from datetime import datetime, timedelta
def my_retry_callback(context):
print("Retry detected. Running retry callback logic.")
ti = context["task_instance"]
exception_raised = context.get("exception")
if exception_raised.__class__.__name__ != "UnexpectedAdmissionError":
print("Non-retryable error detected. Marking task as FAILED.")
ti.set_state(State.FAILED)
default_args = {
'owner': 'airflow',
'start_date': datetime(2021, 1, 1),
'retries': 3,
'retry_delay': timedelta(minutes=5),
}
with DAG(
'example_kubernetes_pod',
default_args=default_args,
schedule_interval=timedelta(days=1),
catchup=False
) as dag:
my_pod = KubernetesPodOperator(
namespace='default',
image='ubuntu:16.04',
cmds=['bash', '-cx'],
arguments=['echo', "hello"],
name='my-pod',
task_id='my_pod_task',
get_logs=True,
on_retry_callback=my_retry_callback,
)
장점
- Airflow 기본 기능 활용
- Pod 중첩 실행 없이 재시도 로직 구현 가능
단점
- 재시도 시점에만 개입 가능
- 예외 발생 시점의 제어는 제한적
각 방식의 비교
| 구분 | PythonOperator | on_retry_callback |
|---|---|---|
| 유연성 | 높음. Python 코드로 세밀한 예외 제어 가능 | 낮음. 재시도 시점 중심 제어 |
| 구현 난이도 | 중간 | 낮음 |
| Pod 중첩 실행 | 발생 가능 | 발생하지 않음 |
| 추천 상황 | 복잡한 분기 로직 필요 시 | 단순 재시도 제어 시 |
마무리하며
Airflow에서의 재시도 전략은
모든 에러를 무조건 retry하는 것이 아니라,
재시도로 해결 가능한 오류만 선별하는 것이 핵심입니다.
- 인프라 이슈 → retry
- 로직/데이터 오류 → 즉시 실패
상황에 맞게
PythonOperator 또는 on_retry_callback 방식을 선택해 적용하면
보다 안정적인 파이프라인을 운영할 수 있습니다.