이전 장에서 스파크 구조의 진화와 정당성에 대해서 이야기 했고, 특히 스파크 SQL 엔진이 상위 수준의 데이터 프레임과 데이터 세트를 위해 통합된 기반을 제공하는지 알아봤다.
이번 4장에서는 데이터 프레임, 스파크 SQL과 그 둘 간의 상호작용에 대해서 알아본다.
4장과 5장에서는 스파크 SQL이 아래 외부 구성요소들과 어떻게 소통하는지 알아본다.
스파크 SQL 특징
- 상위 수준의 정형화 API가 엔진으로 제공
- 다양한 정형 데이터를 읽고 쓰기 가능(Json, 하이브 테이블, Parquet, Avro, ORC, CSV)
- 태블로(Tableau), 파워BI(Power BI), 탈렌드(Talend)와 같은 외부 비즈니스 인텔리전스의 데이터 소스나 MySQL및 PostgreSQL과 같은 RDBMS의 데이터를 JDBC/ODBC 커넥터를 사용하여 쿼리할 수 있음
- 스파크 애플리케이션에서 DB 안에 테이블/뷰로 저장된 정형 데이터와 소통할 수 있도록 프로그래밍 인터페이스 제공
- SQL 쿼리를 정형 데이터에 실행할 수 있는 대화형 셸 제공
- ANSI SQL : 2003 호환 명령 및 HiveQL 지원
스파크 애플리케이션에서 스파크 SQL 사용
SparkSession은 정형화 API로 스파크를 프로그래밍하기 위한 unified entry point(통합된 진입점)을 제공한다. 이를 사용하면 쉽게 클래스를 가져오고 코드에서 인스턴스 생성 가능
SQL 쿼리를 실행은 SparkSession 인스턴스에서 spark.sql("SELECT * FROM TableName") 라는 SQL 함수를 사용한다. 이렇게 실행된 모든 spark.sql 쿼리는 추가 스파크 작업을 수행할 수 있도록 데이터 프레임을 반환한다.
기본 예시
날짜, 지연, 거리, 출발지, 목적지 등 미국 항공편에 대한 데이터세트(https://catalog.data.gov/dataset/airline-on-time-performance-and-causes-of-flight-delays)를 사용하여 쿼리의 예를 확인해보자.
백만 개 이상의 레코드가 있는 CSV 파일로, 스키마를 사용하여 데이터를 데이터프레임으로 읽고, 이를 임시뷰로 등록하여 SQL 쿼리로 처리해본다.
from pyspark.sql import SparkSession
# Create SparkSession
spark = (SparkSession.builder.appName("SparkSQLExampleAPP").getOrCreate())
csv_file = "데이터 경로"
# 읽고 임시뷰 생성
# 스키마 추론
df = (spark.read.format("csv")
.schema("date STRING, delay INT, distance INT, origin STRING, destination STRING")
.option("header", "true")
.load(csv_file))
df.createOrRepalceTempView("us_delay_filghts_tbl")
* 스키마를 지정할 수도 있다. 이는 DDL 형태의 문자열을 사용하면 된다.
그러면 이제 임시뷰를 사용할 수 있고, 스파크 SQL을 사용하여 SQL 쿼리를 실행할 수 있다.
이 미국 항공편 운항 지연 데이터세트는 5개 칼럼을 가진다.
- date(날짜) : 02-19 09:25AM = 02190925 식의 문자열
- delay(지연) : 계획된 도착시간과 실제 도착시간 차이 분, 조기 도착시 음수
- distance(비행거리) : 마일 단위로 출발지-도착지의 거리
- origin(출발지) : 출발지의 IATA 공항 코드
- destination(도착지) : 도착지의 IATA 공항 코드
기본 예제
데이터 프레임 및 데이터세트 API와 마찬가지로 spark.sql 인터페이스를 사용하면 데이터 분석 작업을 수행할 수 있다.
계산은 스파크 SQL 엔진에서 동일한 과정과 결과를 제공한다.
1] 비행거리가 1,000마일 이상인 모든 항공편
spark.sql("SELECT distance, origin, destination FROM us_delay_flights_tbl WHERE distance > 1000 ORDER BY distance DESC").show(10)
spark.sql 인터페이스를 사용한 쿼리이다.
us_delay_flights_tbl 테이블에서 distance, origin, destination 을 추출하는데, 조건은 distance > 1000, 즉, 비행거리가 1000마일 이상인 것을 필터링할 수 있다.
이것을 데이터 프레임 API 쿼리로도 아래와 같이 표현할 수 있다.
df.select("distance", "origin", "destination").where(col("distance") > 1000).orderBy(desc("distance")).show(10, truncate=False)
df.select 단에는 속성(attribute)은 ["distance", "origin", "destination"]
where 절로 col("distance") > 1000 : distance 콜럼의 값을 1000 보다 큰 값
.where(col("distance") > 1000)
.where("distance > 1000")
등은 다양하게 표현이 가능하다.
orderby(desc("distance")) : distance 열의 값을 순차적으로 나열한다.
.orderBy(desc("distance"))
.orderBy("distance", ascending=False)
등으로 표현이 가능하다.
위의 예제에서 볼 수 있는 것과 같이, 스파크 SQL 인터페이스와 일반 SQL을 작성하는 것과 유사하다.
결과값에서 보여주는 것과 같이, 가장 긴 비행은 호놀룰루HNL라는 것을 확인할 수 있었다.
2] 샌프란시스코SFO와 시카고ORD 간 2시간 이상 지연이 있던 항공편
spark.sql("""
SELECT date, delay, origin, destination
FROM us_delay_flights_tbl
WHERE delay > 120 AND ORIGIN = 'SFO' AND DESTINATION = 'ORD'
ORDER by delay DESC""").show(10)
delay의 값의 단위는 분이기 때문에, 2시간 이상이면 120분으로 생각해주면 된다.
-2시간 이상,,,,,, 이면 불포함인거겠지..? with at least a two-hour delay 번역이 아마 모호하게 된것일지도-
결과값에서 두 도시간 서로 다른 날짜에도 많은 지연 항공편을 확인할 수 있었다.
* 지연이 발생한 날짜와 달을 찾아보면, 항공 지연이 겨울 또는 휴일과 관련되었는지
3] 모든 미국 항공편에 지연에 대한 표시를 레이블로 지정해서 추가하기
spark.sql("""SELECT delay, origin, destination,
CASE
WHEN delay > 360 THEN 'Very Long Delays'
WHEN delay > 120 AND delay < 360 THEN 'Long Delays '
WHEN delay > 60 AND delay < 120 THEN 'Short Delays'
WHEN delay > 0 and delay < 60 THEN 'Tolerable Delays'
WHEN delay = 0 THEN 'No Delays'
ELSE 'No Delays'
END AS Flight_Delays
FROM us_delay_flights_tbl
ORDER BY origin, delay DESC""").show(10, truncate=False)
이렇게 보여질 수 있다.
이 예제서 확인해볼 수 있듯이 정형 데이터를 쿼리할 수 있도록 스파크는 메모리와 디스크상에서 뷰와 데이블의 생성 및 관리하는 복잡한 작업을 관리한다.
그러면 테이블과 뷰를 생성하고 관리하는 방법은?
SQL 테이블과 뷰
테이블을 데이터를 가진다. 스파크는 각 테이블과 해당 데이터에 관련된 스카마, 설명, 테이블명, DB명, 칼럼명, 파티션, 실제 데이터의 물리적 위치 등 메타데이터를 가지고, 모든 정보는 중앙 메타스토어에 저장된다.
스파크는 스파크 테이블만의 별도 메타스토어를 생성하지 않고 /user/hive/warehouse 에 있는 아파치 하이브 메타스토어를 사용해 테이블에 대한 메타 데이터를 유지한다. 그러나 스파크 구성 변수 spark.sql.warehouse.dir을 로컬 또는 외부 분산 저장소로 설정하여 다른 위치로 기본 경로를 변경할 수 있다.
관리형 테이블과 비관리형 테이블
스파크는 관리형(managed), 비관리형(unmanaged)이라는 두가지 유형의 테이블을 만들 수 있다.
관리형 테이블 : 스파크는 메타데이터와 파일 저장소(로컬 파일 시스템, HDFS, Amazon S3, Azure Blob와 같은 객체 저장소)의 데이터를 모두 관리
비관리형 테이블 : 스파크는 오직 메타데이터만 관리하고 카산드라*[3]와 같은 외부 데이터 소스에서 데이터를 직접 관리
=> DROP TABLE <테이블명> 은 관리형 테이블일 경우, 메타+실제 데이터 모두 삭제, 반면 비관리형 테이블의 경우, 실제데이터는 그대로 두고, 메타데이터만 삭제
SQL 데이터베이스와 테이블 생성하기
스파크는 default DB 안에 테이블을 생성한다. 사용자의 DB를 새로 생성하고 싶으면, 스파크 애플리케이션이나 노트북에서 SQL 명령어를 실행할 수 있다.
초기 설정으로 learn_spark_db 라는 DB를 생성하고, 스파크에게 해당 DB를 사용하겠다 알려야 한다.
그리고 관리형(생성한 DB에서)/비관리형 테이블(접근 가능한 파일 저장소의 파케이, CSV, Json 파일)을 생성할 수 있다.
뷰 생성
테이블을 생성하는 것 외에, 스파크는 기존 테이블을 토대로 뷰를 만들수 있다. 뷰는 전역(해당 클러스터의 모든 SparkSession에서 볼 수 있음) 또는 세션 범위(단일 SparkSession에서만 볼 수 있음)일 수 있으며, 스파크 애플리케이션이 종료시 사라짐
뷰 생성은 DB내에 테이블 생성과 비슷하고, 후에 테이블처럼 쿼리할 수 있다. 차이점은 실제로 데이터를 소유하지 않아, 스파크 애플리케이션 종료 시 테이블은 유지, 뷰는 사라진다.
임시 뷰 vs 전역 임시 뷰
임시 뷰 : 스파크 애플리케이션 내의 단일 SparkSession에 연결
전역 임시 뷰 : 스파크 애플리케이션 내의 여러 SparkSession에서 볼 수 있음
단일 스파크 애플리케이션 내에 여러 SparkSession을 만들 수 있는데, 동일한 하이브 메타스토어 구성을 공유하지 않는 여러 SparkSession에서 같은 데이터에 액세스하고 결합하고자할 때 유용하게 사용가능
메타데이터 보기
스파크는 관리형/비관리형 테이블에 대한 메타데이터를 저장한다.
메타 데이터 저장을 위한 스파크 SQL의 상위 추상화 모듈인 카탈로그에 저장되며, DB, 테이블 및 뷰와 관련된 메타데이터를 검사한다.
SQL 테이블 캐싱
테이블 캐싱 전략, 데이터 프레임처럼 SQL 테이블과 뷰 또한 캐시/언캐싱을 할 수 있다.
테이블은 LAZY로 지정할 수 있으며, 테이블을 바로 캐싱하지 않고 처음 사용되는 시점에서 캐싱한다.
테이블을 데이터 프레임으로 읽기
데이터 엔지니어는 주로 일반적 데이터 수집과 ETL 프로세스 일부로 데이터 파이프라인으르 구축한다. 애플리케이션의 다운스트림에서 사용할 수 있도록 스파크 SQL DB 및 테이블으르 정리된 데이터로 로드한다.
데이터 프레임 및 SQL 테이블을 위한 데이터 소스
스파크 SQL은 다양한 데이터 소스에 대한 인터페이스를 제공한다. 데이터 소스 API를 사용하여 데이터 소스로부터 데이터를 읽고 쓰는 함수를 제공한다.
서로 다른 데이터 소스 간 의사소통하는 방법을 제공하는 상위 수준 데이터 소스 API인 DataFrameReader 및 DataFrameWriter를 알아보자.
DataFrameReader
: 데이터 소스에서 데이터 프레임으로 데이터를 읽기 위한 핵심 구조.
DataFrameReader.format(args).option("key", "value").schema(args).load()
오직 SparkSession 인스턴스를 통해 DataFrameReader에 엑세스할 수 있으며, 개별적으로 만들 수 없다.
SparkSession.read
// or
SparkSession.readStream
read는 정적 데이터 소스에서 DataFrame으로 읽기 위해 DataFrameReader에 대한 핸들을 반환
readStream은 스트리밍 소스에서 읽을 인스턴스를 반환???(책의 후반부 정형화 스트리밍에 대해서 자세히..)
아래 표4-1은 지원하는 인수의 하위 집합을 나열하며 보여준다.
DataFrameWriter
: 지정된 내장 데이터 소스에 데이터를 저장하거나 쓰는 작업을 수행
DataFrameReader와 달리 SparkSession이 아닌 저장하려는 데이터 프레임에서 인스턴스에 액세스가 가능하다.
DataFrameWriter.format(args).option(args).bucketBy(args).partitionBy(args).save(path)
DataFrameWriter.format(args).option(args).sortBy(args).saveAsTable(table)
인스턴스 핸들을 가져오는 것은 아래와 같다.
DataFrame.write
// or
DataFrame.writeStream
DataFrameWriter의 각 함수에 대한 인수는 아래와 같다.
그리고 책에서는
다양한 파일 형식(파케이, JSON, CSV, 에이브로(Avro)*[5], ORC, 이미지, 이진파일)에서 데이터 프레임으로 데이터를 읽는 방법, 내부 데이터 소스에서 임시 뷰 및 테이블 생성 방법을 확인함
[1] DDL(Data Definition Language) : 데이터 정의어
[2] 메타데이터(Metadata) : 일반적으로 데이터에 관한 구조화된 데이터, 대량의 정보 가운데에서 확인하고자 하는 정보를 효율적으로 검색하기 위해 원시데이터(Raw data)를 일정한 규칙에 따라 구조화 혹은 표준화한 정보를 의미
[3] 아파치 카산드라(Apache Cassandra) : 자유 오픈 소스 분산형 NoSQL 데이터베이스 관리 시스템 중 하나
[4] 파케이(parquet) : 하둡에서 칼럼방식으로 저장하는 저장 포맷, 스파크의 기본 데이터 소스 - 많은 빅데이터 처리 프레임워크 및 플랫폼에서 널리 사용되는 파케이는 다양한 I/O 최적화(저장공간 절약 및 데이터 칼럼의 빠른 액세스 허용 압축)를 제공하는 오픈소스 기반 파일 형식
[5] 에이브로(Abro) : 아파치 카프카에서 메시지를 직렬화/역직렬화할 때 사용되고, Json에 대한 직접 매칭, 속도, 효율성, 많은 프로그래밍 언어에 사용할 수 있는 바인딩을 포함한 많은 이점을 제공
요약
데이터 프레임 API와 스파크 SQL 간 상호 운용성에 대해 살펴보았고, 스파크 SQL을 사용하여 이점을 얻을 수 있다.
- 스파크 SQL 및 데이터 프레임 API를 사용하여 관리형 및 비관리형 테이블을 생성 가능
- 다양한 내장 데이터 소스 및 파일 형식 읽고 쓰기
- 스파크 SQL 테이블 또는 뷰로 저장된 정형화 데이터에 spark.sql 프로그래밍 인터페이스를 사용해 SQL 쿼리 실행 가능
- 스파크 카탈로그를 통해 테이블 및 뷰와 관련된 메타데이터 검사 가능
- DataFrameWriter 및 DataFrameReader API 사용
'공부' 카테고리의 다른 글
파이썬의 enumerate() 내장 함수로 for 루프 돌리기 (1) | 2024.09.12 |
---|---|
[SQL} SELECT 문 정리 (FROM, WHERE, GROUP BY, ORDER BY, JOIN) (0) | 2024.09.11 |
[Learning Spark] Databricks 을 통해 notebook 및 cluster 생성 방법 (2) | 2024.09.01 |
SQL INSERT INTO 문 이해하기 (0) | 2024.08.30 |
vpn이란 (0) | 2024.08.30 |