Pyspark ถือเป็นเครื่องมือหนึ่งในการทำ Big Data แบบครบวงจร และสามารถนำไปต่อยอดสร้างโมเดล หรือ ทำ Data Visualization ได้อีกมากมาย
ดังนั้นวันนี้เราจะมาเรียนรู้การเขียน Pyspark ที่ใช้บ่อยๆ ตั้งแต่โหลดข้อมูล analyse ไปจนเซฟไฟล์เพื่อนำไปใช้ต่อเลยทีเดียว หวังว่าจะมีประโยชน์สำหรับเพื่อนๆทุกคนนะคะ
Pyspark คืออะไร
Pyspark เป็นเครื่องมือหนึ่งที่เกิดจากการรวมตัวกันระหว่าง Apache Spark กับ Python ซึ่งทำให้เราสามารถเขียน Python ใน Spark ได้ หรือเรียกง่ายๆก็คือเป็น Python API ของ Apache Spark นอกจากนี้ Pyspark ยังช่วยให้เราทำงานกับ RDDs ( Resilient Distributed Datasets) ได้ง่ายขึ้น
เทคนิคการใช้ Spark
วิธีเริ่มต้นการใช้ Spark
จุดเริ่มต้นของการใช้ Spark คือเราต้องสร้าง entry point หรือ ประตูที่สามารถให้เราเข้าไปใช้ฟังชั่นต่างๆใน Spark Core ได้ โดยเริ่มจากการอิมพอร์ท SparkContext
from pyspark import SparkContext sc = SparkContext.getOrCreate()
โดยทั่วไปแล้วเราไม่สามารถรัน Spark Context หลายอันพร้อมกันได้ และถ้ามี SparkContext อยู่แล้วก็สามารถใช้ซ้ำได้
วิธีใช้ Spark SQL
ถ้าเราอยากดึงดาต้ามาใช้ เราก็ต้องอิมพอร์ท SparkSession มาจาก Spark SQL เพื่อที่เราจะทำงานกับดาต้าได้สะดวก ถ้ายังจำได้ในบทความก่อน Apache Spark คืออะไร เครื่องมือ Big Data ที่ไม่รู้จักไม่ได้ เรามีโครงสร้างนึงเรียกว่า Unified Stack ซึ่งในนั้นก็มี Spark SQL ไว้จัดการกับ structured data
from pyspark.sql import SparkSession # ในกรณีที่ไม่มี spark context อยู่ก่อนแล้ว เราก็สร้างใหม่ if (sc is None): sc = SparkContext(master="local[*]") spark = SparkSession(sparkContext=sc)\ .builder\ .appName("How to Spark")\ .config("spark.some.config.option", "some-value") \ .getOrCreate()
เราเลือกให้ Spark รันบน local ได้ โดย เลือก * เป็นการเลือก processor ทั้งหมด หรือเราสามารถใส่เป็นตัวเลขเพื่อระบุจำนวน processor ได้ นอกจากนี้ยังเลือกตั้งชื่อแอปได้อีก ในที่นี้คือเราใช้ชื่อ “How to Spark”
วิธีที่เราสามารถโหลดดาต้าเข้ามาได้ หลักๆมีด้วยกันสองวิธี ก็คือ โหลดเข้ามาเป็น RDD โดยใช้ SparkCore หรือ โหลดเข้ามาเป็น DataFrame โดยใช้ Spark SQL
วิธีการโหลดไฟล์
วิธีการโหลดไฟล์ ให้เป็น RDD
เราสามารถโหลดไฟล์ ทั้งในฟอร์แมท text file หรือ Comma-separated values (csv) ตามแบบ
# โหลด TXT เป็น RDD twitter_rdd = sc.textFile('twitter.txt') # โหลด csv เป็น RDD mtcars_rdd = sc.textFile('mtcars.csv')
ต้องบอกว่าฟังชั่น textFile() นั้นอ่านไฟล์แล้ว ออกมาเป็น RDD ในรูปแบบ strings
อย่างที่เคยพูดไปว่า Spark SQL สามารถจัดการดาต้าที่เป็นแบบ structured และ semi-structured ได้ structured ดาต้านั้นหมายถึง ดาต้าที่มี schema ซึ่งก็คือชุดดาต้าที่มีโครงสร้างเป็นระบบระเรียบ ทำให้ Spark SQL เข้าใจโครงสร้างนั้น แล้วอ่านออกมาให้เราได้ทั้งชื่อคอลัมม์ และดาต้าในแต่ละแถว
การโหลดดาต้าเข้ามาวิธีที่สองนั้น ทำได้โดยใช้ Spark SQL ซึ่งรองรับไฟล์หลายแบบตามแบบข้างล่างนี้
วิธีการโหลดไฟล์ JSON
เริ่มจากโหลดไฟล์ JSON (JavaScript Object Notation) ซึ่งสามารถทำได้สองทางโดยใช้ read.json หรือ read.load แล้วระบุชนิดของไฟล์
# ทางที่หนึ่ง customer_df = spark.read.json("customer.json") # ทางที่สอง people_df = spark.read.load("people.json", format="json")
วิธีการโหลดไฟล์ Parquet files
การโหลดไฟล์ Parquet files สามารถทำได้โดยใช้ read.load ได้โดยตรง
users_df = spark.read.load("users.parquet")
วิธีการโหลดไฟล์ TXT
การโหลดไฟล์ TXT สามารถทำได้โดยใช้ read.text
people_df = spark.read.text("people.txt")
วิธีการโหลดไฟล์ CSV
การโหลดไฟล์ CSV (comma-separated values) สามารถทำได้โดยใช้ read.csv
mtcars_df = spark.read.csv('mtcars.csv', header=True, inferSchema=True)
วิธีเช็ค Data type
การเช็คชนิดของดาต้านั้น ทำได้โดย ชื่อของดาต้าเฟรม จุด แล้วตามด้วย dtypes ซึ่งก็จะออกมาเป็นชื่อคอลัมน์ และชนิดของมัน เช่นในตัวอย่างข้างล่างนี้ ประกอบไปด้วย สองคอลัมน์ age และ name ชนิดของสองคอลัมน์นี้ก็คือ integer และ string ตามลำดับ
df.dtypes # ยกตัวอย่าง [('age', 'int'), ('name', 'string')]
วิธีการโชว์ DataFrame
วิธีการแสดงดาต้าเฟรม สามารถทำได้หลายวิธีดังต่อไปนี้
# โดยปกติเราจะใช้วิธีแรกเพื่อแสดงเนื้อหา df.show() # ยกตัวอย่างที่ 1 +----+-------+ | age| name| +----+-------+ | 30| Andy| | 19| Justin| |null|Michael| +----+-------+ # ยกตัวอย่างที่ 2 customer_df.show() +--------------------+---+----------+---------+--------------------+ | address|age| firstName| lastName| phoneNumber| +--------------------+---+----------+---------+--------------------+ |[New York,10021,N...| 25| John| Smith|[[212 555-1234,ho...| |[New York,10021,N...| 21| Jane| Doe|[[322 888-1234,ho...| +--------------------+---+----------+---------+--------------------+ # แสดง n แถวแรก df.head(n) # ยกตัวอย่าง Row(age=30, name=u'Andy') # แสดงแถวแรกเท่านั้น df.first() # แสดง n แถวแรกเท่านั้น โดยแสดงออกมาเป็น list ของแถว df.take(2) # ยกตัวอย่าง [Row(age=30, name=u'Andy'), Row(age=19, name=u'Justin')]
สังเกตว่า คำสั่ง head() และ take() จะให้ผลที่ไม่ต่างกันนัก แต่ head() ควรจะใช้ในกรณีที่ข้อมูลน้อยกว่าเพราะดาต้าทั้งหมดต้องโหลดใน memory
วิธีการแสดง schema
ถ้าอยากแสดง schema ดูว่าคอลัมน์มีอะไรบ้าง ชนิดอะไร มีค่าที่เป็น null ของดาต้าเฟรมนี้ไหม ทำได้โดย
df.printSchema() # ยกตัวอย่าง root |-- age: int (nullable = true) |-- name: string (nullable = true)
วิธีแสดง statistics (count, mean, stddev, min, max)
สมมติว่าเราอยากรู้สถิติของข้อมูลว่าแต่ละคอลัมน์เป็นอย่างไร ทำได้โดยใช้ describe() ตามด้วย show() เพื่อแสดงข้อมูล
df.describe().show() # ยกตัวอย่าง +-------+------------------+------+ |summary| age| name| +-------+------------------+------+ | count| 3| 3| | mean| 24.5| null| | stddev|2.1213203435596424| null| | min| 19|Justin| | max| 30| Andy| +-------+------------------+------+ # ในกรณีที่ต้องการให้แสดงเฉพาะคอลัมน์ก็ใส่ชื่อเข้าไปได้ df.describe(['age']).show()
วิธีแสดง column
ถ้าอยากรู้ว่าคอลัมน์มีอะไรบ้าง โดยที่จะแสดงชื่อคอลัมน์ออกมาเป็น list เราสามารถแสดงได้โดย
df.columns # ยกตัวอย่าง ['age', 'name']
วิธีนับจำนวนแถว
ถ้าอยากรู้ว่าดาต้าเฟรมของเรามีจำนวนแถวทั้งหมดกี่แถว ทำได้โดยใช้
df.count()
วิธีนับแถว unique
การนับจำนวนแถวที่ unique ทำได้โดย
df.distinct().count()
วิธีการดรอป ค่าซ้ำ
สมมติว่ามีการใส่ค่าผิด อย่างเช่น ใส่อายุผิด โดยที่ชื่อและ ความสูงถูก ทำให้มีแถวที่ซ้ำกันสองแถว เราสามารถดรอปแถวที่ซ้ำได้โดย
df.dropDuplicates() # ยกตัวอย่าง ให้ดาต้าเฟรมเต็มๆเป็นแบบนี้ +---+------+-----+ |age|height| name| +---+------+-----+ | 30| 80| Andy| | 30| 80| Andy| | 50| 80| Andy| +---+------+-----+ # พอเราดรอปค่าซ้ำ df.dropDuplicates().show() +---+------+-----+ |age|height| name| +---+------+-----+ | 30| 80| Andy| | 50| 80| Andy| +---+------+-----+ # ถ้สต้องการดรอปโดบเจาะจงคอลัมน์ df.dropDuplicates(['name', 'height']).show() +---+------+-----+ |age|height| name| +---+------+-----+ | 30| 80| Andy| +---+------+-----+
หมายเหตุ dropDuplicates() ให้ผลเหมือนกับ drop_duplicates()
วิธีการดรอป NA
การดรอป null นั้นทำได้โดยสองวิธีข้างล่าง
# ทางที่หนึ่ง df.dropna() # ทางที่สอง df.na.drop()
วิธีการเลือกข้อมูล
การเลือกข้อมูลนั้นทำได้หลายวิธี โดยใช้ Queries ได้ดังต่อไปนี้
วิธีการเลือกข้อมูลโดยใช้ Select
หลังจากเราโหลดดาตาเฟรม ตรวจสอบดาต้ากันไปสักพักว่ามีหน้าตาเป็นยังไงบ้าง วิธีการเรียกดาต้าจากดาต้าเฟรมแบบเฉพาะเจาะจงสามารถทำได้โดยใช้คำสั่ง select() ซึ่งเป็นคำสั่งที่สามารถเลือกชื่อคอลัมน์ที่เราต้องการจะดูได้ นอกจากนี้ยังจัดการดาต้าเพิ่มได้ ยกตัวอย่างการเพิ่มอายุให้ทุกคนตามแบบข้างล่างนี้
# เลือกคอลัมน์ name อย่างเดียว df.select("name").show() +-------+ | name| +-------+ |Michael| | Andy| | Justin| +-------+ # เลือกทั้งสองคอลัมน์ โดยบวกอายุเพิ่มให้ทุกคน 1 ปี df.select(df['name'], df['age'] + 1).show() +-------+---------+ | name|(age + 1)| +-------+---------+ |Michael| null| | Andy| 31| | Justin| 20| +-------+---------+
วิธีการเลือกข้อมูลโดยใช้ When
when เป็นคำสั่งหนึ่งที่ไว้ใช้เลือกในกรณีที่มี condition เข้ามาด้วย ยกตัวอย่างตามด้านล่าง คืออยากรู้ว่าใครอายุมากกว่า 20 บ้าง ให้แสดงเป็น 1 และให้แสดงเป็น 0 โดยใช้ otherwise แต่ถ้าเราไม่ใส่ otherwise ผลข้างล่างนี้ก็จะแสดงแค่คนที่มีอายุมากกว่า 20 ซึ่งก็คือแถวแรกแถวเดียว
นอกจากนี้เรายังสามารถใช้ when ซ้อน when ในกรณีที่มีหลาย conditions ตามตัวอย่างที่สอง
from pyspark.sql import functions as F # ตัวอย่าง 1 df.select(df.name, F.when(df.age > 20, 1).otherwise(0)).show() +-------+-------------------------------------+ | name|CASE WHEN (age > 20)THEN 1 ELSE 0 END| +-------+-------------------------------------+ | Andy| 1| |Michael| 0| +-------+-------------------------------------+ # ตัวอย่าง 2 df.select(df.name, F.when(df.age > 18, 1).when(df.age < 40, -1).otherwise(0)).show()
วิธีการเลือกข้อมูลโดยใช้ Like
like เป็นคำสั่งเมื่อเราต้องการเทียบ หรือจับคู่เหมือน ยกตัวอย่างเช่นการหาคนชื่อซ้ำ หรือหาคนนามสกุลเดียวกัน ยกตัวอย่างตามข้างล่าง คือให้แสดง ชื่อ แล้วก็นามสกุล สำหรับคนที่มีนามสกุล Smith
df.select("firstName", df.lastName.like("Smith")).show()
วิธีการเลือกข้อมูลโดยใช้ Startswith หรือ Endwith
คำสั่งนี้ก็จะคล้ายๆกันกับ like ใช้ในกรณีที่ต้องการจับคู่คำขึ้นต้นหรือ คำลงท้าย โดยตามตัวอย่างนี้จะเป็นการหาคนที่มีนามสกุลขึ้นต้นด้วย ‘Sm’ ในตัวอย่างแรก และหาคนที่มีนามสกุลลงท้ายด้วย ‘th’ ในตัวอย่างที่สอง
# ตัวอย่าง 1 df.select("firstName", df.lastName.startswith("Sm")).show() # ตัวอย่าง 2 df.select(df.lastName.endswith("th")).show()
วิธีการใช้ Substring
substring เป็นคำสั่งที่ไว้ใช้เลือกดาต้าในส่วนย่อยเข้าไปอีก โดยเลือกคอลัมน์ที่อยากให้แสดง ตามด้วยฟังชั่น substr แล้วระบุจำนวนตัวอักษรว่าให้เริ่มจากตัวที่เท่าไหร่ จบที่ตัวที่เท่าไหร่ โดยนับจากตัวแรก ตามตัวอย่างก็คือ จากตัวที่หนึ่งถึงตัวที่สาม
df.select(df.firstName.substr(1, 3).alias("name")).collect() [Row(name='And'), Row(name='Mic')]
ทุกคนอาจจะสงสัยว่า alias เอาไว้ทำอะไร จะสังเกตได้ว่ามันสามารถเอาไว้เปลี่ยนชื่อคอลัมน์ตอนแสดงออกมาได้ จาก first name เป็น name
วิธีการใช้ Between
between เป็นการเลือกข้อมูลในช่วงนับตั้งแต่ขอบเขตด้านล่าง ขึ้นไปจนถึงขอบเขตด้านบน และจะแสดงผลออกมาเป็น boolean หรือว่า true/false นั่นเอง
df.select(df.name, df.age.between(2, 4)).show() +------+-----------------------------+ | name|((age >= 19) AND (age <= 25))| +------+-----------------------------+ | Andy| false| |Justin| true| +------+-----------------------------+
วิธีการ เพิ่ม, ลบ, อัพเดท คอลัมน์ใน DataFrame
นอกจากการใช้ Queries เรียกดูดาต้าแล้ว เรายังสามารถจัดการ เพิ่ม ลด เปลี่ยนชื่อ คอลัมน์ได้ดังต่อไปนี้
วิธีการเพิ่มคอลัมน์
ถ้าเราอยากเพิ่มคอลัมน์ใหม่ ทำได้โดยใช้ withColumn ตามด้วยชื่อคอลัมน์ที่เราจะตั้ง แล้วก็ค่าในคอลัมน์นั้น เราสามารถใช้ค่าจากคอลัมน์ที่มีอยู่แล้วก็ได้ ขึ้นอยู่กับลักษณะข้อมูล ยกตัวอย่างตามข้างล่าง ตัวอย่าง 1 เป็นการสร้างคอลัมน์ชื่อ age2 โดยการเพิ่มอายุจากคอลัมน์ age ไปอีก 2 ปี
สำหรับตัวอย่าง 2 อันนี้จะเป็นการสร้างคอลัมน์ใหม่ให้กับ city, postal code และ state โดยเลือกมาจากแต่ละส่วนของ address
# ตัวอย่าง 1 df.withColumn('age2', df.age + 2) # ตัวอย่าง 2 df.withColumn('city',df.address.city) \ .withColumn('postalCode',df.address.postalCode) \ .withColumn('state',df.address.state)
วิธีการลบคอลัมน์
ถ้าอยากลบคอลัมน์ที่ไม่ต้องการ ก็ทำได้ง่ายๆ โดยใช้ drop แล้วตามด้วยชื่อคอลัมน์ ยกตัวอย่างการดรอป 2 คอลัมน์ address และ phone number ตามข้างล่างนี้ โดยที่เราสามารถทำการ ดรอปซ้อนดรอปได้ตามตัวอย่างที่ 2
# ตัวอย่าง 1 df.drop("address", "phoneNumber") # ตัวอย่าง 2 df.drop(df.address).drop(df.phoneNumber)
วิธีการอัพเดทคอลัมน์
ถ้าอยากเปลี่ยนชื่อคอลัมน์ ก็ทำได้โดยใช้ withColumnRenamed ระบุชื่อคอลัมน์ที่ต้องการจะเปลี่ยนตามด้วยชื่อใหม่ ในกรณีนี้เป็นการเปลี่ยน telePhoneNumber เป็น phoneNumber
df = df.withColumnRenamed('telePhoneNumber', 'phoneNumber')
วิธีการใช้ Operation จัดกลุ่ม กรอง เรียง ข้อมูล
เมื่อเรารู้วิธีการจัดการคอลัมน์กันแล้ว ก็มาดูวิธีการใช้ operation ต่างๆโดยเราสามารถจัดกลุ่ม กรองดาต้าตามเงื่อนไข หรือแม้แต่เรียงลำดับได้ ถึงแม้ operation พวกนี้จะดูใช้ง่าย แต่ก็มีประโยชน์มากเวลาเราวิเคราะห์ข้อมูลค่ะ
วิธีการจัดกลุ่มโดย GroupBy
groupBy เป็นคำสั่งนึงที่มีประโยชน์มากๆ โดยที่เราสามารถจัดกลุ่มดาต้า แล้วหาจำนวนในแต่ละกลุ่ม หรือค่าเฉลี่ยได้ ยกตัวอย่าง สมมติว่าเราอยากรู้ว่าคนอายุเท่านี้ มีทั้งหมดกี่คน ก็ทำได้โดยให้ groupBy อายุ แล้วนับจำนวนด้วย count
# ยกตัวอย่าง นับจำนวนคนโดยแต่ละอายุ df.groupBy("age").count().show() +----+-----+ | age|count| +----+-----+ | 19| 1| |null| 1| | 30| 1| +----+-----+
วิธีการกรองข้อมูลโดยใช้ Filter
filter เป็นอีกคำสั่งนึงที่เราสามารถกำหนด condition ได้ ยกตัวอย่างเช่น การเลือกคนทั้งหมดที่มีอายุมากกว่า น้อยกว่า หรือเท่ากับ ตัวเลขหนึ่ง โดยจะแสดงดาต้าเฟรมออกมาเฉพาะแถวที่เข้า condition นั้น
# ยกตัวอย่าง df.filter(df['age'] > 21).show() +---+----+ |age|name| +---+----+ | 30|Andy| +---+----+
วิธีการเรียงลำดับข้อมูลโดยใช้ Sort
การเรียงลำดับข้อมูล จากมากไปน้อย (descending) หรือ จากน้อยไปมาก (ascending) ทำได้ตามความต้องการของเราตามวิธีดังต่อไปนี้
# วิธีที่ 1 จากมากไปน้อย df.sort(df.age.desc()) # วิธีที่ 2 จากมากไปน้อย โดยให้น้อยไปมาก เป็น false df.sort("age", ascending=False) # วิธีที่ 3 จากมากไปน้อยโดย age, จากน้อยไปมากโดย city df.orderBy(["age","city"],ascending=[0,1])
การเซฟไฟล์
วิธีการเปลี่ยนชนิดของข้อมูล
เราสามารถเล่นกับข้อมูลโดยเปลี่ยน ชนิดของดาต้าไปมาได้ ตั้งแต่เปลี่ยนดาต้าเฟรมเป็น RDD หรือ ให้อยู่ในรูปแบบ Pandas นอกจากเปลี่ยนไปแล้วก็ยังเปลี่ยนกลับได้ด้วย ยกตัวอย่างเช่น
# เปลี่ยน dataframe เป็น RDD rdd1 = df.rdd # เปลี่ยน dataframe เป็น string RDD df.toJSON().first() # ทำให้ spark dataframe อยู่ในรูปแบบ pandas dataframe df.toPandas() # ทำให้ pandas dataframe อยู่ในรูปแบบ spark dataframe df = spark.createDataFrame(pandas_df)
สำหรับคนที่สนใจว่า Pandas คืออะไร นำไปทำอะไรกับดาต้าเฟรมได้อีกนั้น สามารถเข้าไปดูบทความของเรา Cheatsheet วิธีใช้ และเทคนิคใน Pandas (Python) ฉบับสมบูรณ์ ได้เลยค่า
วิธีการเซฟไฟล์เป็น Parquet files
ถ้าเราอยากเซฟไฟล์เป็น parquet สามารถทำได้โดยใช้ write.save แล้วตามด้วยชื่อ
df.write.save("newFile.parquet")
วิธีการเซฟไฟล์เป็น JSON
ในกรณีของ json สามารถทำได้โดยใช้ write.save แล้วตามด้วยชื่อ จากนั้นระบุ format เข้าไปด้วย
df.write.save("newFile.json",format="json")
วิธีหยุดใช้งาน Spark
ในกรณีที่อยากหยุดการทำงานของ Spark ทำได้โดย
spark.stop()
สรุปการใช้งาน Pyspark
จากเทคนิคต่าง ๆ ด้านบน จะเห็นว่า Pyspark การใช้งานนั้นไม่ได้ยากอย่างที่คิดใช่ไหมคะ เท่านี้ทุกคนได้รู้เทคนิคการจัดการ Big Data แล้ว หวังว่าบทความนี้จะมีประโยชน์กับท่านที่กำลังหัดใช้งาน Pyspark กันอยู่นะคะ สุดท้ายนี้อ่านบทความแนะนำเพิ่มเติมสำหรับ Big Data ได้ตามนี้เลย
ส่วนถ้าใครคิดว่าบทความนี้มีประโยชน์ อยากฝากให้ช่วยแชร์ให้เพื่อน ๆ หน่อยนะคะ และถ้าอยากติดตามบทความดี ๆ ด้าน Data กันบน Facebook หรืออยากมาพูดคุย ติชมกัน เชิญได้ที่ Facebook Page: DataTH – Data Science ชิลชิล เลยนะค้าา
แล้วพบกันใหม่บทความหน้าค่ะ