본문 바로가기

인프라,데이터

데이터 파이프라인 핵심 가이드 7


DAG(Directed Acyclic Graph)

  • 연결 방향이 존재하고 순회하지 않는 그래프. 
  • 한 작업이 완료된 이후에 다음 작업이 실행되며, 이전 작업으로 돌아가지 않는다. 

 

Airflow

  • 상호 종속성이 있는 여러 작업을 포함하는 워크플로(특히 데이터 파이프라인)을 모니터링
  • 파이썬으로 빌드되었지만 모든 언어 / 플랫폼에서 실행되는 작업을 실행 가능
  • 데이터베이스를 사용하여 DAG의 실행 기록, 에어플로우 구성과 관련된 메타데이터를 저장
    • 기본적으로 SQLite를 사용하지만, 상용에서는 MySQL 또는 Postgres DB를 사용하는 것이 좋음
    • sqlalchemy를 사용하여 DB에 손쉽게 연결 가능
    • SQL을 사용해 데이터 쿼리 가능 -> 파이프라인 성능을 분석하는 데 좋음
  • DAG는 파이썬 스크립트로 정의하며, 소스 파일은 작업의 일정과 정의, 작업 간의 종속성 등을 기술
  • 작업이 적절한 순서로 실행되는지 쉽게 확인할 수 있음
  • 파이프라인이 완료되거나 실행될 때 슬랙 채널에 알림, 이메일 전송, 데이터 유효성 실행 검사 등을 처리 가능

 

Airflow 실행기(Excutors)

  • 기본적으론 SequentialExecutor가 사용되지만, 한 번에 하나의 작업만 실행 가능. SQLite DB와 호환되는 유일한 실행 프로그램.
  • CeleryExecutor, DaskExecutor, KubernetesExecutor와 같은 다른 실행기를 사용하는 것이 좋음

 

연산자(Operator)

  • DAG에서 각 노드는 하나의 작업
  • 각 작업은 연산자를 구현하며, 연산자는 스크립트, 명령 및 기타 작업을 실행

 

DAG 구축

DB에서 데이터를 추출하여 DW에 로드한 다음, 데이터를 데이터 모델로 변환

from datetime import timedelta
from airflow import DAG
from airflow.operators.bash_operator \
    import BashOperator
from airflow.utils.dates import days_ago

dag = DAG(
    'simple_dag',
    description='A simple DAG',
    schedule_interval=timedelta(days=1), # 첫 번째 실행은 현재 날짜보다 하루 전 -> 즉시 실행됨
    start_date = days_ago(1),
)

t1 = BashOperator(
    task_id='print_date',
    bash_command='date',
    dag=dag,
)

t2 = BashOperator(
    task_id='sleep',
    depends_on_past=True,
    bash_command='sleep 3',
    dag=dag,
)

t3 = BashOperator(
    task_id='print_end',
    start_date = days_ago(1),
    depends_on_past=True,
    bash_command='echo \'end\'',
    dag=dag,
)


# 작업 간의 종속성을 정의
t1 >> t2
t2 >> t3

 

dag = DAG(
    'elt_pipeline_sample',
    description='A sample ELT pipeline',
    schedule_interval=timedelta(days=1),
    start_date = days_ago(1),
)

extract_orders_task = BashOperator(
    task_id='extract_order_data',
    bash_command='set -e; python /Users/jamesdensmore/airflow/dags/scripts/extract_orders.py',
    dag=dag,
)

 

  • PythonOperator로 파이썬 코드를 실행하려면 코드를 DAG 정의 파일에 작성하거나, DAG 정의 파일로 가져와야 함
  • 예제에서는 오케스트레이션과 오케스트레이션이 실행되는 프로세스의 로직을 분리, BashOperator를 사용
    • 코드 간에 호환되지 않는 파이썬 라이브러리의 잠재적인 문제를 피할 수 있음

 

DAG를 분할해야 하는 경우

  • 작업을 다른 일정으로 실행해야 하는 경우
    • 매일 한 번의 작업 / 30분마다 한 번씩 실행해야 하는 작업
  • 파이프라인이 관련이 없고 독립적인 경우
  • DAG가 너무 복잡해지는 경우 -> 논리적으로 분리할 수 있는가?

 

Sensor

  • 외부 작업 또는 프로세스의 상태를 확인한 다음, 확인 기준이 충족되면 DAG에서 다운스트림 종속성*을 계속 실행
    • 다운스트림 종속성 : 다운스트림은 최종 사용자에게 데이터를 전송하는 과정, 서버에서 로컬 기기로 전송되는 데이터의 흐름 등을 말하는데, 이 책에서는 DAG에서의 다음 작업을 말하는 것 같음.. 
  • DAG는 특정 일정에 따라 실행되므로 센서가 특정 DAG의 실행을 확인해야 함

 

관리형 에어플로우

  • 완전 관리형으로 제공, 구글 클라우드의 Cloud Composer와 Astronomer가 있음
  • 요금은 높아지지만 에어플로우의 관리 편의성이 높아짐
  • 자체 호스팅을 도와줄 수 있는 시스템 운영 팀이 있는지
    • 예산이 어느 정도인지
    • DAG의 작업이 복잡한지
    • 내부 데이터 및 시스템에 연결하는 것이 허용되는지에 따라 관리형 솔루션을 고려해볼 만 함

 

Luigi 및 Dagster, 머신러닝 파이프라인 오케스트레이션에 맞춰진 Kubeflow Pipelines도 좋은 대안.