clean data apache spark cheatsheet 2

Cheatsheet วิธีใช้ และเทคนิคใน Pyspark ฉบับสมบูรณ์

Pyspark ถือเป็นเครื่องมือหนึ่งในการทำ Big Data แบบครบวงจร และสามารถนำไปต่อยอดสร้างโมเดล หรือ ทำ Data Visualization ได้อีกมากมาย

ดังนั้นวันนี้เราจะมาเรียนรู้การเขียน Pyspark ที่ใช้บ่อยๆ ตั้งแต่โหลดข้อมูล analyse ไปจนเซฟไฟล์เพื่อนำไปใช้ต่อเลยทีเดียว หวังว่าจะมีประโยชน์สำหรับเพื่อนๆทุกคนนะคะ

Contents hide
2 เทคนิคการใช้ Spark

Pyspark คืออะไร

Pyspark เป็นเครื่องมือหนึ่งที่เกิดจากการรวมตัวกันระหว่าง Apache Spark กับ Python ซึ่งทำให้เราสามารถเขียน Python ใน Spark ได้ หรือเรียกง่ายๆก็คือเป็น Python API ของ Apache Spark นอกจากนี้ Pyspark ยังช่วยให้เราทำงานกับ RDDs ( Resilient Distributed Datasets) ได้ง่ายขึ้น

Pyspark
Python + Spark = Pyspark [ขอบคุณรูปจาก databricks.com]

เทคนิคการใช้ Spark

วิธีเริ่มต้นการใช้ Spark

จุดเริ่มต้นของการใช้ Spark คือเราต้องสร้าง entry point หรือ ประตูที่สามารถให้เราเข้าไปใช้ฟังชั่นต่างๆใน Spark Core ได้ โดยเริ่มจากการอิมพอร์ท SparkContext

from pyspark import SparkContext
sc = SparkContext.getOrCreate()

โดยทั่วไปแล้วเราไม่สามารถรัน Spark Context หลายอันพร้อมกันได้ และถ้ามี SparkContext อยู่แล้วก็สามารถใช้ซ้ำได้

วิธีใช้ Spark SQL

ถ้าเราอยากดึงดาต้ามาใช้ เราก็ต้องอิมพอร์ท SparkSession มาจาก Spark SQL เพื่อที่เราจะทำงานกับดาต้าได้สะดวก ถ้ายังจำได้ในบทความก่อน Apache Spark คืออะไร เครื่องมือ Big Data ที่ไม่รู้จักไม่ได้ เรามีโครงสร้างนึงเรียกว่า Unified Stack ซึ่งในนั้นก็มี Spark SQL ไว้จัดการกับ structured data

from pyspark.sql import SparkSession
# ในกรณีที่ไม่มี spark context อยู่ก่อนแล้ว เราก็สร้างใหม่
if (sc is None):
    sc = SparkContext(master="local[*]")

spark = SparkSession(sparkContext=sc)\
        .builder\
        .appName("How to Spark")\
        .config("spark.some.config.option", "some-value") \
        .getOrCreate()

เราเลือกให้ Spark รันบน local ได้ โดย เลือก * เป็นการเลือก processor ทั้งหมด หรือเราสามารถใส่เป็นตัวเลขเพื่อระบุจำนวน processor ได้ นอกจากนี้ยังเลือกตั้งชื่อแอปได้อีก ในที่นี้คือเราใช้ชื่อ “How to Spark”

วิธีที่เราสามารถโหลดดาต้าเข้ามาได้ หลักๆมีด้วยกันสองวิธี ก็คือ โหลดเข้ามาเป็น RDD โดยใช้ SparkCore หรือ โหลดเข้ามาเป็น DataFrame โดยใช้ Spark SQL

วิธีการโหลดไฟล์

วิธีการโหลดไฟล์ ให้เป็น RDD

เราสามารถโหลดไฟล์ ทั้งในฟอร์แมท text file หรือ Comma-separated values (csv) ตามแบบ

# โหลด TXT เป็น RDD
twitter_rdd = sc.textFile('twitter.txt')
# โหลด csv เป็น RDD
mtcars_rdd = sc.textFile('mtcars.csv')

