<스파크 완벽 가이드> 책의 내용을 정리하는 글입니다.
Part 1의 내용은 앞으로 이 책이 어떤 내용을 다룰지, 각 내용은 어떤 것인지에 대한 개괄적인 설명으로 구성되어 있습니다.
스파크의 등장 배경
2005년까지, 컴퓨터 프로세스는 매년 더 많은 양의 명령어를 처리할 수 있게 발전했습니다. 그 결과 애플리케이션들도 빨라졌죠. 이떄까지는 대규모의 데이터 처리를 프로세서의 성능 향상에 맡겼습니다.
하지만 2005년경, 하드웨어의 성능 향상이 멈추면서 하드웨어 개발자들은 모든 코어가 같은 속도로 동작하는 병렬 CPU 코어를 더 많이 추가했습니다. 데이터를 저장하는데 드는 비용은 14개월마다 절반으로 줄었기에, 데이터 수집 비용은 극히 저렴해졌습니다. 자연스레 사람들이 수집하는 데이터의 양은 많아졌습니다.
아파치 스파크는 데이터를 연산하는 통합 컴퓨팅 엔진이며 클러스터 환경에서 데이터를 병렬로 처리하는 라이브러리 집합입니다.
(클러스터 환경이란 여러 컴퓨터의 자원을 모아 하나의 컴퓨터처럼 사용하는 것을 말합니다.)
우버나 넷플릭스, NASA같이 거대한 규모의 데이터셋을 사용하는 회사나 기관이 스파크를 사용합니다.
아파치 스파크의 철학
스파크는 통합적입니다. 코드와 기존 라이브러리를 사용해 애플리케이션을 만들 수 있고, 사용자가 직접 스파크 기반의 라이브러리를 만들 수도 있습니다. 스파크의 API를 활용하면 SQL쿼리로 데이터를 읽고 ML라이브러리로 머신러닝 모델을 평가해야 하는 경우에, 기존의 두 단계를 한 단계로 병합하고 데이터를 한 번만 조회할 수 있게 합니다.
환경
스파크는 내부에 데이터를 오랜 시간 저장하지 않습니다. 대신 연산 기능에 초점을 맞춥니다.
스파크는 파이썬, 자바, 스칼라, R을 지원하며 SQL뿐만 아니라 스트리밍, 머신러닝에 이르기까지 넓은 범위의 라이브러리를 제공합니다.
단일 노트북 환경에서부터 수천 대의 서버로 구성된 클러스터까지 다양한 환경에서 실행될 수 있습니다.
지원되는 데이터 저장소
스파크는 데이터를 연산한 후 Azure, S3, Hadoop, Cassandra, Kafka 등의 저장소를 지원합니다. 여러 저장소를 지원하기 때문에 사용자는 데이터 저장 위치에 상관없이 데이터 처리에 집중할 수 있습니다.
라이브러리
스파크는 엔진에서 지원하는 표준 라이브러리와 오픈소스 커뮤니티의 다양한 외부 라이브러리를 지원합니다. 스파크 SQL, 머신러닝을 지원하는 MLlib, 스트림 처리 기능을 지원하는 스파크 스트리밍과 그래프 분석 엔진인 GraphX 라이브러리를 제공합니다.
스파크 애플리케이션
드라이버 : 클러스터 노드 중 하나에서 실행되며 main() 함수를 실행합니다.
스파크 애플리케이션의 정보의 유지 관리, 사용자 프로그램이나 입력에 대한 응답,
전반적인 익스큐터 프로세스의 작업과 관련된 분석, 배포 & 스케줄링 역할을 수행하기 때문에 필수적입니다.
애플리케이션의 수명 주기 동안 관련 정보를 모두 유지합니다.
익스큐터 : 드라이버 프로세스가 할당한 작업을 수행합니다.
드라이버가 할당한 코드를 실행하고, 진행 상황을 다시 드라이버 노드에 보고합니다.
사용자는 각 노드에 할당할 익스큐터 수를 지정할 수 있습니다.
스파크는 사용 가능한 자원을 파악하기 위해 클러스터 매니저를 사용하며,
드라이버 프로세스는 주어진 작업을 완료하기 위해 익스큐터에서 명령을 실행합니다.
DataFrame
스파크 DataFrame은 테이블의 데이터를 로우(Row)와 칼럼(Col)으로 단순하게 표현합니다.
파이썬과 R의 DataFrame은 일반적으로 분산 컴퓨터가 아닌 단일 컴퓨터에 존재하는데, 이런 상황에서는 DataFrame으로 수행할 수 있는 작업이 해당 컴퓨터가 가진 자원에 따라 제한될 수 밖에 없죠.
스파크는 파이썬과 R의 DataFrame을 스파크 DataFrame으로 손쉽게 변환하는데, 그럼 데이터가 수천 대의 컴퓨터에 분산되게 됩니다. 단일 컴퓨터에 저장하기 큰 데이터나 계산에 오랜 시간이 걸리는 데이터도 처리할 수 있습니다.
파티션
스파크는 모든 익스큐터가 병렬로 작업을 수행할 수 있도록 파티션이라 불리는 청크 단위로 데이터를 분할합니다.
파티션은 클러스터의 물리적 머신에 존재하는 로우의 집합을 의미합니다.
DataFrame의 파티션은 실행 중에 데이터가 컴퓨터 클러스터에서 물리적으로 분산되는 방식을 나타냅니다.
* 만약 파티션이 하나라면, 스파크에 수천 개의 익스큐터가 있더라도 병렬성은 1이 됩니다.
* 수백 개의 파티션이 있더라도 익스큐터가 하나밖에 없다면 병렬성은 1이 됩니다.
트랜스포메이션
스파크의 핵심 데이터 구조는 불변성을 가집니다. 한 번 생성하면 변경할 수 없습니다.
DataFrame을 변경하려면 트랜스포메이션을 활용하여 원하는 변경 방법을 스파크에 알려줘야 합니다.
divisBy2 = myRange.where("number % 2 = 0")
위의 코드는 추상적인 트랜스포메이션의 예시입니다. 데이터의 범위를 divisBy2라는 변수에 할당시키는데, 이를 지정만 한 상태이기 때문에 호출하지 않으면 실제 트랜스포메이션을 수행하지 않습니다.
좁은 트랜스포메이션을 사용하면 스파크에서 파이프라이닝* 을 자동으로 수행합니다.
파이프라이닝 : 명령어를 순차적으로 실행하는 프로세서에 적용되는 기술로, 한 번에 하나의 명령어만 실행하는 것이 아니라 하나의 명령어가 실행되는 도중에 다른 명령어를 실행을 시작하는 식으로 동시에 여러 개의 명령어를 실행하는 기법
넓은 트랜스포메이션을 사용하면 클러스터에서 파티션을 교환하는 셔플의 결과가 디스크에 저장됩니다.
지연 연산 Lazy evaluation
스파크에서의 지연 연산이란 액션* 전까지 연산이 실행되지 않는 것을 말합니다.
스파크는 특정 연산 명령이 내려진 즉시 데이터를 수정하지 않고, 원시 데이터에 적용할 트랜스포메이션의 실행 계획을 생성합니다. 스파크는 코드를 실행하는 마지막 순간까지 대기하다가 DataFrame 트랜스포메이션을 간결한 물리 실행 계획으로 컴파일합니다. 이 과정을 거치면서 전체 데이터 흐름을 최적화하는 엄청난 강점을 가지고 있습니다.
액션
사용자는 트랜스포메이션으 사용해 논리적 실행 계획*을 세우는데요,
실제 연산을 수행하기 위해서는 트랜스포메이션으로부터 결과를 계산하라고 지시하는 액션 명령을 내려야 합니다.
divisBy2.count()
전체 레코드 수를 반환하는 count 메서드도 액션입니다.
액션을 지정하면 스파크 잡job이 시작됩니다.
실행 계획은 트랜스포메이션의 지향성 비순환 그래프Directed Acyclic Graph, DAG이며 액션이 호출되면 결과를 만들어냅니다. DAG의 각 단계는 불변성을 가진 신규 DataFrame을 생성합니다.
flightData2015\ #1
.groupBy("DEST_COUNTRY_NAME")\ #2
.sum("count")\ #3
.withColumnRenamed("sum(count)", "destination_total")\ #4
.sort(desc("destination_total"))\ #5
.limit(5)\ #6
.explatin() #7
위의 코드는
1) 데이터를 읽고(액션이 호출되기 전까지는 데이터를 읽지 않습니다),
2) 데이터를 그룹화하고,
3) sum 메서드로 새로운 스키마 정보를 가지는 별도의 DataFrame을 생성하고(액션이 호출되기 전까지는 연산이 일어나지 않습니다)
4) 컬럼명을 변경합니다. (역시 트랜스포메이션이기에 여전히 연산은 일어나지 않습니다.)
5) 데이터를 정렬하고, Column 객체를 반환합니다.
6) 반환 결과의 수를 제한하고,
7) 액션을 수행합니다! 이 단계에서 드디어 DataFrame의 결과를 모으는 프로세스를 시작합니다.
처리가 끝나면 코드를 작성한 언어에 맞는 리스트나 배열을 반환합니다.
스파크 기능 둘러보기
구조적 API
Dataset
- 자바와 스칼라의 정적 데이터 타입에 맞는 정적 타입 코드를 지원하기 위해 고안되었습니다. 동적 타입 언어인 파이썬과 R에서는 사용할 수 없습니다.
- 타입 안정성을 지원하므로 초기화에 사용한 클래스 대신 다른 클래스를 사용해 접근할 수 없습니다. 다수의 소프트웨어 엔지니어가 잘 정의된 인터페이스로 상호작용하는 대규모 애플리케이션을 개발하는 데 특히 유용합니다.
구조적 스트리밍
스트림 처리용 고수준 API 입니다. 배치 모드의 연산을 스트리밍 방식으로 실행할 수 있으며, 지연 시간을 줄이고 증분(변경되거나 추가된 것) 처리할 수 있습니다.
배치 처리용 코드를 일부 수정하여 스트리밍 처리를 수행하고 값을 빠르게 얻을 수 있다는 장점이 있습니다.
purchaseByCustomerPerHour = streamingDataFrame\
.selectExpr(
"CustomerId",
"(UnitPrice * Quantity) as total_cost",
"InvoiceDate")\
.groupBy(
col("CustomerId"), window(col("InvoiceDate"), "1 day"))\
.sum("total_cost")
window : 집계 시에 시계열 컬럼을 기준으로 각 날짜에 대한 전체 데이터를 가지는 윈도우를 구성합니다.
윈도우는 간격을 통해 처리 요건을 명시할 수 있기 때문에 날짜와 타임스탬프 처리에 유용합니다.
스파크는 관련 날짜의 데이터를 그룹화합니다.
* 스파크는 데이터를 처리하는 시점이 아닌 이벤트 시간에 따라 윈도우를 구성합니다. 이 방법으로 기존 스파크 스트리밍의 단점을 구조적 스트리밍으로 보완할 수 있습니다.
'인프라,데이터 > Spark' 카테고리의 다른 글
구글 코랩에서 Pyspark 사용하기 (0) | 2022.02.27 |
---|---|
스파크 완벽 가이드 Part 2 : 구조적 API (1) (0) | 2022.02.26 |