Spark

[스파크] chap3. 스파크 기능 둘러보기

Ellie67 2021. 8. 3. 18:06

 

스파크는 기본 요소인 저수준 API구조적 API 그리고 추가 기능을 제공하는 일련의 표준 라이브러리로 구성되어 있다.

 

스파크의 라이브러리 

=> 그래프 분석, 머신러닝, 스트리밍 등 다양한 작업 지원 및 컴퓨팅 및 스토리지 시스템과의 통합을 돕는 역할

 

 

3.1 운영용 애플리케이션 실행하기

 

spark-submit

- 대화형 셸에서 개발한 프로그램을 운영용 애플리케이션으로 쉽게 전환할 수 있다.

- 애플리케이션 코드를 클러스터에 전송해 실행시키는 역할을 한다. (제출된 애플리케이션은 작업이 종료되거나 에러가 발생할 때까지 실행된다.)

- 애플리케이션 실행에 필요한 자원과 실행 방식 및 다양한 옵션을 지정할 수 있다.

 

스파크 애플리케이션 -> StandAlone, Mesos, YARN 클러스터 매니저를 통해 실행된다.

 

spark-submit \--class org.apache.spark.examples.SparkPi \--master local \C:\spark-3.1.2-bin-hadoop3.2\spark-3.1.2-bin-hadoop3.2\\examples\\jars\\spark-examples_2.12-3.1.2

 

 

3.2 Dataset: 타입 안정성을 제공하는 구조적 API

 - 자바와 스칼라의 정적 데이터 타입에 맞는 정적 타입 코드를 지원하기 위해 고안된 스파크의 구조적 API

 - 동적 타입 언어인 파이썬과 R에서는 사용할 수 없고 타입 안정성을 지원하기 때문에 초기화에 사용한 클래스 대신 다른 클래스를 사용해 접근할 수 없다.

 

 DataFrame

 - 다양한 데이터 타입의 테이블형 데이터를 보관할 수 있는 Row 타입 객체로 구성된 분산 컬렉션

 

Dataset API

 : DataFrame의 레코드를 사용자가 자바나 스칼라로 정의한 클래스에 할당

 : 자바의 ArrayList 또는 스칼라의 Seq 객체 등의 고정 타입형 컬렉션으로 다룰 수 있는 기능 제공

=> 대규모 애플리케이션을 개발하는 데 유용

 

Dataset 클래스는 내부 객체의 데이터 타입을 매개변수로 사용

Dataset[Person] //  Person 클래스의 객체만 가질 수 있다. (자바: Dataset<Person>)

=> 스파크가 타입을 제한적으로 사용하는 이유 : 자동으로 타입 T를 분석한 다음 Dataset의 표 형식 데이터에 적합한 스키마를 생성해야 하기 때문

 

장점1

- Dataset은 필요한 경우(DataFrame만으로 처리가 불가능할 때)에 선택적으로 사용할 수 있다.

 

(예제)

// 데이터 타입 정의
case class Flight(DEST_COUNTRY_NAME: String, ORIGIN_COUNTRY_NAME: String, count: BigInt)

val flightDF = spark.read.parquet("./data/flight-data/parquet/2010-summary.parquet/")

val flight = flightDF.as[Flight]

 

// 데이터 타입 정의하고 map과 filter 함수 사용
flight
.filter(flight_row => flight_row.ORIGIN_COUNTRY_NAME != "Canada")
.map(flight_row => flight_row) .take(5)
flight
.take(5)
.filter(flight_row => flight_row.ORIGIN_COUNTRY_NAME != "Canada")
.map(fr => Flight(fr.DEST_COUNTRY_NAME, fr.ORIGIN_COUNTRY_NAME, fr.count+5))

 

 

장점2

- 스파크가 제공하는 여러 함수를 이용해 추가 처리 작업을 할 수 있다.

=> 타입 안정성을 보장하는 코드에서 저수준 API를 사용할 수 있으며, 고수준 APISQL을 사용해 빠른 분석을 할 수 있게 한다.

 

