+1

Giải thích và Ứng dụng của PySpark.sql.Window trong Xử lý Dữ liệu phân tán

Mayfest2023

Giới thiệu

Trong việc xử lý dữ liệu phân tán và tính toán song song, PySpark là một trong những công cụ phổ biến và mạnh mẽ nhất. Trong PySpark, pyspark.sql.Window là một lớp quan trọng trong module pyspark.sql, cho phép chúng ta thực hiện các phép tính và phân tích cửa sổ trên các DataFrame. Trên thực tế, pyspark.sql.Window cung cấp một cách tiện lợi để phân chia dữ liệu thành các nhóm (cửa sổ) và thực hiện các tính toán dựa trên cửa sổ đó. Trong bài viết này, chúng ta sẽ tìm hiểu về pyspark.sql.Window và cách áp dụng nó trong xử lý dữ liệu phân tán.

pyspark.sql.Window là gì?

Trong PySpark, pyspark.sql.Window là một lớp được sử dụng để định nghĩa và xác định cửa sổ dữ liệu trong DataFrame. Nó cho phép chúng ta xác định cách phân chia dữ liệu thành các nhóm và sắp xếp chúng trong từng nhóm. Bằng cách sử dụng pyspark.sql.Window, chúng ta có thể thực hiện các tính toán trên các nhóm dữ liệu, như tính toán tổng, trung bình, độ dốc, lệch chuẩn, v.v. Chính xác hơn, pyspark.sql.Window cho phép chúng ta định nghĩa:

  1. Các cột để phân chia dữ liệu thành các nhóm.
  2. Cách sắp xếp dữ liệu bên trong mỗi nhóm.
  3. Phạm vi của các dòng trong mỗi cửa sổ.

Các phương thức quan trọng trong pyspark.sql.Window

  1. Window.currentRow: Đại diện cho dòng hiện tại trong cửa sổ. Khi sử dụng Window.currentRow, chúng ta chỉ định rõ rằng phạm vi tính toán hoặc thực hiện hàm phân tích chỉ áp dụng cho dòng hiện tại.

  2. partitionBy(*cols): Phương thức này xác định các cột để phân chia dữ liệu thành các nhóm. Các cột được đưa vào partitionBy sẽ làm cơ sở cho việc phân chia dữ liệu và tính toán cửa sổ theo từng nhóm riêng biệt. Ví dụ, nếu chúng ta muốn tính tổng theo từng nhóm dữ liệu của cột "category", chúng ta có thể sử dụng partitionBy("category").

from pyspark.sql import Window
from pyspark.sql.functions import row_number
df = spark.createDataFrame(
     [(1, "a"), (1, "a"), (2, "a"), (1, "b"), (2, "b"), (3, "b")], ["id", "category"])
     
window = Window.partitionBy("category").orderBy("id")
df.withColumn("row_number", row_number().over(window)).show()

-----
+---+--------+----------+
| id|category|row_number|
+---+--------+----------+
|  1|       a|         1|
|  1|       a|         2|
|  2|       a|         3|
|  1|       b|         1|
|  2|       b|         2|
|  3|       b|         3|
+---+--------+----------+
  1. orderBy(*cols): Phương thức này xác định cách sắp xếp dữ liệu bên trong mỗi nhóm. Các cột được đưa vào orderBy sẽ được sắp xếp theo thứ tự tăng dần để xác định thứ tự của các dòng trong cửa sổ. Ví dụ, nếu chúng ta muốn sắp xếp theo cột "date" và "time", chúng ta có thể sử dụng orderBy("date", "time").
from pyspark.sql import Window
from pyspark.sql.functions import row_number
df = spark.createDataFrame(
     [(1, "a"), (1, "a"), (2, "a"), (1, "b"), (2, "b"), (3, "b")], ["id", "category"])

window = Window.partitionBy("id").orderBy("category")
df.withColumn("row_number", row_number().over(window)).show()

