본문 바로가기

인프라,데이터/Spark

스파크 완벽 가이드 Part 2 : 구조적 API (1)

스파크

  • 트랜스포메이션*의 처리 과정을 정의하는 분산 프로그래밍 모델입니다. 
    트랜스포메이션 : 지향성 비순환 그래프(DAG)로 표현되는 명령을 만들어냅니다. 
  • 액션 : 하나의 잡을 클러스터에서 실행하기 위해 스테이지와 태스크로 나누고 DAG 처리 프로세스를 실행합니다.
    트랜스포메이션과 액션으로 다루는 논리적 구조가 바로 DataFrame과 Dataset입니다.
    새로운 DataFrame과 Dataset을 만들려면 트랜스포메이션을 호출해야 합니다.
    연산을 시작하거나 사용한 언어에 맞는 데이터 타입으로 변환하려면 액션을 호출해야 합니다. 

 

구조적 API

  • 비정형 로그 파일부터 반정형 CSV 파일, 파케이Parquet 파일까지 다양한 유형의 데이터를 처리할 수 있습니다.
  • 구조적 API에는 Dataset, DataFrame, SQL 테이블과 뷰와 같은 3가지 분산 컬렉션 API가 있습니다. 
  • 배치와 스트리밍 처리에서 구조적 API를 사용할 수 있습니다.
    (배치 작업을 스트리밍 작업으로, 스트리밍 작업을 배치 작업으로 손쉽게 변환할 수 있습니다)
  • 데이터 흐름을 정의하는 기본 추상화 개념입니다.

 

DataFrame과 DataSet

  • 잘 정의된 로우와 컬럼을 가지는 분산 테이블 형태의 컬렉션입니다.
  • 각 컬럼은 다른 컬럼과 동일한 수의 로우를 가져야 하며,
    컬렉션의 모든 로우는 같은 데이터 타입 정보를 가지고 있어야 합니다.
  • 지연 연산의 실행 계획이며, 불변성을 가집니다.
  • DataFrame에 액션을 호출하면 스파크는 트랜스포메이션을 실제로 실행하고 결과를 반환합니다. 
  DataFrame Dataset
구조적 API 비타입형 타입형
데이터 타입의 일치 여부 확인 시기 런타임이 되어서야 확인 컴파일 타임에 확인
언어 스칼라, 자바, 파이썬, R 스칼라, 자바
  • 스파크에서의 DataFrame은 Row 타입으로 구성된 Dataset입니다.
  • JVM 데이터 타입을 사용하는 대신 자체 데이터 포맷을 사용하기 때문에 효율적인 연산이 가능합니다.

 

스키마

  • 분산 컬렉션에 저장할 데이터 타입을 정의하는 방법입니다. 
  • DataFrame의 컬럼명과 데이터 타입을 정의합니다.
  • 데이터 소스에서 얻거나 직접 정의할 수 있습니다.
  • 여러 데이터 타입으로 구성됩니다. 

 

스파크가 지원하는 파이썬 데이터 타입 매핑

스파크 데이터 타입 파이썬 데이터 타입 데이터 타입 생성 /
접근용 API
ByteType int or long
Note: Numbers will be converted to 1-byte signed integer numbers at runtime. Please make sure that numbers are within the range of -128 to 127.
ByteType()
ShortType int or long
Note: Numbers will be converted to 2-byte signed integer numbers at runtime. Please make sure that numbers are within the range of -32768 to 32767.
ShortType()
IntegerType int or long IntegerType()
LongType long
Note: Numbers will be converted to 8-byte signed integer numbers at runtime. Please make sure that numbers are within the range of -9223372036854775808 to 9223372036854775807.Otherwise, please convert data to decimal.Decimal and use DecimalType.
LongType()
FloatType float
Note: Numbers will be converted to 4-byte single-precision floating point numbers at runtime.
FloatType()
DoubleType float DoubleType()
DecimalType decimal.Decimal DecimalType()
StringType string StringType()
BinaryType bytearray BinaryType()
BooleanType bool BooleanType()
TimestampType datetime.datetime TimestampType()
DateType datetime.date DateType()
ArrayType list, tuple, or array ArrayType(elementType, [containsNull])
Note:The default value of containsNull is True.
MapType dict MapType(keyType, valueType, [valueContainsNull])
Note:The default value of valueContainsNull is True.
StructType list or tuple StructType(fields)
Note: fields is a Seq of StructFields. Also, two fields with the same name are not allowed.
StructField The value type in Python of the data type of this field
(For example, Int for a StructField with the data type IntegerType)
StructField(name, dataType, [nullable])
Note: The default value of nullable is True.

 

