DAG(Directed Acyclic Graph)
- 연결 방향이 존재하고 순회하지 않는 그래프.
- 한 작업이 완료된 이후에 다음 작업이 실행되며, 이전 작업으로 돌아가지 않는다.
Airflow
- 상호 종속성이 있는 여러 작업을 포함하는 워크플로(특히 데이터 파이프라인)을 모니터링
- 파이썬으로 빌드되었지만 모든 언어 / 플랫폼에서 실행되는 작업을 실행 가능
- 데이터베이스를 사용하여 DAG의 실행 기록, 에어플로우 구성과 관련된 메타데이터를 저장
- 기본적으로 SQLite를 사용하지만, 상용에서는 MySQL 또는 Postgres DB를 사용하는 것이 좋음
- sqlalchemy를 사용하여 DB에 손쉽게 연결 가능
- SQL을 사용해 데이터 쿼리 가능 -> 파이프라인 성능을 분석하는 데 좋음
- DAG는 파이썬 스크립트로 정의하며, 소스 파일은 작업의 일정과 정의, 작업 간의 종속성 등을 기술
- 작업이 적절한 순서로 실행되는지 쉽게 확인할 수 있음
- 파이프라인이 완료되거나 실행될 때 슬랙 채널에 알림, 이메일 전송, 데이터 유효성 실행 검사 등을 처리 가능
Airflow 실행기(Excutors)
- 기본적으론 SequentialExecutor가 사용되지만, 한 번에 하나의 작업만 실행 가능. SQLite DB와 호환되는 유일한 실행 프로그램.
- CeleryExecutor, DaskExecutor, KubernetesExecutor와 같은 다른 실행기를 사용하는 것이 좋음
연산자(Operator)
- DAG에서 각 노드는 하나의 작업
- 각 작업은 연산자를 구현하며, 연산자는 스크립트, 명령 및 기타 작업을 실행
DAG 구축
DB에서 데이터를 추출하여 DW에 로드한 다음, 데이터를 데이터 모델로 변환
from datetime import timedelta
from airflow import DAG
from airflow.operators.bash_operator \
import BashOperator
from airflow.utils.dates import days_ago
dag = DAG(
'simple_dag',
description='A simple DAG',
schedule_interval=timedelta(days=1), # 첫 번째 실행은 현재 날짜보다 하루 전 -> 즉시 실행됨
start_date = days_ago(1),
)
t1 = BashOperator(
task_id='print_date',
bash_command='date',
dag=dag,
)
t2 = BashOperator(
task_id='sleep',
depends_on_past=True,
bash_command='sleep 3',
dag=dag,
)
t3 = BashOperator(
task_id='print_end',
start_date = days_ago(1),
depends_on_past=True,
bash_command='echo \'end\'',
dag=dag,
)
# 작업 간의 종속성을 정의
t1 >> t2
t2 >> t3
dag = DAG(
'elt_pipeline_sample',
description='A sample ELT pipeline',
schedule_interval=timedelta(days=1),
start_date = days_ago(1),
)
extract_orders_task = BashOperator(
task_id='extract_order_data',
bash_command='set -e; python /Users/jamesdensmore/airflow/dags/scripts/extract_orders.py',
dag=dag,
)
- PythonOperator로 파이썬 코드를 실행하려면 코드를 DAG 정의 파일에 작성하거나, DAG 정의 파일로 가져와야 함
- 예제에서는 오케스트레이션과 오케스트레이션이 실행되는 프로세스의 로직을 분리, BashOperator를 사용
- 코드 간에 호환되지 않는 파이썬 라이브러리의 잠재적인 문제를 피할 수 있음
DAG를 분할해야 하는 경우
- 작업을 다른 일정으로 실행해야 하는 경우
- 매일 한 번의 작업 / 30분마다 한 번씩 실행해야 하는 작업
- 파이프라인이 관련이 없고 독립적인 경우
- DAG가 너무 복잡해지는 경우 -> 논리적으로 분리할 수 있는가?
Sensor
- 외부 작업 또는 프로세스의 상태를 확인한 다음, 확인 기준이 충족되면 DAG에서 다운스트림 종속성*을 계속 실행
- 다운스트림 종속성 : 다운스트림은 최종 사용자에게 데이터를 전송하는 과정, 서버에서 로컬 기기로 전송되는 데이터의 흐름 등을 말하는데, 이 책에서는 DAG에서의 다음 작업을 말하는 것 같음..
- DAG는 특정 일정에 따라 실행되므로 센서가 특정 DAG의 실행을 확인해야 함
관리형 에어플로우
- 완전 관리형으로 제공, 구글 클라우드의 Cloud Composer와 Astronomer가 있음
- 요금은 높아지지만 에어플로우의 관리 편의성이 높아짐
- 자체 호스팅을 도와줄 수 있는 시스템 운영 팀이 있는지
- 예산이 어느 정도인지
- DAG의 작업이 복잡한지
- 내부 데이터 및 시스템에 연결하는 것이 허용되는지에 따라 관리형 솔루션을 고려해볼 만 함
Luigi 및 Dagster, 머신러닝 파이프라인 오케스트레이션에 맞춰진 Kubeflow Pipelines도 좋은 대안.
'인프라,데이터' 카테고리의 다른 글
데이터 중심 애플리케이션 설계 1장 정리 (0) | 2022.05.08 |
---|---|
데이터 파이프라인 핵심 가이드 8-10 (0) | 2022.04.24 |
데이터 파이프라인 핵심 가이드 6 (0) | 2022.04.10 |
데이터 파이프라인 핵심 가이드 1-3 (0) | 2022.04.03 |
스포티파이 시니어 데엔이 말하는 주니어 개발자를 위한 커리어 로드맵 (1) | 2022.03.29 |