장점3

- Datasetcollect 메서드나 take 메서드를 호출하면 DataFrame을 구성하는 Row 타입의 객체가 아닌 Dataset에 매개변수로 지정한 타입의 객체를 반환한다.

=> 코드 변경 없이 타입 안정성을 보장할 수 있으며 로컬이나 분산 클러스터 환경에서 데이터를 안전하게 다룰 수 있다.

 

 

3.3 구조적 스트리밍

 - 스파크 2.2버전에서 안정화된 스트림 처리용 고수준 API

 

장점

 - 구조적 API로 개발된 배치모드의 연산을 스트리밍 방식으로 실행 가능 => 지연 시간 줄이고 증분 처리(데이터 원본에 새로 추가되거나 변경된 데이터를 대상에 반영하는 작업)할 수 있다.

 - 배치 처리용 코드를 일부 수정하여 스트리밍 처리를 수행하고 값을 빠르게 얻을 수 있다.

 

(예제)

여러 프로세스에서 데이터가 꾸준하게 생성되는 상황을 상상해봅니다.

지금 사용하는 데이터는 소매 데이터이고 그렇기 때문에 소매점에서 생성된 데이터가 구조적 스트리밍 잡이 읽을 수 있는 저장소로 전송되고 있다고 가정합니다.

-> 정적 데이터셋의 데이터를 분석해 DataFrame을 생성합니다. (정적 데이터셋의 스키마도 함께 생성)

 

val staticDataFrame = spark.read.format("csv")
.option("header", "true")
.option("inferSchema", "true")
.load("./data/retail-data/by-day/*.csv")

staticDataFrame.createOrReplaceTempView("retail_data")

val staticSchema = staticDataFrame.schema

 

 

- 윈도우 함수는 집계 시에 시계열 컬럼을 기준으로 각 날짜에 대한 전체 데이터를 가지는 윈도우를 구성한다.

- 윈도우는 간격을 통해 처리 요건을 명시할 수 있기 때문에 날짜와 타임스탬프 처리에 유용

- 스파크는 관련 날짜의 데이터를 그룹화한다.

 

staticDataFrame
.selectExpr(
"CustomerId",
"(UnitPrice*Quantity) as total_cost",
"InvoiceDate" )
.groupBy(
col("CustomerId"), window(col("InvoiceDate"), "1 day") )
.sum("total_cost")
.show(5)

 


지금까지는 정적 DataFrame 버전의 코드였다.

 

이제는 스트리밍 코드인데 코드는 거의 바뀌지 않고 read 메서드 대신 readStream 메서드를 사용한다.

 

한 번에 읽을 파일 수 설정 하는 maxFilesPerTrigger 옵션을 추가로 지정

 

staticDataFrame.isStreaming // 지금까지는 DataFrame이 스트리밍 유형이 아니었기 때문에 false가 반환됨

val streamingDataFrame = spark.readStream
.schema(staticSchema)
.option("maxFilesPerTrigger", 1)
.format("csv")
.option("header", "true")
.load("./data/retail-data/by-day/*.csv")

staticDataFrame.isStreaming // 스트리밍 유형이 되었기 때문에 true 반환

 

(총 판매 금액 계산 코드)

val purchaseByCustomerPerHour = streamingDataFrame
.selectExpr(
"CustomerId",
"(UnitPrice*Quantity) as total_cost",
"InvoiceDate" ) .
groupBy(
col("CustomerId"), window(col("InvoiceDate"), "1 day") )
.sum("total_cost")

 

=> 지연 연산이기 때문에 데이터 플로를 실행하기 위해 스트리밍 액션을 호출해야 한다.

 

purchaseByCustomerPerHour.writeStream
.format("memory") // 인메모리 테이블에 저장 
.queryName("customer_purchases") // 테이블 명 
.outputMode("complete") // 모든 카운트 수행결과를 테이블에 저장 
.start()

 