----
+---+--------+----------+
| id|category|row_number|
+---+--------+----------+
|  1|       a|         1|
|  1|       a|         2|
|  1|       b|         3|
|  2|       a|         1|
|  2|       b|         2|
|  3|       b|         1|
+---+--------+----------+
  1. rowsBetween(start, end): Phương thức này xác định phạm vi của các dòng trong mỗi cửa sổ. Các tham số startend có thể nhận các giá trị như Window.unboundedPreceding, Window.unboundedFollowing, hoặc các số nguyên đại diện cho số lượng dòng trước và sau dòng hiện tại. Ví dụ, nếu chúng ta muốn tính tổng các dòng từ dòng hiện tại trở về 2 dòng trước, chúng ta có thể sử dụng rowsBetween(-2, 0).
from pyspark.sql import Window
from pyspark.sql import functions as func
df = spark.createDataFrame(
     [(1, "a"), (1, "a"), (2, "a"), (1, "b"), (2, "b"), (3, "b")], ["id", "category"])

Tính tổng id trong phạm vi từ currentRow đến currentRow + 1 trong danh mục phân vùng

window = Window.partitionBy("category").orderBy("id").rowsBetween(Window.currentRow, 1)
df.withColumn("sum", func.sum("id").over(window)).sort("id", "category", "sum").show()

-----
+---+--------+---+
| id|category|sum|
+---+--------+---+
|  1|       a|  2|
|  1|       a|  3|
|  1|       b|  3|
|  2|       a|  2|
|  2|       b|  5|
|  3|       b|  3|
+---+--------+---+
  1. rangeBetween(start, end): Phương thức này xác định phạm vi của các dòng trong mỗi cửa sổ dựa trên giá trị của một cột được sắp xếp (thông qua orderBy). Các tham số startend có thể là các giá trị sau:

    • Window.unboundedPreceding: Đại diện cho không giới hạn phía trước. Nghĩa là phạm vi sẽ bắt đầu từ dòng đầu tiên của cửa sổ.
    • Window.unboundedFollowing: Đại diện cho không giới hạn phía sau. Nghĩa là phạm vi sẽ kết thúc ở dòng cuối cùng của cửa sổ.
    • Window.currentRow: Đại diện cho dòng hiện tại trong cửa sổ.

    Ví dụ, rangeBetween(Window.unboundedPreceding, Window.currentRow) sẽ xác định phạm vi từ dòng đầu tiên của cửa sổ đến dòng hiện tại trong cửa sổ.

from pyspark.sql import Window
from pyspark.sql import functions as func
df = spark.createDataFrame(
     [(1, "a"), (1, "a"), (2, "a"), (1, "b"), (2, "b"), (3, "b")], ["id", "category"])

Tính tổng id trong phạm vi từ id của currentRow đến id của currentRow + 1 trong danh mục phân vùng.

window = Window.partitionBy("category").orderBy("id").rangeBetween(Window.currentRow, 1)
df.withColumn("sum", func.sum("id").over(window)).sort("id", "category").show()

-----
+---+--------+---+
| id|category|sum|
+---+--------+---+
|  1|       a|  4|
|  1|       a|  4|
|  1|       b|  3|
|  2|       a|  2|
|  2|       b|  5|
|  3|       b|  3|
+---+--------+---+
  1. Window.unboundedFollowing: Đại diện cho không giới hạn phía sau. Khi sử dụng Window.unboundedFollowing, chúng ta xác định rằng phạm vi tính toán hoặc thực hiện hàm phân tích áp dụng cho tất cả các dòng từ dòng hiện tại đến cuối cùng của cửa sổ.

  2. Window.unboundedPreceding: Đại diện cho không giới hạn phía trước. Khi sử dụng Window.unboundedPreceding, chúng ta xác định rằng phạm vi tính toán hoặc thực hiện hàm phân tích áp dụng cho tất cả các dòng từ đầu cửa sổ đến dòng hiện tại.

