본문 바로가기

공부

Airflow 개념과 DAG 파라미터 정리

반응형

 

Airflow

Airflow는 일련의 일들을 순차적으로 진행시켜주는 프로젝트 관리자라고 생각하면 된다.

 

Airflow에서 알아야 하는 개념

- Workflow(워그 플로우) : Workflow는 의존성으로 연결된 작업들의 집합

- DAG(Directed Acyclic Graph, 방향이 있는 비순환 그래프)

- Task : DAG의 단위 작업

 

Airflow 구성 요소

- Airflow User 

- 웹서버

- 스케쥴러

- metastore : 여러 DAG 정보들이 있는 저장소

- Executor : 작업 배치

- Worker

- Operator

Operator를 실행(세팅)하면 Task

 

추가로, Airflow는 실시간, 초단위로 진행되어야 하는 것에는 적합하지 않는다. 하루에 한번, 일주일에 한번 진행되는 공정에 관리하는데 적합

 

Operator 분류

- Action Operator : 간단한 연산 수행 오퍼레이터, airflow.operator 모듈 아래에서 존재

- Transfer Operator : 데이터를 옮기는 오퍼레이터, <출발>To<도착>Operator 꼴

- Sensor : Task를 언제 실행시킬 트리거(이벤트)를 기다리는 특별한 타입의 Operator

(ex> 특정 폴더에 데이터가 쌓여지기를 기다리거나, 요청에 대한 응답이 확인되기를 기다리는 등)

 

보통 사용하는 Operator

- PythonOperator : 파이썬 코드를 돌리는 작업할 때 사용

- BashOperator : bash 명령어 실행 작업

- SqliteOperator : SQL DB 사용관 관련된 작업

- SimpleHttpOperator : HTTP요청(request)을 보내고, 응답(response) 텍스트를 받는 작업

- HttpSensor : 응답(response)하는지 확인할 때 사용하는 센서

 

 


Airflow 환경 설정

1. 새로운 conda 환경을 하나 만들어준다. 파이썬은 원하는 버전(3.X)과, 환경이름은 원하는 환경 이름(airflow_env)으로 하면 된다. 그리고 만든 새로운 환경을 활성화해준다.

conda create -n airflow_env python=3.X
conda activate airflow_env

 

2. Airflow를 설치한다. 디렉토리에 airflow라는 폴더가 하나 생겼을 것이다.

pip3 install apache-airflow

 

 

3. airflow 라고 한 번 쳐서 잘 설치되었는지 확인하자. 사용할 수 있는 명령어에 대한 설명이 나온다면 성공.

 


 

코드 톺아보기(0)

0. DAG 틀 생성

~/airflow/dags 폴더 안에 파일을 만들어서 진행한다.

 

DAG의 기본 설정이 필요한데,

DAG id(유일), DAG를 돌릴 주기, 태그 등의 설정을 해야 한다.

 

DAG의 기본 설정은 아래와 같이 할 수 있다.

@dag(default_args=특정값Arguments().get(),
    description="A process",
    schedule_interval="5 * * * *",
    start_date=pendulum.datetime(2024, 12, 27, 10, tz="UTC"),
    catchup=False,
    tags=["A-1", "A-2", "A-3"])

 

DAG 필수 파라미터

dag_id DAG의 고유 식별자로 사용된다.

 

default_args 

: DAG에 대한 기본 인수 정의로, DAG의 start_date, owner. retries 등 설정 가능

 

description

: 해당 DAG를 설명하는 부분으로, UI 에서 DAG의 설명을 확인 가능

 

schedule_interval

: DAG 실행 스케쥴 정의 (cron 표현식) 

- '@daily', '@hourly' 등과 같이 설정 가능

- 해당 "5 * * * *" 값은 매시간 5분마다 실행되는 스케쥴을 의미

 

start_date=pendulum.datetime(2024, 12, 27, 10, tx="UTC")

: DAG 시작 날짜 및 시간 설정

- 해당 pendulum.datetime(2024, 12, 27, 10, tz="UTC") 값은 2024년 12월 27일 오전 10시(UTC)부터 실행 시작

 

catchup = False

- True일 경우, start_date 부터 현재 시점가지 실행되지 않은 모든 DAG 실행을 보상 실행

- False 이니, DAG가 과거 실행 시간을 보상하지 않고, 현재 시점 이후부터 스케쥴 실행

 

DAG 선택 파라미터

owner 

: DAG를 생선한 사람이나 팀 이름, 식별자를 나타냄

- Airflow UI에서 DAG 검색 시, 유용

- 이메일 알림 등에서 DAG 식별 유용

 

max_active_runs

: DAG를 동시에 실행할 수 있는 최대 활성 실행 수를 제한

- 기본값은 16

 

concurrency

: DAG에서 동시에 실행할 수 있는 최대 테스트 수 제한

- 기본값 16

 

catchup_by_default

: 새 DAG 생성 시, 기본값으로 catchup이 활성화될지 여부를 설정

- 기본값은 True

 

sla

: DAG에서 Service Level Agreement(SLA)를 정의, 테스크의 실행 시간 제한 등을 설정 가능

 

on_failure_callback, on_success_callback

: DAG/Task 실행 성공 시, 호출되는 콜백 함수를 지정

 

end_date

: DAG의 종료 시간 정의

 

retries

: 실패한 task를 다시 시도하는 횟수 설정

retry_delay

: 실패한 task를 다시 시도하기 전 대기 시간 설정

 

email

: DAG와 관련된 이메일을 보내는데 사용, 실패시 알림 메일을 보냄

email_on_failure

: DAG 실패 시, 이메일 알림을 보낼지 여부 결정

email_on_retry

: Task 재시도 시, 이메일 알림을 보낼지 여부 결정

 

trigger_rule 

: 다른 Task가 완료될 때 실행할 지, 실패할때 실행할지 등 실행 시점으르 결정

 

dagrun_timeout 

: DAG 실행 시간 제한 설정

 

depends_on_past

: 이전 DAG 실행이 성공했는지 여부에 따라 현재 DAG 실행을 조건부로 설정

 

default_view 

: DAG의 기본 보기 모드 지정

- 'tree', 'graph', 'duration', 'gantt' 등의 값을 가질 수 있음

 

orientation

- DAG 그래프를 그릴 때 노드와 엣지의 방향 결정

- 'TB'(상향), 'LR'(좌우) 등의 값을 가짐

 

params

: DAG 내에서 사용할 수 있는 일반적인 파라미터 정의

- 파라미터는 python 사전 형식으로 전달

 

pool

: DAG에서 사용할 수 있는 실행자 풀 지정, 동시 실행 가능한 작업자 수 제한할때 사용

 

priority_weight

: DAG의 실행 우선순위를 지정

- 높은 우선순위는 다른 DAG와 비교했을 때 먼저 실행되도록 보장

 

queue

: DAG에서 사용할 수 있는 실행 큐 지정, 실행자 풀과 비슷

 

 

참고 및 출처

- https://velog.io/@clueless_coder/Airflow-엄청-자세한-튜토리얼-왕초심자용

- https://mungyu.tistory.com/19

 

반응형