=> 스파크는 이전 집계값보다 더 큰 값이 발생한 경우에만 인메모리 테이블을 갱신

 

 

3.4 머신러닝과 고급 분석

 - 스파크에 내장된 머신러닝 알고리즘 라이브러리 MLlib을 사용해 대규모 머신러닝을 수행할 수 있다.

  => 대용량 데이터를 대상으로 전처리, 멍잉, 모델 학습, 예측을 할 수 있다.

 

스파크는 분류, 회귀, 군집화, 딥러닝에 이르기까지 머신러닝과 관련된 정교한 API 제공

 

(예제)

k-평균(데이터에서 k개의 중심이 임의로 할당되는 군집화 알고리즘)을 이용하여 기본적인 군집화 수행

 

MLlib의 머신러닝 알고리즘을 사용하기 위해서는 수치형 데이터 필요

 

staticDataFrame.printSchema() // 데이터 타입 확인

val preppedDataFrame = staticDataFrame // 데이터 타입 수치형으로 변환
.na.fill(0)
.withColumn("day_of_week", date_format($"InvoiceDate", "EEEE"))
.coalesce(5)

// 데이터를 학습 데이터셋과 테스트 데이터셋으로 분리
val trainDataFrame = preppedDataFrame
.where("InvoiceDate < '2011-07-01'")

val testDataFrame = preppedDataFrame
.where("InvoiceDate >= '2011-07-01'")

// 스파크 MLlib은 일반적인 트랜스포메이션을 자동화하는 다양한 트랜스포메이션 제공
// 요일을 수치형으로 반환-> 토요일을 6으로, 월요일을 1로 반환하게 되면 토요일이 월요일보다 더 크다는 것을 의미하는 문제
val indexer = new StringIndexer()
.setInputCol("day_of_week")
.setOutputCol("day_of_week_index")

// 위 코드에서 발생한 문제점을 보완하기 위해 각 값을 자체 컬럼으로 인코딩->불리언 타입으로 나타낼 수 있음
val encoder = new OneHotEncoder()
.setInputCol("day_of_week_index")
.setOutputCol("day_of_week_encoded")

// 스파크의 모든 머신러닝 알고리즘은 수치형 벡터 타입을 입력으로 사용
val vectorAssembler = new VectorAssembler()
.setInputCols(Array("UnitPrice","Quantity", "day_of_week_encoded"))
.setOutputCol("features")

// 입력값으로 들어올 데이터가 같은 프로세스를 거쳐 변환되도록 파이프라인을 설정
val transformationPipeline = new Pipeline()
.setStages(Array(indexer, encoder, vectorAssembler))

 

 

학습 준비 과정

1. 우선 변환자를 데이터셋에 적합시켜야 한다. -> 학습을 위한 맞춤 파이프라인 준비

val fittedPipeline = transformationPipeline.fit(trainDataFrame)

2. StringIndexer는 인덱싱할 고유값의 수를 알아야 한다.

 

동일한 트랜스포메이션을 계속 반복할 수 없기 때문에 모델에 일부 하이퍼파라미터 튜닝값을 적용한다.

캐싱을 사용하면 중간 변환된 데이터셋의 복사본을 메모리에 저장하기 때문에 전체 파이프라인을 재실행하는 것보다 훨신 빠르게 반복적으로 데이터셋에 접근할 수 있다.

 

모델 학습

- 스파크에서 머신러닝 모델을 학습시키는 과정

-> 1. 아직 학습되지 않은 모델을 초기화

-> 2. 해당 모델을 학습시킴

 

MLlibDataFrame API에서 제공하는 모든 알고리즘 명명규칙

- 학습 전 알고리즘 명칭 : Algorithm

- 학습 후 알고리즘 명칭 : AlgorithmModel

 

val kmeans = new KMeans() .setK(20) .setSeed(1L)

=> 학습 전 알고리즘 명칭 : KMeans / 학습 후 알고리즘 명칭 : KMeansModel