from pyspark.sql import SparkSession
from pyspark.sql.window import Window
from pyspark.sql.functions import sum

# Tạo SparkSession
spark = SparkSession.builder.getOrCreate()

# Tạo DataFrame mẫu
data = [("A", 1), ("A", 2), ("A", 3), ("B", 4), ("B", 5)]
df = spark.createDataFrame(data, ["category", "value"])

# Định nghĩa cửa sổ
window = Window.partitionBy("category").orderBy("value")

# Sử dụng Window.unboundedPreceding để tính tổng tích lũy
df.withColumn("cumulative_sum", sum("value").over(window.rowsBetween(Window.unboundedPreceding, Window.currentRow))).show()

# Sử dụng Window.unboundedFollowing để tính tổng tích lũy ngược
df.withColumn("reverse_cumulative_sum", sum("value").over(window.rowsBetween(Window.currentRow, Window.unboundedFollowing))).show()

# Kết quả
+--------+-----+--------------+
|category|value|cumulative_sum|
+--------+-----+--------------+
|       A|    1|             1|
|       A|    2|             3|
|       A|    3|             6|
|       B|    4|             4|
|       B|    5|             9|
+--------+-----+--------------+

+--------+-----+---------------------+
|category|value|reverse_cumulative_sum|
+--------+-----+---------------------+
|       A|    1|                    6|
|       A|    2|                    5|
|       A|    3|                    3|
|       B|    4|                    5|
|       B|    5|                    0|
+--------+-----+---------------------+

Các giá trị Window.unboundedPrecedingWindow.unboundedFollowing là cách chúng ta xác định rằng không có giới hạn trước hoặc sau đối với phạm vi tính toán hoặc hàm phân tích.

Các phương thức và giá trị trên cho phép chúng ta linh hoạt xác định phạm vi các dòng trong mỗi cửa sổ dữ liệu, từ đó tính toán và áp dụng các hàm phân tích phức tạp trên dữ liệu.

Ứng dụng trong xử lý dữ liệu phân tán

pyspark.sql.Window rất hữu ích khi chúng ta cần thực hiện các tính toán trên các nhóm dữ liệu hoặc áp dụng các hàm phân tích cửa sổ cho từng dòng dữ liệu. Ví dụ, chúng ta có thể sử dụng pyspark.sql.Window để tính toán tổng các dòng trước đó, tính toán độ dốc, lấy giá trị trước/sau của một dòng, hoặc tính toán các thống kê như trung bình, độ lệch chuẩn, v.v.

Ví dụ, để tính tổng cột "sales" theo từng nhóm dữ liệu của cột "category", chúng ta có thể sử dụng đoạn mã sau:

from pyspark.sql import Window
from pyspark.sql.functions import sum

window = Window.partitionBy("category")
df.withColumn("category_total_sales", sum("sales").over(window))

Đoạn mã trên tạo một cửa sổ dữ liệu bằng cách sử dụng partitionBy("category") và sau đó tính tổng cột "sales" trong mỗi nhóm bằng cách sử dụng hàm sum("sales").over(window).

Kết luận

pyspark.sql.Window là một công cụ mạnh mẽ trong PySpark cho phép chúng ta thực hiện tính toán và phân tích trên các nhóm dữ liệu. Bằng cách sử dụng các phương thức như partitionBy, orderBy, và rowsBetween, chúng ta có thể xác định các cửa sổ dữ liệu theo những tiêu chí cụ thể và thực hiện các tính toán phức tạp trên từng cửa sổ. Với khả năng xử lý dữ liệu phân tán và tính toán song song của PySpark, pyspark.sql.Window là một công cụ quan trọng trong kho công cụ của chúng ta khi làm việc với dữ liệu lớn và phân tán.

Kham khảo:

https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/window.html


All rights reserved

Viblo
Hãy đăng ký một tài khoản Viblo để nhận được nhiều bài viết thú vị hơn.
Đăng kí