본문 바로가기

인프라,데이터

데이터 파이프라인 핵심 가이드 8-10

8. 파이프라인의 데이터 검증

아무리 잘 설계된 파이프라인이라도 반드시 문제가 발생하기 마련이다.

데이터 자체의 품질과 유효성을 보장하기 위해서는 데이터 검증에 투자해야 한다.
테스트되지 않은 데이터는 분석에 사용하기에 안전하지 않다고 가정하는 것이 좋다.

데이터 엔지니어라고 파이프라인에 흐르는 데이터의 내용은 무시하고, 분석가만 신뢰하면 안 된다. 
파이프라인의 각 단계에서 데이터를 검증하면 현재 단계에서 문제의 근본 원인을 찾을 수 있다.

데이터 수집 중에 잘못된 데이터가 데이터 웨어하우스로 유입될 가능성이 높다.

  • 잘못된 데이터는 소스 시스템의 작동 자체에는 영향을 미치지 않을 수 있다.
  • 레코드의 연결이 끊어져도 소스 시스템이 정상적으로 작동할 수 있다.

 

데이터 수집 프로세스 자체가 데이터 품질 문제를 야기할 가능성이 있다.

  • 수집 중 추출 또는 로드 단계에서 시스템 중단 또는 시간 초과
  • 증분 수집의 논리적 오류 - 표준 시간대 불일치, 잘못된 연산자의 사용으로도 중복 레코드를 가져올 수 있다. 
  • 데이터가 CSV와 같은 플랫 파일로 변환될 때 예기치 않은 특수 문자나 기타 인코딩이 포함된 경우가 있다.

데이터 엔지니어는 데이터 파이프라인 전반에 걸쳐 데이터 검증을 정의하고 실행하는 데 필요한 툴을 제공해야 한다.
행 개수 및 중복 레코드 파악과 같은 항목은 데이터 엔지니어가 파이프라인 초기에 검증에 참여해야 한다.

한 쌍의 SQL 스크립트를 실행하고 비교 연산자를 기반으로 둘을 비교하는 파이썬 스크립트로 검증 테스트를 할 수 있다.

# execute a test made of up two scripts
# and a comparison operator
# Returns true/false for test pass/fail
def execute_test(
        db_conn,
        script_1,
        script_2,
        comp_operator):

    # execute the 1st script and store the result
    cursor = db_conn.cursor()
    sql_file = open(script_1, 'r')
    cursor.execute(sql_file.read())

    record = cursor.fetchone()
    result_1 = record[0]
    db_conn.commit()
    cursor.close()

    # execute the 2nd script and store the result
    cursor = db_conn.cursor()
    sql_file = open(script_2, 'r')
    cursor.execute(sql_file.read())

    record = cursor.fetchone()
    result_2 = record[0]
    db_conn.commit()
    cursor.close()

    print("result 1 = " + str(result_1))
    print("result 2 = " + str(result_2))

    # compare values based on the comp_operator
    if comp_operator == "equals":
        return result_1 == result_2
    elif comp_operator == "greater_equals":
        return result_1 >= result_2
    elif comp_operator == "greater":
        return result_1 > result_2
    elif comp_operator == "less_equals":
        return result_1 <= result_2
    elif comp_operator == "less":
        return result_1 < result_2
    elif comp_operator == "not_equal":
        return result_1 != result_2

    # if we made it here, something went wrong
    return False
  • 에어플로우 DAG에서도 사용할 수 있다.
    • 유효성 검사 테스트 수는 실행되는 DAG 작업 수에 비례해야 한다.
    • 파이프라인의 각 단계를 테스트하고 있는지 확인해야 한다. 
  • 데이터 검증 테스트가 실패하고, 최종 작업에서 데이터의 오류가 일어나면 안 될 경우 DAG를 중지해야 한다.
    일반적으로 오래된 데이터가 잘못된 데이터보다 좋다.
  • 오류를 발생시키고 파이프라인을 중지할지, 슬랙 채널에 경고를 보낼지에 대한 결정은 비즈니스 상황과 데이터 사용 사례를 기반으로 이루어져야 한다. 
  • 일부 데이터 수집 도구에는 행 수 변경, 열에서 예기치 않은 값 등을 확인하는 기능이 포함되어 있다.

 