구조적 API의 실행 과정

스파크 코드가 클러스터에서 실제 처리되는 과정은 다음과 같습니다.

1. DataFrame/Dataset/SQL을 이용해 코드를 작성합니다.


2. 정상적인 코드라면 스파크가 논리적 실행 계획으로 변환합니다.


3. 스파크는 논리적 실행 계획을 물리적 실행 계획으로 변환*하며, 그 과정에서 추가적인 최적화를 할 수 있는지 확인합니다.
* 논리적 실행 계획을 클러스터에서 실행하는 방법을 정의합니다

4. 스파크는 클러스터에서 물리적 실행 계획(RDD)을 실행합니다. 
   스파크는 DataFrame, DataSet, SQL로 정의된 쿼리를 RDD 트랜스포메이션으로 컴파일*합니다. 
   * 한 프로그래밍 언어를 다른 언어로 바꾸는 것

 


구조적 API 기본 연산

df = spark.read.format("json").load("/data/flight-data/json/2015-summary.json")
  • DataFrame :  Row 타입의 레코드와 각 레코드에 수행할 연산 표현식을 나타내는 여러 컬럼으로 구성됩니다.
spark.read.format("json").load("/data/flight-data/json/2015-summary.json").schema
  • 스키마 :  각 컬럼명과 데이터 타입을 정의합니다.
    * ETL 작업에 스파크를 사용한다면 직접 스키마를 정의해야 합니다. 
    * 스파크는 런타임에 데이터 타입이 스키마의 데이터 타입과 일치하지 않으면 오류를 발생시킵니다. 
  • 파티셔닝 : DataFrame이나 Dataset이 클러스터에서 물리적으로 배치되는 형태를 정의합니다.
  • 파티셔닝 스키마 : 파티션을 배치하는 방법을 정의합니다.
    파티셔닝의 분할 기준은 특정 컬럼이나 매번 변하는 값을 기반으로 설정할 수 있습니다. 

 

컬럼과 표현식

from pyspark.sql.functions import col, column
col("someColumnName")
column("someColumnName")
  • 컬럼 내용을 수정하려면 반드시 DataFrame의 스파크 트랜스포메이션을 사용해야 합니다.

 

표현식

  • 컬럼이 표현식입니다.
  • DataFrame 레코드의 여러 값에 대한 트랜스포메이션 집합을 의미합니다. 
  • 여러 컬럼명을 입력으로 받아 식별하고, 단일값을 만들기 위해 다양한 표현식을 각 레코드에 적용하는 함수라고 생각할 수 있습니다. 
  • expr 함수로 간단히 사용할 수 있습니다.
  • 컬럼과 컬럼의 트랜스포메이션논리적 실행 계획으로 컴파일됩니다. 
expr("someCol - 5")
col("someCol") - 5
expr("someCol") - 5

위 세 코드는 모두 같은 트랜스포메이션 과정을 거칩니다. 
스파크가 연산 순서를 지정하는 논리적 트리로 컴파일*하기 때문입니다. 

 

(((col("someCol") + 5) * 200) - 6) < col("otherCol")

 

select와 selectExpr

위의 메서드를 사용하면 DataFrame에서도 SQL을 사용할 수 있습니다.

df.select("DEST_COUNTRY_NAME").show(2) # 문자열 컬럼명을 인수로 받습니다.

 

expr, col, column을 사용하면 컬럼을 참조할 수 있습니다.

from pyspark.sql.functions import expr, col, column
df.select(
    expr("DEST_COUNTRY_NAME"),
    col("DEST_COUNTRY_NAME"),
    column("DEST_COUNTRY_NAME"))\
  .show(2)

 

df.select(expr("DEST_COUNTRY_NAME AS destination")).show(2)

select 메서드에 컬럼을 참조하는 expr 함수를 자주 사용하는데, selectExpr 메서드를 사용하면 이런 작업을 한번에 할 수있습니다.

df.select(expr("DEST_COUNTRY_NAME as destination").alias("DEST_COUNTRY_NAME"))\
  .show(2)

해당 메서드를 사용하면 새로운 DataFrame을 간단하게 생성할 수 있습니다. 

 

리터럴

from pyspark.sql.functions import lit
df.select(expr("*"), lit(1).alias("One")).show(2)

lit 메서드를 사용하여 명시적인 값을 스파크가 이해할 수 있는 값으로 변환합니다. 

 

withColumn

# df.withColumn("컬럼명", "표현식")
df.withColumn("numberOne", lit(1)).show(2)
df.withColumn("withinCountry", expr("ORIGIN_COUNTRY_NAME == DEST_COUNTRY_NAME"))\
  .show(2)

