+4

Xử lý dữ liệu phân tán sử dụng Apache Spark và SageMaker

Apache Spark là một công cụ phân tích hợp nhất để xử lý dữ liệu quy mô lớn. Spark framework thường được sử dụng trong luồng học máy để chuyển đổi dữ liệu hoặc kỹ thuật đặc trưng trên quy mô lớn. Amazon SageMaker cung cấp một tập hợp Docker images dựng sẵn bao gồm Apache Spark và các phần phụ thuộc khác cần thiết để chạy các công việc xử lý dữ liệu phân tán trên Amazon SageMaker. Bài viết này trình bày cách sử dụng Spark images dựng sẵn trên SageMaker Treatment bằng SageMaker Python SDK.

Bài viết này cung cấp một số ví dụ minh họa chức năng của SageMaker Spark Container:

  • Chạy chương trình PySpark đơn giản sử dụng lớp PySparkProcessor của SageMaker Python SDK.
  • Xem giao diện người dùng Spark (Spark UI) thông qua chức năng start_history_server() của PySparkProcessor object.

Thiết lập

Cài đặt phiên bản SageMaker Python SDK mới nhất

!pip install -U "sagemaker>2.0"

Chạy ứng dụng PySpark cơ bản

Ví dụ đầu tiên là tập lệnh xử lý dữ liệu Spark MLlib cơ bản. Tập lệnh này sẽ lấy một tập dữ liệu thô và thực hiện một số biến đổi trên đó, chẳng hạn như lập chỉ mục chuỗi và mã hóa one hot (one hot encoding).

Thiết lập vị trí và vai trò trong S3

Đầu tiên hãy thiết lập các vị trí SageMaker bucket mặc định để lưu trữ dữ liệu đầu vào raw và kết quả của Spark job. Các vai trò phải được xác định để chạy được tất cả các SageMaker Processing jobs.

import logging
import sagemaker
from time import gmtime, strftime

sagemaker_logger = logging.getLogger("sagemaker")
sagemaker_logger.setLevel(logging.INFO)
sagemaker_logger.addHandler(logging.StreamHandler())

sagemaker_session = sagemaker.Session()
bucket = sagemaker_session.default_bucket()
role = sagemaker.get_execution_role()

Tiếp đến, tải xuống tập dữ liệu mẫu từ SageMaker staging bucket.

# Fetch the dataset from the SageMaker bucket
import boto3

s3 = boto3.client("s3")
s3.download_file(
    f"sagemaker-example-files-prod-{sagemaker_session.boto_region_name}",
    "datasets/tabular/uci_abalone/abalone.csv",
    "./data/abalone.csv",
)

Viết mã PySpark

Nguồn cho tập lệnh tiền xử lý nằm trong ô bên dưới. %%writefile được sử dụng để lưu trữ lệnh cục bộ. Tập lệnh này thực hiện một số kỹ thuật cơ bản trên tập dữ liệu thô đầu vào. Trong ví dụ này, tập dữ liệu dùng từ Abalone Data Set và đoạn mã dưới đây thực hiện lập chỉ mục chuỗi, một mã one-hot, tập hợp vectơ và kết hợp chúng thành một đường dẫn để thực hiện các phép biến đổi này theo thứ tự. Sau đó, tập lệnh thực hiện phân chia 80-20 để tạo ra các tập dữ liệu huấn luyện và xác thực làm đầu ra.

%%writefile ./code/preprocess.py
from __future__ import print_function
from __future__ import unicode_literals

import argparse
import csv
import os
import shutil
import sys
import time

import pyspark
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.ml.feature import (
    OneHotEncoder,
    StringIndexer,
    VectorAssembler,
    VectorIndexer,
)
from pyspark.sql.functions import *
from pyspark.sql.types import (
    DoubleType,
    StringType,
    StructField,
    StructType,
)


def csv_line(data):
    r = ",".join(str(d) for d in data[1])
    return str(data[0]) + "," + r