dbt

  • 데이터 모델 개발 프레임워크로, 데이터 데스트 과정이 포함되어 있다

 

9. 파이프라인 유지 관리 모범 사례

소스 시스템의 변경 사항 처리 - 추상화 도입

  • Postgres db에서 직접 데이터를 수집하는 대신 데이터베이스에서 데이터 추출을 위해 쿼리할 수 있는 REST API를 구축하는 것을 고려

 

데이터 계약 유지 관리

데이터 계약 :

  • 소스 시스템의 소유자, 해당 시스템에서 데이터를 수집하는 팀 간의 서면 계약.
  • 데이터 추출이 어떤 방법(전체, 증분)으로 얼마나 자주 이루어지며, 소스 시스템과 수집 모두에 대한 연락처가 누구(사람, 팀)인지 명시해야 한다.
  • 깃허브 또는 내부 문서 사이트와 같이 잘 알려져 있고 찾기 쉬운 위치에 저장해야 한다.
  • 가능하면 데이터 계약을 표준화 형태로 지정하여 개발 프로세스에 통합하거나, 프로그램이 방식으로 쿼리할 수 있게 한다.

PR이 제출되거나 코드가 브랜치에 커밋될 때 테이블에 대한 변경사항(스키마 또는 로직)을 찾는 Git hook을 빌드해서, 테이블의 변경 사항을 조정하기 위해 누구에게 연락해야 하는지를 기고자에게 자동으로 알린다. 

 

Debezium is an open source distributed platform for change data capture. Start it up, point it at your databases, and your apps can start responding to all of the inserts, updates, and deletes that other apps commit to your databases. Debezium is durable and fast, so your apps can respond quickly and never miss an event, even when things go wrong.

데이터베이스를 모니터링하며 데이터베이스에 커밋된 작은 변화라도 애플리케이션에서 스트림으로 반영되게 한다. 

 

10. 파이프라인 성능 측정 및 모니터링

나와 이해 관계자에게 중요한 것을 식별한다.

  • 얼마나 많은 검증 테스트가 실행되고 실행된 총 테스트의 몇 퍼센트가 통과하는가?
  • 특정 DAG가 실행되는 빈도
  • 파이프라인의 총 런타임
  • 지표는 중요한 것만 최대 2-3개 선택하는 것이 좋다.
    측정 대상이 겹치지 않고 각각 고유한 목적을 갖도록 하는 것도 중요하다.

 

데이터 유효성 검사기에 로깅 추가

Redshift등의 DW에 데이터 유효성 검사의 스크립트와 결과를 로그로 저장한다.

def log_result(
        db_conn,
        script_1,
        script_2,
        comp_operator,
        result):
    m_query = """INSERT INTO validation_run_history(
                    script_1,
                    script_2,
                    comp_operator,
                    test_result,
                    test_run_at)
                VALUES(%s, %s, %s, %s,
                    current_timestamp);"""

    m_cursor = db_conn.cursor()
    m_cursor.execute(
                m_query,
                (script_1,
                    script_2,
                    comp_operator,
                    result)
            )
    db_conn.commit()

    m_cursor.close()
    db_conn.close()

    return
    
    
if __name__ == "__main__":
    
    # execute the validation test
    test_result = execute_test(
                    db_conn,
                    script_1,
                    script_2,
                    comp_operator)

    # log the test in the data warehouse
    log_result(
        db_conn, 
        script_1,
        script_2,
        comp_operator,
        test_result)

하지만 검증 테스트 결과와 같이 대용량 로그 데이터를 생성하려는 경우
Splunk, SumoLogic 또는 ELK 스택과 같은 로그 분석 인프라로 라우팅하는 것을 고려해보는 것이 좋다.
이러한 플랫폼은 대용량의 소규모 쓰기 작업(로그 항목이 일반적임)에서 잘 수행되도록 설계된 반면,

Snowflake 및 Redshift와 같은 데이터 웨어하우스는 대량 데이터 수집 성능이 더 우수하다. 

 

마지막으로, 검증 테스트의 결과는 데이터 팀 및 이해 관계자와 공유하는 것이 필수적이다!