DataFrame에 신규 컬럼을 추가합니다.

 

백틱 `

dfWithLongColName.selectExpr(
    "`This Long Column-Name`",
    "`This Long Column-Name` as `new col`")\
  .show(2)

selectExpr을 사용하여 표현식으로 예약 문자(공백, -)나 키워드를 사용하는 컬럼 이름을 참조할때는 백틱(`) 문자를 사용합니다. 

 

데이터 타입 변경

df.withColumn("count2", col("count").cast("string"))

cast 메서드로 데이터 타입을 형변환할 수 있습니다.

 

로우 필터링

df.filter(col("count").show(2)
df.where("count < 2").show(2)

DataFrame의 where, filter 메서드를 사용하는 것이 일반적입니다.

같은 표현식에 여러 개의 필터를 지정하려면 차례대로 필터를 연결하면 됩니다. 

df.where(col("count") < 2).where(col("ORIGIN_COUNTRY_NAME") != "Croatia")\
  .show(2)

 

고유한 로우 얻기

df.select("ORIGIN_COUNTRY_NAME", "DEST_COUNTRY_NAME").distinct().count()

DataFrame의 모든 로우에서 중복 데이터를 제거할 수 있는 distinct 메서드를 사용하면
중복되지 않은 로우를 가진 신규 DataFrame을 반환합니다.

 

무작위 샘플 만들기

seed = 5
withReplacement = False
fraction = 0.5
df.sample(withReplacement, fraction, seed).count()

sample 메서드를 사용합니다.
첫 번째 인자로 복원 추출/비복원추출,
두 번째 인자로 표본 데이터 추출 비율,
세 번째 인자로 랜덤 시드를 정합니다. 

 

임의 분할

dataFrames = df.randomSplit([0.25, 0.75], seed)
dataFrames[0].count() > dataFrames[1].count() # False

머신러닝 알고리즘에서 학습셋, 검증셋, 테스트셋을 만들 때 주로 사용합니다. 
이 메서드는 임의성을 가지도록 설계되었으므로 시드값을 반드시 설정해야 합니다.

로우 합치기와 추가하기

from pyspark.sql import Row
schema = df.schema
newRows = [
  Row("New Country", "Other Country", 5L),
  Row("New Country 2", "Other Country 3", 1L)
]
parallelizedRows = spark.sparkContext.parallelize(newRows) # local python collection to RDD
newDF = spark.createDataFrame(parallelizedRows, schema)

df.union(newDF)\
  .where("count = 1")\
  .where(col("ORIGIN_COUNTRY_NAME") != "United States")\
  .show()

DataFrame은 불변성을 가지므로, 레코드를 추가하려면 원본 DataFrame을 새로운 DataFrame과 통합해야 합니다.
통합하려는 2개의 DataFrame은 반드시 동일한 스키마와 컬럼 수를 가져야 합니다.

 

로우 정렬하기

from pyspark.sql.functions import desc, asc

df.sort("count").show(5)

df.orderBy(expr("count desc")).show(2)
df.orderBy(col("count").desc(), col("DEST_COUNTRY_NAME").asc()).show(2)

sort와 orderBy 메서드를 사용해 DataFrame의 최대값 혹은 최소값이 상단에 위치하도록 정렬합니다.
다수의 컬럼을 기준으로 지정할 수 있으며, 기본 동작 방식은 오름차순 정렬입니다.
정렬 기준을 명확히 지정하려면 asc나 desc 함수를 사용하여 컬럼의 정렬 순서를 지정합니다.

spark.read.format("json").load("/data/flight-data/json/*-summary.json")\
  .sortWithinPartitions("count")

트랜스포메이션을 처리하기 전에 성능을 최적화하기 위해 sortWithinPartitions 메서드로 파티션별 정렬을 수행하기도 합니다.

또 다른 최적화 기법은 자주 필터링하는 컬럼을 기준으로 데이터를 분할하는 것입니다.
파티셔닝 스키마와 파티션 수를 포함한 클러스터 전반의 물리적 데이터 구성을 제어합니다.

df.repartition(5)

repartition 메서드를 호출하면 전체 데이터를 셔플합니다.
향후에 사용할 파티션 수가 현재 파티션 수보다 많거나, 컬럼을 기준으로 파티션을 만드는 경우에만 사용해야 합니다.

df.repartition(5, col("DEST_COUNTRY_NAME"))

특정 컬럼을 기준으로 자주 필터링한다면 자주 필터링되는 컬럼을 기준으로 파티션을 재분배하는 것이 좋습니다.
첫 번째 인자로 파티션 수를 지정해줄 수도 있습니다.

df.repartition(5, col("DEST_COUNTRY_NAME")).coalesce(2)

coalesce 메서드는 전체 데이터를 셔플하지 않고 파티션을 병합하려는 경우에 사용합니다. 
파티션 수를 줄일 때 사용합니다.

 

드라이버로 로우 데이터 수집하기

collectDF = df.limit(10)
collectDF.take(5) # take works with an Integer count
collectDF.show() # this prints it out nicely
collectDF.show(5, False) # 두 번째 인자는 truncate 여부 - 기본 20 chars
collectDF.collect()

collect 메서드는 전체 DataFrame의 모든 데이터를 수집하며,
take 메서드는 상위 N개의 로우를 반환합니다.
show 메서드는 여러 로우를 보기 좋게 출력합니다. 


다양한 데이터 타입 다루기

 

스파크 데이터 타입으로 변환하기

from pyspark.sql.functions import lit
df.select(lit(5), lit("five"), lit(5.0))

데이터 타입 변환은 lit 함수를 사용합니다. lit 함수는 다른 언어의 데이터 타입을 스파크 데이터 타입에 맞게 변환합니다. 

 

불리언 데이터 타입

from pyspark.sql.functions import col
df.where(col("InvoiceNo") != 536365)\
  .select("InvoiceNo", "Description")\
  .show(5, False)

불리언 식에는 일치 조건 뿐 아니라 작다, 크다와 같은 비교 연산도 사용할 수 있습니다. 

문자열 표현식에 조건절을 명시하여 사용하는 것이 더 명확합니다.

df.where("InvoiceNo = 536365").show(5, false)

 

from pyspark.sql.functions import instr

# instr - locate first occurrence of sub column in given string

priceFilter = col("UnitPrice") > 600
descripFilter = instr(df.Description, "POSTAGE") >= 1
df.where(df.StockCode.isin("DOT")).where(priceFilter | descripFilter).show()

불리언 컬럼을 사용해 DataFrame을 필터링할 수도 있습니다. 

 

from pyspark.sql.functions import expr
df.withColumn("isExpensive", expr("NOT UnitPrice <= 250"))\
  .where("isExpensive")\
  .select("Description", "UnitPrice").show(5)

withColumn 메서드는 컬럼을 더함으로써 새로운 DataFrame을 리턴하는데,
첫 번째 인자로 컬럼 이름을, 두 번째 인자로 컬럼을 받습니다.

 

수치형 데이터 타입 다루기

from pyspark.sql.functions import col,expr, pow

# pow는 거듭제곱!
fabricatedQuantity = pow(col("Quantity") * col("UnitPrice"), 2) + 5
df.select(expr("CustomerId"), fabricatedQuantity.alias("realQuantity")).show(2)
df.selectExpr(
    "CustomerId",
    "(POWER((Quantity * UnitPrice), 2.0) + 5) as realQuantity").show(2)

수치형 컬럼은 덧셈, 뺄셈, 곱셈 등의 연산이 가능합니다. 

 

반올림, 반내림

from pyspark.sql.functions import lit, round, bround

# bround - 반내림
df.select(round(lit("2.5")), bround(lit("2.5"))).show(2)

+-------------+--------------+
|round(2.5, 0)|bround(2.5, 0)|
+-------------+--------------+
|          3.0|           2.0|
|          3.0|           2.0|
+-------------+--------------+
only showing top 2 rows

 

통계 

from pyspark.sql.functions import corr

df.stat.corr("Quantity", "UnitPrice")
df.select(corr("Quantity", "UnitPrice")).show()
+-------------------------+
|corr(Quantity, UnitPrice)|
+-------------------------+
|     -0.04112314436835551|
+-------------------------+

피어슨 상관관계는 0에서 1 사이의 값을 가지며, 1에 가까울수록 완벽한 선형 상관 관계를 나타냅니다.

 

df.describe().show()
+-------+-----------------+------------------+--------------------+------------------+-------------------+------------------+------------------+--------------+
|summary|        InvoiceNo|         StockCode|         Description|          Quantity|        InvoiceDate|         UnitPrice|        CustomerID|       Country|
+-------+-----------------+------------------+--------------------+------------------+-------------------+------------------+------------------+--------------+
|  count|             3108|              3108|                3098|              3108|               3108|              3108|              1968|          3108|
|   mean| 536516.684944841|27834.304044117645|                null| 8.627413127413128|               null| 4.151946589446603|15661.388719512195|          null|
| stddev|72.89447869788873|17407.897548583845|                null|26.371821677029203|               null|15.638659854603892|1854.4496996893627|          null|
|    min|           536365|             10002| 4 PURPLE FLOCK D...|               -24|2010-12-01 08:26:00|               0.0|           12431.0|     Australia|
|    max|          C536548|              POST|ZINC WILLIE WINKI...|               600|2010-12-01 17:35:00|            607.49|           18229.0|United Kingdom|
+-------+-----------------+------------------+--------------------+------------------+-------------------+------------------+------------------+--------------+

관련 컬럼에 대한 집계count, 평균mean, 표준편차stddev, 최소값, 최대값 등을 계산합니다.

정확한 수치가 필요하다면 해당 함수를 임포트하고 해당 컬럼에 적용하는 방식으로 집계를 수행합니다. 

from pyspark.sql.functions import count, mean, stddev_pop, min, max

 

StatFunctions 패키지는 다양한 통계 함수를 제공합니다. 

# approxQuantitle - 데이터의 백분위수를 정확하게 계산하거나 근사치 계산

olName = "UnitPrice"
quantileProbs = [0.5]
realError = 0.05

df.stat.approxQuantile("UnitPrice", quantileProbs, realError)
df.stat.crosstab("StockCode", "Quantity").show()
+------------------+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+
|StockCode_Quantity| -1|-10|-12| -2|-24| -3| -4| -5| -6| -7|  1| 10|100| 11| 12|120|128| 13| 14|144| 15| 16| 17| 18| 19|192|  2| 20|200| 21|216| 22| 23| 24| 25|252| 27| 28|288|  3| 30| 32| 33| 34| 36|384|  4| 40|432| 47| 48|480|  5| 50| 56|  6| 60|600| 64|  7| 70| 72|  8| 80|  9| 96|
+------------------+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+
|             22578|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  1|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|
|             21327|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  2|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|
|             22064|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  1|  0|  0|  0|  1|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  1|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|
|             21080|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  1|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  1|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  1|
|             22219|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  3|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  1|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|
|             21908|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  1|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|
|             22818|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  1|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  1|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|
|           15056BL|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  1|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  1|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  1|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|
|             72817|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  1|  0|  0|  0|  1|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  1|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|
|             22545|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  1|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|
|             22988|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  1|  0|  0|  0|  1|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  1|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  1|  0|  0|  0|  1|  0|  0|  0|  0|  0|  0|
|             22274|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  1|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  1|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|
|             20750|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  2|  1|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|
|            82616C|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  1|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|
|             21703|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  1|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  1|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|
|             22899|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  2|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  1|  0|  0|  0|
|             22379|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  1|  1|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  2|  1|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  1|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|
|             22422|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  2|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|
|             22769|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  1|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|
|             22585|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  1|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  1|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  1|  0|  0|  1|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|
+------------------+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+
only showing top 20 rows

 

df.stat.freqItems(["StockCode", "Quantity"]).show()
+--------------------+--------------------+
| StockCode_freqItems|  Quantity_freqItems|
+--------------------+--------------------+
|[90214E, 20728, 2...|[200, 128, 23, 32...|
+--------------------+--------------------+

freqItems 메서드로 자주 사용하는 항목 쌍을 확인할 수 있습니다. 

from pyspark.sql.functions import monotonically_increasing_id

df.select(monotonically_increasing_id()).show(2)
+-----------------------------+
|monotonically_increasing_id()|
+-----------------------------+
|                            0|
|                            1|
+-----------------------------+
only showing top 2 rows

monotonically_increasing_id 함수는 모든 로우에 0부터 시작하는 고유 ID값을 생성합니다.

 

문자열 데이터 타입 다루기

로그 파일에 정규 표현식을 사용해 데이터 추출, 데이터 치환, 문자열 존재 여부, 대/소문자 변환 처리등의 작업을 할 수 있습니다.

# 대/소문자 변환 
# initcap - 주어진 문자열에서 공백으로 나뉘는 모든 단어의 첫 글자를 대문자로 변경
from pyspark.sql.functions import initcap

df.select(initcap(col("Description"))).show()
+--------------------+
|initcap(Description)|
+--------------------+
|White Hanging Hea...|
| White Metal Lantern|
|Cream Cupid Heart...|
|Knitted Union Fla...|
|Red Woolly Hottie...|
|Set 7 Babushka Ne...|
|Glass Star Froste...|
|Hand Warmer Union...|
|Hand Warmer Red P...|
|Assorted Colour B...|
|Poppy's Playhouse...|
|Poppy's Playhouse...|
|Feltcraft Princes...|
|Ivory Knitted Mug...|
|Box Of 6 Assorted...|
|Box Of Vintage Ji...|
|Box Of Vintage Al...|
|Home Building Blo...|
|Love Building Blo...|
|Recipe Box With M...|
+--------------------+

 

# 문자열 전체를 소문자로 변경하거나 대문자로 변경
from pyspark.sql.functions import lower, upper

df.select(col("Description"),
          lower(col("Description")),
          upper(lower(col("Description")))).show(2)
+--------------------+--------------------+-------------------------+
|         Description|  lower(Description)|upper(lower(Description))|
+--------------------+--------------------+-------------------------+
|WHITE HANGING HEA...|white hanging hea...|     WHITE HANGING HEA...|
| WHITE METAL LANTERN| white metal lantern|      WHITE METAL LANTERN|
+--------------------+--------------------+-------------------------+
only showing top 2 rows
# 문자열 주변의 공백을 제거하거나 추가하는 작업 

from pyspark.sql.functions import lit, ltrim, rtrim, rpad, lpad, trim
df.select(
    ltrim(lit("    HELLO    ")).alias("ltrim"),
    rtrim(lit("    HELLO    ")).alias("rtrim"),
    trim(lit("    HELLO    ")).alias("trim"),
    lpad(lit("HELLO"), 3, " ").alias("lp"),
    rpad(lit("HELLO"), 10, " ").alias("rp")).show(2)

# lpad나 rpad 함수에 문자열의 길이보다 작은 숫자를 넘기면 문자열의 오른쪽부터 제거됨.
+---------+---------+-----+---+----------+
|    ltrim|    rtrim| trim| lp|        rp|
+---------+---------+-----+---+----------+
|HELLO    |    HELLO|HELLO|HEL|HELLO     |
|HELLO    |    HELLO|HELLO|HEL|HELLO     |
+---------+---------+-----+---+----------+
only showing top 2 rows

 

정규 표현식

정규 표현식을 사용하여 문자열에서 값을 추출하거나 다른 값으로 치환하는 데 필요한 규칙 모음을 정의할 수 있습니다. 
스파크는 자바 정규 표현식 문법을 사용합니다.

from pyspark.sql.functions import regexp_replace

regex_string = "BLACK|WHITE|RED|GREEN|BLUE"

df.select(
    regexp_replace(col("Description"), regex_string, "COLOR").alias("color_clean"),
    col("Description")).show(2)
+--------------------+--------------------+
|         color_clean|         Description|
+--------------------+--------------------+
|COLOR HANGING HEA...|WHITE HANGING HEA...|
| COLOR METAL LANTERN| WHITE METAL LANTERN|
+--------------------+--------------------+
only showing top 2 rows
from pyspark.sql.functions import translate

df.select(translate(col("Description"), "LEET", "1337"), col("Description")).show(2)
+----------------------------------+--------------------+
|translate(Description, LEET, 1337)|         Description|
+----------------------------------+--------------------+
|              WHI73 HANGING H3A...|WHITE HANGING HEA...|
|               WHI73 M37A1 1AN73RN| WHITE METAL LANTERN|
+----------------------------------+--------------------+
only showing top 2 rows

translate 메서드는 주어진 문자를 다른 문자로 치환합니다.
문자 단위로 이루어져 교체 문자열에서 색인된 문자에 해당하는 모든 문자를 치환합니다.
암호화 하기 좋겠다는 생각이 드네요.

from pyspark.sql.functions import regexp_extract

extract_str = "(BLACK|WHITE|RED|GREEN|BLUE)"
df.select(
    regexp_extract(col("Description"), extract_str, 1).alias("color_clean"),
    col("Description")).show(2)
+-----------+--------------------+
|color_clean|         Description|
+-----------+--------------------+
|      WHITE|WHITE HANGING HEA...|
|      WHITE| WHITE METAL LANTERN|
+-----------+--------------------+

정규식 추출로 처음 나타난 색상 이름을 추출하는 것 같은 작업도 할 수 있습니다. 

 

날짜와 타임스탬프 데이터 타입 다루기

날짜와 시간을 다룰 때는 계속해서 시간대timezone을 확인해야 하며, 데이터 포맷이 올바르고 유효한지 확인해야 합니다.
스파크는 자바의 날짜와 타임스탬프를 사용해서 표준 체계를 따릅니다.

from pyspark.sql.functions import current_date, current_timestamp

dateDF = spark.range(10)\
  .withColumn("today", current_date())\
  .withColumn("now", current_timestamp())
dateDF.createOrReplaceTempView("dateTable")
dateDF.printSchema()
root
 |-- id: long (nullable = false)
 |-- today: date (nullable = false)
 |-- now: timestamp (nullable = false)

currnet_date와 current_timestamp 메서드를 사용하여 오늘 날짜와 현재의 타임스탬프값을 구할 수 있고,

from pyspark.sql.functions import date_add, date_sub

dateDF.select(date_sub(col("today"), 5), date_add(col("today"), 5)).show(1)
+------------------+------------------+
|date_sub(today, 5)|date_add(today, 5)|
+------------------+------------------+
|        2022-02-21|        2022-03-03|
+------------------+------------------+
only showing top 1 row

date_add, date_sub 메서드로 오늘을 기준으로 5일 전후의 날짜를 구할 수도 있습니다. 

두 날짜의 차이를 구할 때는 두 날짜 사이의 일 수를 반환하는 datediff 함수를 사용합니다.

from pyspark.sql.functions import datediff, months_between, to_date

dateDF.withColumn("week_ago", date_sub(col("today"), 7))\
  .select(datediff(col("week_ago"), col("today"))).show(1)
+-------------------------+
|datediff(week_ago, today)|
+-------------------------+
|                       -7|
+-------------------------+

dateDF.select(
    to_date(lit("2016-01-01")).alias("start"),
    to_date(lit("2017-05-22")).alias("end"))\
  .select(months_between(col("start"), col("end"))).show(1)
# 두 날짜 사이의 개월 수 반환
+--------------------------------+
|months_between(start, end, true)|
+--------------------------------+
|                    -16.67741935|
+--------------------------------+

 

to_date 함수는 문자열을 날짜로 변환할 수 있으며, 날짜 포맷도 함께 지정할 수 있습니다.
함수의 날짜 포맷은 반드시 자바의 SimpleDateFormat 클래스가 지원하는 포맷을 사용해야 합니다.

from pyspark.sql.functions import to_date, lit

spark.range(5).withColumn("date", lit("2017-01-01"))\
  .select(to_date(col("date"))).show(1)
+-------------+
|to_date(date)|
+-------------+
|   2017-01-01|
+-------------+

 

from pyspark.sql.functions import to_timestamp

cleanDateDF.select(to_timestamp(col("date"), dateFormat)).show()
+------------------------------+
|to_timestamp(date, yyyy-dd-MM)|
+------------------------------+
|           2017-11-12 00:00:00|
+------------------------------+

to_timestamp 함수는 반드시 날짜 포맷을 지정해야 합니다.

 

null 값 다루기

DataFrame에서 빠져 있거나 비어 있는 데이터를 표현할 때는 항상 null 값을 사용하는 것이 좋습니다. 그래야 최적화를 수행할 수 있습니다.

from pyspark.sql.functions import coalesce # returns first col that is not null

df.select(coalesce(col("Description"), col("CustomerId"))).show()

coalesce 함수와 유사한 값을 얻을 수 있는 SQL 함수들이 있습니다.

ifnull -  첫 번째 값이 null이면 두 번째 값을 반환합니다.
nullif - 두 값이 같으면 null을 반환합니다
nvl - 첫 번째 값이 null이면 두 번째 값을 반환하고, 첫 번째 값이 null이 아니면 첫 번째 값을 반환합니다.
nvl2 - 첫 번째 값이 null이 아니면 두 번째 값을, 첫 번째 값이 null이면 세 번째 인수로 지정된 값을 반환합니다.

drop

df.na.drop("any")

df.na.drop("all", subset=["StockCode", "InvoiceNo"])

null 값을 가진 모든 로우를 제거합니다. subset 을 정하여 특정 컬럼만 제거할 수도 있습니다. 

df.na.fill("All null values become this string")

df.na.fill("all", subset=["StockCode", "InvoiceNo"])

# 스칼라 Map 타입 사용 - 다수의 컬럼에 fill 메서드 적용
fill_cols_vals = {"StockCode": 5, "Description": "No Value"}
df.na.fill(fill_cols_vals)

fill 함수는 하나 이상의 컬럼을 특정 값으로 채우며,
na.fill을 사용하여 null 값을 다른 값으로 채워 넣을 수 있습니다.

replace

df.na.replace([""], ["UNKNOWN"], "Description")

replace 메서드를 사용하여 null 값을 다른 값으로 대체하고자 한다면, 변경하고자 하는 값과 원래 값의 데이터 타입이 같아야 합니다.

 

복합 데이터 타입 다루기

구조체struct

from pyspark.sql.functions import struct

complexDF = df.select(struct("Description", "InvoiceNo").alias("complex"))
complexDF.createOrReplaceTempView("complexDF")

DataFrame 내부의 DataFrame으로, 쿼리문에서 다수의 컬럼을 괄호로 묶어 구조체로 만들 수 있습니다.

complexDF.select("complex.Description")
complexDF.select(col("complex").getField("Description"))

DataFrame을 조회하는 것과 동일하게 사용하나, 문법에 . 을 사용하거나 getField 메서드를 사용한다는 것이 다릅니다. 

배열array

[A, B, C ... ] 형태의 데이터입니다. 

from pyspark.sql.functions import split

df.select(split(col("Description"), " ")).show(2)
+-------------------------+
|split(Description,  , -1)|
+-------------------------+
|     [WHITE, HANGING, ...|
|     [WHITE, METAL, LA...|
+-------------------------+

배열을 만들기 위해 컬럼의 데이터를 공백 기준으로 분할합니다.

df.select(split(col("Description"), " ").alias("array_col"))\
  .selectExpr("array_col[0]").show(2)
+------------+
|array_col[0]|
+------------+
|       WHITE|
|       WHITE|
+------------+

selectExpr로 해당 배열의 인덱스를 사용하여 배열값을 조회할 수 있습니다. 

from pyspark.sql.functions import split, explode

df.withColumn("splitted", split(col("Description"), " "))\
  .withColumn("exploded", explode(col("splitted")))\
  .select("Description", "InvoiceNo", "exploded").show(2)

 

맵map

컬럼의 키-값 쌍을 이용해 생성하고, 배열과 동일한 방법으로 값을 선택할 수 있습니다. 

from pyspark.sql.functions import create_map

df.select(create_map(col("Description"), col("InvoiceNo")).alias("complex_map"))\
  .show(2)
+--------------------+
|         complex_map|
+--------------------+
|{WHITE HANGING HE...|
|{WHITE METAL LANT...|
+--------------------+
df.select(create_map(col("Description"), col("InvoiceNo")).alias("complex_map"))\
  .selectExpr("complex_map['WHITE METAL LANTERN']").show(2)
+--------------------------------+
|complex_map[WHITE METAL LANTERN]|
+--------------------------------+
|                            null|
|                          536365|
+--------------------------------+

 

map 타입은 분해하여 컬럼으로 변환할 수 있습니다. 

df.select(create_map(col("Description"), col("InvoiceNo")).alias("complex_map"))\
  .selectExpr("explode(complex_map)").show(2)
+--------------------+------+
|                 key| value|
+--------------------+------+
|WHITE HANGING HEA...|536365|
| WHITE METAL LANTERN|536365|
+--------------------+------+

explode 메서드는 array나 map의 각 요소로 새로운 로우를 만듭니다. 

 

사용자 정의 함수

사용자 정의 함수 User Defined Function, UDF를 사용해 파이썬, 스칼라, 자바, 스파크 SQL, 그리고 외부 라이브러리를 사용해 사용자가 원하는 형태로 트랜스포메이션을 만들 수 있습니다.

어떤 언어로 UDF를 만드느냐에 따라 성능에 동작하는 방식과 성능에 영향을 미치는데요,
스칼라나 자바로 함수를 작성했다면 JVM 환경에서만 사용할 수 있습니다.


파이썬으로 함수를 작성했다면 워커 노드에 파이썬 프로세스를 실행하고, 파이썬이 이해할 수 있는 포맷으로 모든 데이터를 직렬화합니다. (자바와 스칼라에서도 마찬가지입니다.) 그리고 파이썬 프로세스에 있는 데이터의 로우마다 함수를 실행하고, 마지막으로 JVM과 스파크에 처리 결과를 반환합니다. 

데이터를 전달하기 위해 직렬화 시 큰 부하가 발생하고, 데이터가 파이썬으로 전달되면 스파크에서 워커 메모리를 관리할 수 없다는 문제가 생깁니다. 자원에 제약이 생기면 워커가 비정상적으로 종료될 가능성이 있으므로,
사용자 정의 함수를 작성하려면 자바나 스칼라를 사용하는 것이 좋습니다.

사용자 정의 함수를 스파크 SQL 함수로 등록하면 모든 프로그래밍 언어와 SQL에서 사용자 정의 함수를 사용할 수 있습니다. 

함수를 작성할 시, 스파크는 자체 데이터 타입을 사용하기 때문에 함수의 반환 타입을 지정하는 것이 좋습니다.
함수에서 반환될 실제 데이터 타입과 일치하지 않는 데이터 타입을 지정하면 스파크는 오류가 아닌 null 값을 반환합니다.