ต้องบอกว่าฟังชั่น textFile() นั้นอ่านไฟล์แล้ว ออกมาเป็น RDD ในรูปแบบ strings

อย่างที่เคยพูดไปว่า Spark SQL สามารถจัดการดาต้าที่เป็นแบบ structured และ semi-structured ได้ structured ดาต้านั้นหมายถึง ดาต้าที่มี schema ซึ่งก็คือชุดดาต้าที่มีโครงสร้างเป็นระบบระเรียบ ทำให้ Spark SQL เข้าใจโครงสร้างนั้น แล้วอ่านออกมาให้เราได้ทั้งชื่อคอลัมม์ และดาต้าในแต่ละแถว

การโหลดดาต้าเข้ามาวิธีที่สองนั้น ทำได้โดยใช้ Spark SQL ซึ่งรองรับไฟล์หลายแบบตามแบบข้างล่างนี้

วิธีการโหลดไฟล์ JSON

เริ่มจากโหลดไฟล์ JSON (JavaScript Object Notation) ซึ่งสามารถทำได้สองทางโดยใช้ read.json หรือ read.load แล้วระบุชนิดของไฟล์

# ทางที่หนึ่ง
customer_df = spark.read.json("customer.json")
# ทางที่สอง
people_df = spark.read.load("people.json", format="json")

วิธีการโหลดไฟล์ Parquet files

การโหลดไฟล์ Parquet files สามารถทำได้โดยใช้ read.load ได้โดยตรง

users_df = spark.read.load("users.parquet")

วิธีการโหลดไฟล์ TXT

การโหลดไฟล์ TXT สามารถทำได้โดยใช้ read.text

people_df = spark.read.text("people.txt")

วิธีการโหลดไฟล์ CSV

การโหลดไฟล์ CSV (comma-separated values) สามารถทำได้โดยใช้ read.csv

mtcars_df = spark.read.csv('mtcars.csv', header=True, inferSchema=True)

วิธีเช็ค Data type

การเช็คชนิดของดาต้านั้น ทำได้โดย ชื่อของดาต้าเฟรม จุด แล้วตามด้วย dtypes ซึ่งก็จะออกมาเป็นชื่อคอลัมน์ และชนิดของมัน เช่นในตัวอย่างข้างล่างนี้ ประกอบไปด้วย สองคอลัมน์ age และ name ชนิดของสองคอลัมน์นี้ก็คือ integer และ string ตามลำดับ

df.dtypes 
# ยกตัวอย่าง
[('age', 'int'), ('name', 'string')]

วิธีการโชว์ DataFrame

วิธีการแสดงดาต้าเฟรม สามารถทำได้หลายวิธีดังต่อไปนี้

# โดยปกติเราจะใช้วิธีแรกเพื่อแสดงเนื้อหา
df.show() 
# ยกตัวอย่างที่ 1
+----+-------+
| age|   name|
+----+-------+
|  30|   Andy|
|  19| Justin|
|null|Michael|
+----+-------+

