Airflow
Airflow는 일련의 일들을 순차적으로 진행시켜주는 프로젝트 관리자라고 생각하면 된다.
Airflow에서 알아야 하는 개념
- Workflow(워그 플로우) : Workflow는 의존성으로 연결된 작업들의 집합
- DAG(Directed Acyclic Graph, 방향이 있는 비순환 그래프)
- Task : DAG의 단위 작업
Airflow 구성 요소
- Airflow User
- 웹서버
- 스케쥴러
- metastore : 여러 DAG 정보들이 있는 저장소
- Executor : 작업 배치
- Worker
- Operator
Operator를 실행(세팅)하면 Task
추가로, Airflow는 실시간, 초단위로 진행되어야 하는 것에는 적합하지 않는다. 하루에 한번, 일주일에 한번 진행되는 공정에 관리하는데 적합
Operator 분류
- Action Operator : 간단한 연산 수행 오퍼레이터, airflow.operator 모듈 아래에서 존재
- Transfer Operator : 데이터를 옮기는 오퍼레이터, <출발>To<도착>Operator 꼴
- Sensor : Task를 언제 실행시킬 트리거(이벤트)를 기다리는 특별한 타입의 Operator
(ex> 특정 폴더에 데이터가 쌓여지기를 기다리거나, 요청에 대한 응답이 확인되기를 기다리는 등)
보통 사용하는 Operator
- PythonOperator : 파이썬 코드를 돌리는 작업할 때 사용
- BashOperator : bash 명령어 실행 작업
- SqliteOperator : SQL DB 사용관 관련된 작업
- SimpleHttpOperator : HTTP요청(request)을 보내고, 응답(response) 텍스트를 받는 작업
- HttpSensor : 응답(response)하는지 확인할 때 사용하는 센서
Airflow 환경 설정
1. 새로운 conda 환경을 하나 만들어준다. 파이썬은 원하는 버전(3.X)과, 환경이름은 원하는 환경 이름(airflow_env)으로 하면 된다. 그리고 만든 새로운 환경을 활성화해준다.
conda create -n airflow_env python=3.X
conda activate airflow_env
2. Airflow를 설치한다. 디렉토리에 airflow라는 폴더가 하나 생겼을 것이다.
pip3 install apache-airflow
3. airflow 라고 한 번 쳐서 잘 설치되었는지 확인하자. 사용할 수 있는 명령어에 대한 설명이 나온다면 성공.
코드 톺아보기(0)
0. DAG 틀 생성
~/airflow/dags 폴더 안에 파일을 만들어서 진행한다.
DAG의 기본 설정이 필요한데,
DAG id(유일), DAG를 돌릴 주기, 태그 등의 설정을 해야 한다.
DAG의 기본 설정은 아래와 같이 할 수 있다.
@dag(default_args=특정값Arguments().get(),
description="A process",
schedule_interval="5 * * * *",
start_date=pendulum.datetime(2024, 12, 27, 10, tz="UTC"),
catchup=False,
tags=["A-1", "A-2", "A-3"])
DAG 필수 파라미터
dag_id DAG의 고유 식별자로 사용된다.
default_args
: DAG에 대한 기본 인수 정의로, DAG의 start_date, owner. retries 등 설정 가능
description
: 해당 DAG를 설명하는 부분으로, UI 에서 DAG의 설명을 확인 가능
schedule_interval
: DAG 실행 스케쥴 정의 (cron 표현식)
- '@daily', '@hourly' 등과 같이 설정 가능
- 해당 "5 * * * *" 값은 매시간 5분마다 실행되는 스케쥴을 의미
start_date=pendulum.datetime(2024, 12, 27, 10, tx="UTC")
: DAG 시작 날짜 및 시간 설정
- 해당 pendulum.datetime(2024, 12, 27, 10, tz="UTC") 값은 2024년 12월 27일 오전 10시(UTC)부터 실행 시작
catchup = False
- True일 경우, start_date 부터 현재 시점가지 실행되지 않은 모든 DAG 실행을 보상 실행
- False 이니, DAG가 과거 실행 시간을 보상하지 않고, 현재 시점 이후부터 스케쥴 실행
DAG 선택 파라미터
owner
: DAG를 생선한 사람이나 팀 이름, 식별자를 나타냄
- Airflow UI에서 DAG 검색 시, 유용
- 이메일 알림 등에서 DAG 식별 유용
max_active_runs
: DAG를 동시에 실행할 수 있는 최대 활성 실행 수를 제한
- 기본값은 16
concurrency
: DAG에서 동시에 실행할 수 있는 최대 테스트 수 제한
- 기본값 16
catchup_by_default
: 새 DAG 생성 시, 기본값으로 catchup이 활성화될지 여부를 설정
- 기본값은 True
sla
: DAG에서 Service Level Agreement(SLA)를 정의, 테스크의 실행 시간 제한 등을 설정 가능
on_failure_callback, on_success_callback
: DAG/Task 실행 성공 시, 호출되는 콜백 함수를 지정
end_date
: DAG의 종료 시간 정의
retries
: 실패한 task를 다시 시도하는 횟수 설정
retry_delay
: 실패한 task를 다시 시도하기 전 대기 시간 설정
: DAG와 관련된 이메일을 보내는데 사용, 실패시 알림 메일을 보냄
email_on_failure
: DAG 실패 시, 이메일 알림을 보낼지 여부 결정
email_on_retry
: Task 재시도 시, 이메일 알림을 보낼지 여부 결정
trigger_rule
: 다른 Task가 완료될 때 실행할 지, 실패할때 실행할지 등 실행 시점으르 결정
dagrun_timeout
: DAG 실행 시간 제한 설정
depends_on_past
: 이전 DAG 실행이 성공했는지 여부에 따라 현재 DAG 실행을 조건부로 설정
default_view
: DAG의 기본 보기 모드 지정
- 'tree', 'graph', 'duration', 'gantt' 등의 값을 가질 수 있음
orientation
- DAG 그래프를 그릴 때 노드와 엣지의 방향 결정
- 'TB'(상향), 'LR'(좌우) 등의 값을 가짐
params
: DAG 내에서 사용할 수 있는 일반적인 파라미터 정의
- 파라미터는 python 사전 형식으로 전달
pool
: DAG에서 사용할 수 있는 실행자 풀 지정, 동시 실행 가능한 작업자 수 제한할때 사용
priority_weight
: DAG의 실행 우선순위를 지정
- 높은 우선순위는 다른 DAG와 비교했을 때 먼저 실행되도록 보장
queue
: DAG에서 사용할 수 있는 실행 큐 지정, 실행자 풀과 비슷
참고 및 출처
- https://velog.io/@clueless_coder/Airflow-엄청-자세한-튜토리얼-왕초심자용
- https://mungyu.tistory.com/19
'공부' 카테고리의 다른 글
DuckDB 란? (0) | 2025.03.19 |
---|---|
Airflow 에서 직면한 여러 환경 설정 오류 해결 (0) | 2025.03.12 |
[GIT] clone 시 The requested URL returned error: 403 + Support for password authentication was removed (0) | 2025.03.08 |
대용량 그래프 처리 시스템, Pregel (0) | 2025.03.05 |
[데이터 중심 애플리케이션 설계] 1장. 신뢰할 수 있고 확장 가능하며 유지보수하기 쉬운 어플리케이션 (0) | 2025.02.23 |