DATA GROUND 로고DATA GROUND
DevOps

[Airflow] 특정 에러만 retry 처리하는 방법

S.H. Yoo
마지막 업데이트: 2024-04-08

Apache Airflow에서 특정 예외(Exception)에 대해서만 재시도를 수행하는 방법을 PythonOperator와 on_retry_callback 기준으로 정리합니다.

작성일: 2024-04-08작성자: S.H. Yoo마지막 업데이트: 2024-04-08

예외처리의 목적

Airflow 파이프라인에서는
자원 부족, 일시적인 네트워크 오류 등 재시도로 해결 가능한 오류가 자주 발생합니다.

예외처리의 주요 목적은 다음과 같습니다.

  • 일시적인 오류에 대해 자동 재시도를 수행하여 파이프라인 연속성 유지
  • 특정 오류 발생 시 파이프라인 흐름을 변경하거나 추가 작업 수행
  • 재시도 대상이 아닌 오류에 대해서는 즉시 실패 처리

예외처리 방법

1. PythonOperator를 사용한 처리

PythonOperator 내부에서
원하는 예외만 다시 발생시키는 방식으로 재시도를 제어할 수 있습니다.

  • retries, retry_delay로 재시도 정책 설정
  • 특정 에러에 대해서만 raise하여 재시도 트리거
  • KubernetesPodOperator 실행 중 발생하는 예외를 감싸 처리 가능

예시 코드

from airflow import DAG
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator
from airflow.operators.python_operator import PythonOperator
from airflow.utils.dates import days_ago
from kubernetes.client.rest import ApiException
from datetime import timedelta

default_args = {
    'owner': 'airflow',
    'start_date': days_ago(1),
    'retries': 3,
    'retry_delay': timedelta(minutes=5),
}

def run_pod():
    try:
        pod = KubernetesPodOperator(
            namespace='default',
            image='your-image',
            name='your-pod-name',
            task_id='your-task-id'
        )
    except ApiException as e:
        if 'UnexpectedAdmissionError' in str(e):
            raise Exception('Retrying due to UnexpectedAdmissionError')
        else:
            print(f'Error occurred: {str(e)}')

with DAG(
    'kubernetes_pod_operator_dag',
    default_args=default_args,
    schedule_interval=timedelta(days=1),
    catchup=False,
) as dag:

    pod_creation_task = PythonOperator(
        task_id='pod_creation_python_task',
        python_callable=run_pod,
    )

장점

  • Python 코드로 세밀한 조건 분기 가능
  • 복잡한 예외 처리 시 유연성 높음

단점

  • Kubernetes Executor 환경에서는 Pod 내부에서 또 다른 Pod를 실행하는 구조가 될 수 있음

2. on_retry_callback을 사용한 처리

Airflow에서 제공하는
on_retry_callback을 사용하면 재시도 시점에 콜백 함수를 실행할 수 있습니다.

  • 재시도 발생 시 자동 호출
  • context 객체를 통해 태스크 상태 및 예외 정보 접근 가능
  • Operator 자체에 콜백 로직 연결 가능

예시 코드

from airflow import DAG
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator
from airflow.utils.state import State
from datetime import datetime, timedelta

def my_retry_callback(context):
    print("Retry detected. Running retry callback logic.")
    ti = context["task_instance"]
    exception_raised = context.get("exception")

    if exception_raised.__class__.__name__ != "UnexpectedAdmissionError":
        print("Non-retryable error detected. Marking task as FAILED.")
        ti.set_state(State.FAILED)

default_args = {
    'owner': 'airflow',
    'start_date': datetime(2021, 1, 1),
    'retries': 3,
    'retry_delay': timedelta(minutes=5),
}

with DAG(
    'example_kubernetes_pod',
    default_args=default_args,
    schedule_interval=timedelta(days=1),
    catchup=False
) as dag:

    my_pod = KubernetesPodOperator(
        namespace='default',
        image='ubuntu:16.04',
        cmds=['bash', '-cx'],
        arguments=['echo', "hello"],
        name='my-pod',
        task_id='my_pod_task',
        get_logs=True,
        on_retry_callback=my_retry_callback,
    )

장점

  • Airflow 기본 기능 활용
  • Pod 중첩 실행 없이 재시도 로직 구현 가능

단점

  • 재시도 시점에만 개입 가능
  • 예외 발생 시점의 제어는 제한적

각 방식의 비교

구분PythonOperatoron_retry_callback
유연성높음. Python 코드로 세밀한 예외 제어 가능낮음. 재시도 시점 중심 제어
구현 난이도중간낮음
Pod 중첩 실행발생 가능발생하지 않음
추천 상황복잡한 분기 로직 필요 시단순 재시도 제어 시

마무리하며

Airflow에서의 재시도 전략은
모든 에러를 무조건 retry하는 것이 아니라,
재시도로 해결 가능한 오류만 선별하는 것이 핵심입니다.

  • 인프라 이슈 → retry
  • 로직/데이터 오류 → 즉시 실패

상황에 맞게
PythonOperator 또는 on_retry_callback 방식을 선택해 적용하면
보다 안정적인 파이프라인을 운영할 수 있습니다.


참고 자료

마지막 업데이트: 2024-04-08

당신이 관심있을 만한 글