스파크는 기본 요소인 저수준 API와 구조적 API 그리고 추가 기능을 제공하는 일련의 표준 라이브러리로 구성되어 있다.
스파크의 라이브러리
=> 그래프 분석, 머신러닝, 스트리밍 등 다양한 작업 지원 및 컴퓨팅 및 스토리지 시스템과의 통합을 돕는 역할
3.1 운영용 애플리케이션 실행하기
spark-submit
- 대화형 셸에서 개발한 프로그램을 운영용 애플리케이션으로 쉽게 전환할 수 있다.
- 애플리케이션 코드를 클러스터에 전송해 실행시키는 역할을 한다. (제출된 애플리케이션은 작업이 종료되거나 에러가 발생할 때까지 실행된다.)
- 애플리케이션 실행에 필요한 자원과 실행 방식 및 다양한 옵션을 지정할 수 있다.
스파크 애플리케이션 -> StandAlone, Mesos, YARN 클러스터 매니저를 통해 실행된다.
spark-submit \--class org.apache.spark.examples.SparkPi \--master local \C:\spark-3.1.2-bin-hadoop3.2\spark-3.1.2-bin-hadoop3.2\\examples\\jars\\spark-examples_2.12-3.1.2
3.2 Dataset: 타입 안정성을 제공하는 구조적 API
- 자바와 스칼라의 정적 데이터 타입에 맞는 정적 타입 코드를 지원하기 위해 고안된 스파크의 구조적 API
- 동적 타입 언어인 파이썬과 R에서는 사용할 수 없고 타입 안정성을 지원하기 때문에 초기화에 사용한 클래스 대신 다른 클래스를 사용해 접근할 수 없다.
DataFrame
- 다양한 데이터 타입의 테이블형 데이터를 보관할 수 있는 Row 타입 객체로 구성된 분산 컬렉션
Dataset API
: DataFrame의 레코드를 사용자가 자바나 스칼라로 정의한 클래스에 할당
: 자바의 ArrayList 또는 스칼라의 Seq 객체 등의 고정 타입형 컬렉션으로 다룰 수 있는 기능 제공
=> 대규모 애플리케이션을 개발하는 데 유용
Dataset 클래스는 내부 객체의 데이터 타입을 매개변수로 사용
Dataset[Person] // Person 클래스의 객체만 가질 수 있다. (자바: Dataset<Person>)
=> 스파크가 타입을 제한적으로 사용하는 이유 : 자동으로 타입 T를 분석한 다음 Dataset의 표 형식 데이터에 적합한 스키마를 생성해야 하기 때문
장점1
- Dataset은 필요한 경우(DataFrame만으로 처리가 불가능할 때)에 선택적으로 사용할 수 있다.
(예제)
// 데이터 타입 정의
case class Flight(DEST_COUNTRY_NAME: String, ORIGIN_COUNTRY_NAME: String, count: BigInt)
val flightDF = spark.read.parquet("./data/flight-data/parquet/2010-summary.parquet/")
val flight = flightDF.as[Flight]
// 데이터 타입 정의하고 map과 filter 함수 사용
flight
.filter(flight_row => flight_row.ORIGIN_COUNTRY_NAME != "Canada")
.map(flight_row => flight_row) .take(5)
flight
.take(5)
.filter(flight_row => flight_row.ORIGIN_COUNTRY_NAME != "Canada")
.map(fr => Flight(fr.DEST_COUNTRY_NAME, fr.ORIGIN_COUNTRY_NAME, fr.count+5))
장점2
- 스파크가 제공하는 여러 함수를 이용해 추가 처리 작업을 할 수 있다.
=> 타입 안정성을 보장하는 코드에서 저수준 API를 사용할 수 있으며, 고수준 API의 SQL을 사용해 빠른 분석을 할 수 있게 한다.
장점3
- Dataset은 collect 메서드나 take 메서드를 호출하면 DataFrame을 구성하는 Row 타입의 객체가 아닌 Dataset에 매개변수로 지정한 타입의 객체를 반환한다.
=> 코드 변경 없이 타입 안정성을 보장할 수 있으며 로컬이나 분산 클러스터 환경에서 데이터를 안전하게 다룰 수 있다.
3.3 구조적 스트리밍
- 스파크 2.2버전에서 안정화된 스트림 처리용 고수준 API
장점
- 구조적 API로 개발된 배치모드의 연산을 스트리밍 방식으로 실행 가능 => 지연 시간 줄이고 증분 처리(데이터 원본에 새로 추가되거나 변경된 데이터를 대상에 반영하는 작업)할 수 있다.
- 배치 처리용 코드를 일부 수정하여 스트리밍 처리를 수행하고 값을 빠르게 얻을 수 있다.
(예제)
여러 프로세스에서 데이터가 꾸준하게 생성되는 상황을 상상해봅니다.
지금 사용하는 데이터는 소매 데이터이고 그렇기 때문에 소매점에서 생성된 데이터가 구조적 스트리밍 잡이 읽을 수 있는 저장소로 전송되고 있다고 가정합니다.
-> 정적 데이터셋의 데이터를 분석해 DataFrame을 생성합니다. (정적 데이터셋의 스키마도 함께 생성)
val staticDataFrame = spark.read.format("csv")
.option("header", "true")
.option("inferSchema", "true")
.load("./data/retail-data/by-day/*.csv")
staticDataFrame.createOrReplaceTempView("retail_data")
val staticSchema = staticDataFrame.schema
- 윈도우 함수는 집계 시에 시계열 컬럼을 기준으로 각 날짜에 대한 전체 데이터를 가지는 윈도우를 구성한다.
- 윈도우는 간격을 통해 처리 요건을 명시할 수 있기 때문에 날짜와 타임스탬프 처리에 유용
- 스파크는 관련 날짜의 데이터를 그룹화한다.
staticDataFrame
.selectExpr(
"CustomerId",
"(UnitPrice*Quantity) as total_cost",
"InvoiceDate" )
.groupBy(
col("CustomerId"), window(col("InvoiceDate"), "1 day") )
.sum("total_cost")
.show(5)
지금까지는 정적 DataFrame 버전의 코드였다.
이제는 스트리밍 코드인데 코드는 거의 바뀌지 않고 read 메서드 대신 readStream 메서드를 사용한다.
한 번에 읽을 파일 수 설정 하는 maxFilesPerTrigger 옵션을 추가로 지정
staticDataFrame.isStreaming // 지금까지는 DataFrame이 스트리밍 유형이 아니었기 때문에 false가 반환됨
val streamingDataFrame = spark.readStream
.schema(staticSchema)
.option("maxFilesPerTrigger", 1)
.format("csv")
.option("header", "true")
.load("./data/retail-data/by-day/*.csv")
staticDataFrame.isStreaming // 스트리밍 유형이 되었기 때문에 true 반환
(총 판매 금액 계산 코드)
val purchaseByCustomerPerHour = streamingDataFrame
.selectExpr(
"CustomerId",
"(UnitPrice*Quantity) as total_cost",
"InvoiceDate" ) .
groupBy(
col("CustomerId"), window(col("InvoiceDate"), "1 day") )
.sum("total_cost")
=> 지연 연산이기 때문에 데이터 플로를 실행하기 위해 스트리밍 액션을 호출해야 한다.
purchaseByCustomerPerHour.writeStream
.format("memory") // 인메모리 테이블에 저장
.queryName("customer_purchases") // 테이블 명
.outputMode("complete") // 모든 카운트 수행결과를 테이블에 저장
.start()
=> 스파크는 이전 집계값보다 더 큰 값이 발생한 경우에만 인메모리 테이블을 갱신
3.4 머신러닝과 고급 분석
- 스파크에 내장된 머신러닝 알고리즘 라이브러리 MLlib을 사용해 대규모 머신러닝을 수행할 수 있다.
=> 대용량 데이터를 대상으로 전처리, 멍잉, 모델 학습, 예측을 할 수 있다.
스파크는 분류, 회귀, 군집화, 딥러닝에 이르기까지 머신러닝과 관련된 정교한 API 제공
(예제)
k-평균(데이터에서 k개의 중심이 임의로 할당되는 군집화 알고리즘)을 이용하여 기본적인 군집화 수행
MLlib의 머신러닝 알고리즘을 사용하기 위해서는 수치형 데이터 필요
staticDataFrame.printSchema() // 데이터 타입 확인
val preppedDataFrame = staticDataFrame // 데이터 타입 수치형으로 변환
.na.fill(0)
.withColumn("day_of_week", date_format($"InvoiceDate", "EEEE"))
.coalesce(5)
// 데이터를 학습 데이터셋과 테스트 데이터셋으로 분리
val trainDataFrame = preppedDataFrame
.where("InvoiceDate < '2011-07-01'")
val testDataFrame = preppedDataFrame
.where("InvoiceDate >= '2011-07-01'")
// 스파크 MLlib은 일반적인 트랜스포메이션을 자동화하는 다양한 트랜스포메이션 제공
// 요일을 수치형으로 반환-> 토요일을 6으로, 월요일을 1로 반환하게 되면 토요일이 월요일보다 더 크다는 것을 의미하는 문제
val indexer = new StringIndexer()
.setInputCol("day_of_week")
.setOutputCol("day_of_week_index")
// 위 코드에서 발생한 문제점을 보완하기 위해 각 값을 자체 컬럼으로 인코딩->불리언 타입으로 나타낼 수 있음
val encoder = new OneHotEncoder()
.setInputCol("day_of_week_index")
.setOutputCol("day_of_week_encoded")
// 스파크의 모든 머신러닝 알고리즘은 수치형 벡터 타입을 입력으로 사용
val vectorAssembler = new VectorAssembler()
.setInputCols(Array("UnitPrice","Quantity", "day_of_week_encoded"))
.setOutputCol("features")
// 입력값으로 들어올 데이터가 같은 프로세스를 거쳐 변환되도록 파이프라인을 설정
val transformationPipeline = new Pipeline()
.setStages(Array(indexer, encoder, vectorAssembler))
학습 준비 과정
1. 우선 변환자를 데이터셋에 적합시켜야 한다. -> 학습을 위한 맞춤 파이프라인 준비
val fittedPipeline = transformationPipeline.fit(trainDataFrame)
2. StringIndexer는 인덱싱할 고유값의 수를 알아야 한다.
동일한 트랜스포메이션을 계속 반복할 수 없기 때문에 모델에 일부 하이퍼파라미터 튜닝값을 적용한다.
캐싱을 사용하면 중간 변환된 데이터셋의 복사본을 메모리에 저장하기 때문에 전체 파이프라인을 재실행하는 것보다 훨신 빠르게 반복적으로 데이터셋에 접근할 수 있다.
모델 학습
- 스파크에서 머신러닝 모델을 학습시키는 과정
-> 1. 아직 학습되지 않은 모델을 초기화
-> 2. 해당 모델을 학습시킴
MLlib의 DataFrame API에서 제공하는 모든 알고리즘 명명규칙
- 학습 전 알고리즘 명칭 : Algorithm
- 학습 후 알고리즘 명칭 : AlgorithmModel
val kmeans = new KMeans() .setK(20) .setSeed(1L)
=> 학습 전 알고리즘 명칭 : KMeans / 학습 후 알고리즘 명칭 : KMeansModel
'Spark' 카테고리의 다른 글
[스파크] chap18 모니터링과 디버깅 (0) | 2021.09.15 |
---|---|
[스파크] chap1, chap2 아파치 스파크 (0) | 2021.08.09 |
[파이썬]Fatal Python error: initfsencoding: unable to load the file system codec 에러 (0) | 2021.08.03 |