# ยกตัวอย่างที่ 2
customer_df.show()
+--------------------+---+----------+---------+--------------------+
|             address|age| firstName| lastName|         phoneNumber|
+--------------------+---+----------+---------+--------------------+
|[New York,10021,N...| 25|      John|    Smith|[[212 555-1234,ho...|
|[New York,10021,N...| 21|      Jane|      Doe|[[322 888-1234,ho...|
+--------------------+---+----------+---------+--------------------+

# แสดง n แถวแรก
df.head(n)
# ยกตัวอย่าง
Row(age=30, name=u'Andy')

# แสดงแถวแรกเท่านั้น
df.first()

# แสดง n แถวแรกเท่านั้น โดยแสดงออกมาเป็น list ของแถว
df.take(2) 
# ยกตัวอย่าง
[Row(age=30, name=u'Andy'), Row(age=19, name=u'Justin')]

สังเกตว่า คำสั่ง head() และ take() จะให้ผลที่ไม่ต่างกันนัก แต่ head() ควรจะใช้ในกรณีที่ข้อมูลน้อยกว่าเพราะดาต้าทั้งหมดต้องโหลดใน memory

วิธีการแสดง schema

ถ้าอยากแสดง schema ดูว่าคอลัมน์มีอะไรบ้าง ชนิดอะไร มีค่าที่เป็น null ของดาต้าเฟรมนี้ไหม ทำได้โดย

df.printSchema()
# ยกตัวอย่าง
root
|-- age: int (nullable = true)
|-- name: string (nullable = true)

วิธีแสดง statistics (count, mean, stddev, min, max)

สมมติว่าเราอยากรู้สถิติของข้อมูลว่าแต่ละคอลัมน์เป็นอย่างไร ทำได้โดยใช้ describe() ตามด้วย show() เพื่อแสดงข้อมูล

df.describe().show() 
# ยกตัวอย่าง
+-------+------------------+------+
|summary|               age|  name|
+-------+------------------+------+
|  count|                 3|     3|
|   mean|              24.5|  null|
| stddev|2.1213203435596424|  null|
|    min|                19|Justin|
|    max|                30|  Andy|
+-------+------------------+------+
# ในกรณีที่ต้องการให้แสดงเฉพาะคอลัมน์ก็ใส่ชื่อเข้าไปได้
df.describe(['age']).show()

วิธีแสดง column

ถ้าอยากรู้ว่าคอลัมน์มีอะไรบ้าง โดยที่จะแสดงชื่อคอลัมน์ออกมาเป็น list เราสามารถแสดงได้โดย

df.columns 
# ยกตัวอย่าง
['age', 'name']

วิธีนับจำนวนแถว

ถ้าอยากรู้ว่าดาต้าเฟรมของเรามีจำนวนแถวทั้งหมดกี่แถว ทำได้โดยใช้

df.count() 

วิธีนับแถว unique

การนับจำนวนแถวที่ unique ทำได้โดย

df.distinct().count() 

วิธีการดรอป ค่าซ้ำ

สมมติว่ามีการใส่ค่าผิด อย่างเช่น ใส่อายุผิด โดยที่ชื่อและ ความสูงถูก ทำให้มีแถวที่ซ้ำกันสองแถว เราสามารถดรอปแถวที่ซ้ำได้โดย

df.dropDuplicates()

# ยกตัวอย่าง ให้ดาต้าเฟรมเต็มๆเป็นแบบนี้
+---+------+-----+
|age|height| name|
+---+------+-----+
| 30|    80| Andy|
| 30|    80| Andy|
| 50|    80| Andy|
+---+------+-----+

# พอเราดรอปค่าซ้ำ
df.dropDuplicates().show()
+---+------+-----+
|age|height| name|
+---+------+-----+
| 30|    80| Andy|
| 50|    80| Andy|
+---+------+-----+

# ถ้สต้องการดรอปโดบเจาะจงคอลัมน์
df.dropDuplicates(['name', 'height']).show()
+---+------+-----+
|age|height| name|
+---+------+-----+
| 30|    80| Andy|
+---+------+-----+

หมายเหตุ dropDuplicates() ให้ผลเหมือนกับ drop_duplicates()

วิธีการดรอป NA

การดรอป null นั้นทำได้โดยสองวิธีข้างล่าง

# ทางที่หนึ่ง
df.dropna()

# ทางที่สอง
df.na.drop()

วิธีการเลือกข้อมูล

การเลือกข้อมูลนั้นทำได้หลายวิธี โดยใช้ Queries ได้ดังต่อไปนี้

วิธีการเลือกข้อมูลโดยใช้ Select

หลังจากเราโหลดดาตาเฟรม ตรวจสอบดาต้ากันไปสักพักว่ามีหน้าตาเป็นยังไงบ้าง วิธีการเรียกดาต้าจากดาต้าเฟรมแบบเฉพาะเจาะจงสามารถทำได้โดยใช้คำสั่ง select() ซึ่งเป็นคำสั่งที่สามารถเลือกชื่อคอลัมน์ที่เราต้องการจะดูได้ นอกจากนี้ยังจัดการดาต้าเพิ่มได้ ยกตัวอย่างการเพิ่มอายุให้ทุกคนตามแบบข้างล่างนี้

# เลือกคอลัมน์ name อย่างเดียว
df.select("name").show()
+-------+
|   name|
+-------+
|Michael|
|   Andy|
| Justin|
+-------+

# เลือกทั้งสองคอลัมน์ โดยบวกอายุเพิ่มให้ทุกคน 1 ปี
df.select(df['name'], df['age'] + 1).show()
+-------+---------+
|   name|(age + 1)|
+-------+---------+
|Michael|     null|
|   Andy|       31|
| Justin|       20|
+-------+---------+

วิธีการเลือกข้อมูลโดยใช้ When

when เป็นคำสั่งหนึ่งที่ไว้ใช้เลือกในกรณีที่มี condition เข้ามาด้วย ยกตัวอย่างตามด้านล่าง คืออยากรู้ว่าใครอายุมากกว่า 20 บ้าง ให้แสดงเป็น 1 และให้แสดงเป็น 0 โดยใช้ otherwise แต่ถ้าเราไม่ใส่ otherwise ผลข้างล่างนี้ก็จะแสดงแค่คนที่มีอายุมากกว่า 20 ซึ่งก็คือแถวแรกแถวเดียว

นอกจากนี้เรายังสามารถใช้ when ซ้อน when ในกรณีที่มีหลาย conditions ตามตัวอย่างที่สอง

from pyspark.sql import functions as F
# ตัวอย่าง 1 
df.select(df.name, F.when(df.age > 20, 1).otherwise(0)).show()
+-------+-------------------------------------+
|   name|CASE WHEN (age > 20)THEN 1 ELSE 0 END|
+-------+-------------------------------------+
|   Andy|                                    1|
|Michael|                                    0|
+-------+-------------------------------------+

# ตัวอย่าง 2
df.select(df.name, F.when(df.age > 18, 1).when(df.age < 40, -1).otherwise(0)).show()

วิธีการเลือกข้อมูลโดยใช้ Like

like เป็นคำสั่งเมื่อเราต้องการเทียบ หรือจับคู่เหมือน ยกตัวอย่างเช่นการหาคนชื่อซ้ำ หรือหาคนนามสกุลเดียวกัน ยกตัวอย่างตามข้างล่าง คือให้แสดง ชื่อ แล้วก็นามสกุล สำหรับคนที่มีนามสกุล Smith

df.select("firstName", df.lastName.like("Smith")).show()

วิธีการเลือกข้อมูลโดยใช้ Startswith หรือ Endwith

คำสั่งนี้ก็จะคล้ายๆกันกับ like ใช้ในกรณีที่ต้องการจับคู่คำขึ้นต้นหรือ คำลงท้าย โดยตามตัวอย่างนี้จะเป็นการหาคนที่มีนามสกุลขึ้นต้นด้วย ‘Sm’ ในตัวอย่างแรก และหาคนที่มีนามสกุลลงท้ายด้วย ‘th’ ในตัวอย่างที่สอง

# ตัวอย่าง 1 
df.select("firstName", df.lastName.startswith("Sm")).show()
# ตัวอย่าง 2
df.select(df.lastName.endswith("th")).show()

วิธีการใช้ Substring

substring เป็นคำสั่งที่ไว้ใช้เลือกดาต้าในส่วนย่อยเข้าไปอีก โดยเลือกคอลัมน์ที่อยากให้แสดง ตามด้วยฟังชั่น substr แล้วระบุจำนวนตัวอักษรว่าให้เริ่มจากตัวที่เท่าไหร่ จบที่ตัวที่เท่าไหร่ โดยนับจากตัวแรก ตามตัวอย่างก็คือ จากตัวที่หนึ่งถึงตัวที่สาม

df.select(df.firstName.substr(1, 3).alias("name")).collect()
[Row(name='And'), Row(name='Mic')]

ทุกคนอาจจะสงสัยว่า alias เอาไว้ทำอะไร จะสังเกตได้ว่ามันสามารถเอาไว้เปลี่ยนชื่อคอลัมน์ตอนแสดงออกมาได้ จาก first name เป็น name

วิธีการใช้ Between

between เป็นการเลือกข้อมูลในช่วงนับตั้งแต่ขอบเขตด้านล่าง ขึ้นไปจนถึงขอบเขตด้านบน และจะแสดงผลออกมาเป็น boolean หรือว่า true/false นั่นเอง

df.select(df.name, df.age.between(2, 4)).show()
+------+-----------------------------+
|  name|((age >= 19) AND (age <= 25))|
+------+-----------------------------+
|  Andy|                        false|
|Justin|                         true|
+------+-----------------------------+

วิธีการ เพิ่ม, ลบ, อัพเดท คอลัมน์ใน DataFrame

นอกจากการใช้ Queries เรียกดูดาต้าแล้ว เรายังสามารถจัดการ เพิ่ม ลด เปลี่ยนชื่อ คอลัมน์ได้ดังต่อไปนี้

วิธีการเพิ่มคอลัมน์

ถ้าเราอยากเพิ่มคอลัมน์ใหม่ ทำได้โดยใช้ withColumn ตามด้วยชื่อคอลัมน์ที่เราจะตั้ง แล้วก็ค่าในคอลัมน์นั้น เราสามารถใช้ค่าจากคอลัมน์ที่มีอยู่แล้วก็ได้ ขึ้นอยู่กับลักษณะข้อมูล ยกตัวอย่างตามข้างล่าง ตัวอย่าง 1 เป็นการสร้างคอลัมน์ชื่อ age2 โดยการเพิ่มอายุจากคอลัมน์ age ไปอีก 2 ปี

สำหรับตัวอย่าง 2 อันนี้จะเป็นการสร้างคอลัมน์ใหม่ให้กับ city, postal code และ state โดยเลือกมาจากแต่ละส่วนของ address

# ตัวอย่าง 1 
df.withColumn('age2', df.age + 2)

# ตัวอย่าง 2 
df.withColumn('city',df.address.city) \
 .withColumn('postalCode',df.address.postalCode) \
 .withColumn('state',df.address.state) 

วิธีการลบคอลัมน์

ถ้าอยากลบคอลัมน์ที่ไม่ต้องการ ก็ทำได้ง่ายๆ โดยใช้ drop แล้วตามด้วยชื่อคอลัมน์ ยกตัวอย่างการดรอป 2 คอลัมน์ address และ phone number ตามข้างล่างนี้ โดยที่เราสามารถทำการ ดรอปซ้อนดรอปได้ตามตัวอย่างที่ 2

# ตัวอย่าง 1 
df.drop("address", "phoneNumber")
# ตัวอย่าง 2
df.drop(df.address).drop(df.phoneNumber)

วิธีการอัพเดทคอลัมน์

ถ้าอยากเปลี่ยนชื่อคอลัมน์ ก็ทำได้โดยใช้ withColumnRenamed ระบุชื่อคอลัมน์ที่ต้องการจะเปลี่ยนตามด้วยชื่อใหม่ ในกรณีนี้เป็นการเปลี่ยน telePhoneNumber เป็น phoneNumber

df = df.withColumnRenamed('telePhoneNumber', 'phoneNumber')

วิธีการใช้ Operation จัดกลุ่ม กรอง เรียง ข้อมูล

เมื่อเรารู้วิธีการจัดการคอลัมน์กันแล้ว ก็มาดูวิธีการใช้ operation ต่างๆโดยเราสามารถจัดกลุ่ม กรองดาต้าตามเงื่อนไข หรือแม้แต่เรียงลำดับได้ ถึงแม้ operation พวกนี้จะดูใช้ง่าย แต่ก็มีประโยชน์มากเวลาเราวิเคราะห์ข้อมูลค่ะ

วิธีการจัดกลุ่มโดย GroupBy

groupBy เป็นคำสั่งนึงที่มีประโยชน์มากๆ โดยที่เราสามารถจัดกลุ่มดาต้า แล้วหาจำนวนในแต่ละกลุ่ม หรือค่าเฉลี่ยได้ ยกตัวอย่าง สมมติว่าเราอยากรู้ว่าคนอายุเท่านี้ มีทั้งหมดกี่คน ก็ทำได้โดยให้ groupBy อายุ แล้วนับจำนวนด้วย count

# ยกตัวอย่าง นับจำนวนคนโดยแต่ละอายุ
df.groupBy("age").count().show()
+----+-----+
| age|count|
+----+-----+
|  19|    1|
|null|    1|
|  30|    1|
+----+-----+

วิธีการกรองข้อมูลโดยใช้ Filter

filter เป็นอีกคำสั่งนึงที่เราสามารถกำหนด condition ได้ ยกตัวอย่างเช่น การเลือกคนทั้งหมดที่มีอายุมากกว่า น้อยกว่า หรือเท่ากับ ตัวเลขหนึ่ง โดยจะแสดงดาต้าเฟรมออกมาเฉพาะแถวที่เข้า condition นั้น

# ยกตัวอย่าง
df.filter(df['age'] > 21).show()
+---+----+
|age|name|
+---+----+
| 30|Andy|
+---+----+

วิธีการเรียงลำดับข้อมูลโดยใช้ Sort

การเรียงลำดับข้อมูล จากมากไปน้อย (descending) หรือ จากน้อยไปมาก (ascending) ทำได้ตามความต้องการของเราตามวิธีดังต่อไปนี้

# วิธีที่ 1 จากมากไปน้อย
df.sort(df.age.desc())

# วิธีที่ 2 จากมากไปน้อย โดยให้น้อยไปมาก เป็น false
df.sort("age", ascending=False)

# วิธีที่ 3 จากมากไปน้อยโดย age, จากน้อยไปมากโดย city
df.orderBy(["age","city"],ascending=[0,1])

การเซฟไฟล์

วิธีการเปลี่ยนชนิดของข้อมูล

เราสามารถเล่นกับข้อมูลโดยเปลี่ยน ชนิดของดาต้าไปมาได้ ตั้งแต่เปลี่ยนดาต้าเฟรมเป็น RDD หรือ ให้อยู่ในรูปแบบ Pandas นอกจากเปลี่ยนไปแล้วก็ยังเปลี่ยนกลับได้ด้วย ยกตัวอย่างเช่น

# เปลี่ยน dataframe เป็น RDD
rdd1 = df.rdd 

# เปลี่ยน dataframe เป็น string RDD
df.toJSON().first() 

# ทำให้ spark dataframe อยู่ในรูปแบบ pandas dataframe
df.toPandas()

# ทำให้ pandas dataframe อยู่ในรูปแบบ spark dataframe
df = spark.createDataFrame(pandas_df)

สำหรับคนที่สนใจว่า Pandas คืออะไร นำไปทำอะไรกับดาต้าเฟรมได้อีกนั้น สามารถเข้าไปดูบทความของเรา Cheatsheet วิธีใช้ และเทคนิคใน Pandas (Python) ฉบับสมบูรณ์ ได้เลยค่า

วิธีการเซฟไฟล์เป็น Parquet files

ถ้าเราอยากเซฟไฟล์เป็น parquet สามารถทำได้โดยใช้ write.save แล้วตามด้วยชื่อ

df.write.save("newFile.parquet")

วิธีการเซฟไฟล์เป็น JSON

ในกรณีของ json สามารถทำได้โดยใช้ write.save แล้วตามด้วยชื่อ จากนั้นระบุ format เข้าไปด้วย

df.write.save("newFile.json",format="json")

วิธีหยุดใช้งาน Spark

ในกรณีที่อยากหยุดการทำงานของ Spark ทำได้โดย

spark.stop()

สรุปการใช้งาน Pyspark

จากเทคนิคต่าง ๆ ด้านบน จะเห็นว่า Pyspark การใช้งานนั้นไม่ได้ยากอย่างที่คิดใช่ไหมคะ เท่านี้ทุกคนได้รู้เทคนิคการจัดการ Big Data แล้ว หวังว่าบทความนี้จะมีประโยชน์กับท่านที่กำลังหัดใช้งาน Pyspark กันอยู่นะคะ สุดท้ายนี้อ่านบทความแนะนำเพิ่มเติมสำหรับ Big Data ได้ตามนี้เลย

ส่วนถ้าใครคิดว่าบทความนี้มีประโยชน์ อยากฝากให้ช่วยแชร์ให้เพื่อน ๆ หน่อยนะคะ และถ้าอยากติดตามบทความดี ๆ ด้าน Data กันบน Facebook หรืออยากมาพูดคุย ติชมกัน เชิญได้ที่ Facebook Page: DataTH – Data Science ชิลชิล เลยนะค้าา

แล้วพบกันใหม่บทความหน้าค่ะ

บทความแนะนำอื่น ๆ