def main():
    parser = argparse.ArgumentParser(description="app inputs and outputs")
    parser.add_argument("--s3_input_bucket", type=str, help="s3 input bucket")
    parser.add_argument("--s3_input_key_prefix", type=str, help="s3 input key prefix")
    parser.add_argument("--s3_output_bucket", type=str, help="s3 output bucket")
    parser.add_argument("--s3_output_key_prefix", type=str, help="s3 output key prefix")
    args = parser.parse_args()

    spark = SparkSession.builder.appName("PySparkApp").getOrCreate()

    # This is needed to save RDDs which is the only way to write nested Dataframes into CSV format
    spark.sparkContext._jsc.hadoopConfiguration().set(
        "mapred.output.committer.class", "org.apache.hadoop.mapred.FileOutputCommitter"
    )

    # Defining the schema corresponding to the input data. The input data does not contain the headers
    schema = StructType(
        [
            StructField("sex", StringType(), True),
            StructField("length", DoubleType(), True),
            StructField("diameter", DoubleType(), True),
            StructField("height", DoubleType(), True),
            StructField("whole_weight", DoubleType(), True),
            StructField("shucked_weight", DoubleType(), True),
            StructField("viscera_weight", DoubleType(), True),
            StructField("shell_weight", DoubleType(), True),
            StructField("rings", DoubleType(), True),
        ]
    )

    # Downloading the data from S3 into a Dataframe
    total_df = spark.read.csv(
        ("s3://" + os.path.join(args.s3_input_bucket, args.s3_input_key_prefix, "abalone.csv")),
        header=False,
        schema=schema,
    )

    # StringIndexer on the sex column which has categorical value
    sex_indexer = StringIndexer(inputCol="sex", outputCol="indexed_sex")

    # one-hot-encoding is being performed on the string-indexed sex column (indexed_sex)
    sex_encoder = OneHotEncoder(inputCol="indexed_sex", outputCol="sex_vec")

    # vector-assembler will bring all the features to a 1D vector for us to save easily into CSV format
    assembler = VectorAssembler(
        inputCols=[
            "sex_vec",
            "length",
            "diameter",
            "height",
            "whole_weight",
            "shucked_weight",
            "viscera_weight",
            "shell_weight",
        ],
        outputCol="features",
    )

    # The pipeline is comprised of the steps added above
    pipeline = Pipeline(stages=[sex_indexer, sex_encoder, assembler])

    # This step trains the feature transformers
    model = pipeline.fit(total_df)

    # This step transforms the dataset with information obtained from the previous fit
    transformed_total_df = model.transform(total_df)

    # Split the overall dataset into 80-20 training and validation
    (train_df, validation_df) = transformed_total_df.randomSplit([0.8, 0.2])

    # Convert the train dataframe to RDD to save in CSV format and upload to S3
    train_rdd = train_df.rdd.map(lambda x: (x.rings, x.features))
    train_lines = train_rdd.map(csv_line)
    train_lines.saveAsTextFile(
        "s3://" + os.path.join(args.s3_output_bucket, args.s3_output_key_prefix, "train")
    )

    # Convert the validation dataframe to RDD to save in CSV format and upload to S3
    validation_rdd = validation_df.rdd.map(lambda x: (x.rings, x.features))
    validation_lines = validation_rdd.map(csv_line)
    validation_lines.saveAsTextFile(
        "s3://" + os.path.join(args.s3_output_bucket, args.s3_output_key_prefix, "validation")
    )


if __name__ == "__main__":
    main()

Chạy SageMaker Processing Job

Kế tiếp, lớp PySparkProcessor được sử dụng và để xác định Spark job và sử dụng SageMaker Processing chạy nó. Một số điều cần lưu ý trong định nghĩa của PySparkProcessor:

  • Đây là multi-node job với hai phiên bản m5.xlarge (được chỉ định thông qua tham số instance_count và instance_type).
  • Spark framework phiên bản 3.1 được chỉ định thông qua tham số framework_version.
  • Tập lệnh PySpark được xác định ở trên được truyền qua tham số submit_app.
  • Command-line arguments đến tập lệnh PySpark (chẳng hạn như vị trí đầu vào và đầu ra của S3) được chuyển qua tham số arguments.
  • Nhật ký sự kiện Spark (Spark event logs) sẽ được tải xuống vị trí S3 được chỉ định trong spark_event_logs_s3_uri và có thể được sử dụng để xem Spark UI trong khi công việc đang được tiến hành hoặc sau nó hoàn thành.
from sagemaker.spark.processing import PySparkProcessor

# Upload the raw input dataset to a unique S3 location
timestamp_prefix = strftime("%Y-%m-%d-%H-%M-%S", gmtime())
prefix = "sagemaker/spark-preprocess-demo/{}".format(timestamp_prefix)
input_prefix_abalone = "{}/input/raw/abalone".format(prefix)
input_preprocessed_prefix_abalone = "{}/input/preprocessed/abalone".format(prefix)

sagemaker_session.upload_data(
    path="./data/abalone.csv", bucket=bucket, key_prefix=input_prefix_abalone
)

# Run the processing job
spark_processor = PySparkProcessor(
    base_job_name="sm-spark",
    framework_version="3.1",
    role=role,
    instance_count=2,
    instance_type="ml.m5.xlarge",
    max_runtime_in_seconds=1200,
)

spark_processor.run(
    submit_app="./code/preprocess.py",
    arguments=[
        "--s3_input_bucket",
        bucket,
        "--s3_input_key_prefix",
        input_prefix_abalone,
        "--s3_output_bucket",
        bucket,
        "--s3_output_key_prefix",
        input_preprocessed_prefix_abalone,
    ],
    spark_event_logs_s3_uri="s3://{}/{}/spark_event_logs".format(bucket, prefix),
    logs=False,
)

Xác thực kết quả quá trình sử lý dữ liệu

Tiếp theo, xác thực đầu ra của công việc tiền xử lý dữ liệu bằng cách xem 5 hàng đầu tiên của tập dữ liệu đầu ra:

print("Top 5 rows from s3://{}/{}/train/".format(bucket, input_preprocessed_prefix_abalone))
!aws s3 cp --quiet s3://$bucket/$input_preprocessed_prefix_abalone/train/part-00000 - | head -n5

Xem qua Spark UI

Tiếp theo, Spark UI có thể được xem bằng cách chạy cục bộ máy chủ lịch sử. (Lưu ý: tính năng này sẽ chỉ hoạt động trong môi trường phát triển cục bộ có cài đặt docker hoặc trên Sagemaker Notebook Instance. Tính năng này hiện không hoạt động trong SageMaker Studio.)

# uses docker
spark_processor.start_history_server()

Sau khi xem Spark UI, bạn có thể tắt lịch sử máy chủ trước khi tiếp tục.

spark_processor.terminate_history_server()

Kham khảo:

https://docs.aws.amazon.com/sagemaker/latest/dg/whatis.html

https://github.com/aws/amazon-sagemaker-examples/tree/main


All rights reserved

Viblo
Hãy đăng ký một tài khoản Viblo để nhận được nhiều bài viết thú vị hơn.
Đăng kí