+4

Sử dụng Amazon SageMaker để triển khai, huấn luyện mô hình máy học Random Forest (Scikit-learn)

Bài viết này sẽ hướng dẫn cho các bạn cách dùng Amazon SageMaker để phát triển, đào tạo, điều chỉnh và triển khai mô hình ML dựa trên Scikit-Learn (Random Forest). Mẫu dữ liệu được dùng là The California Housing dataset đã được công bố tại:

Pace, R. Kelley, and Ronald Barry. "Sparse spatial autoregressions." Statistics & Probability Letters 33.3 (1997): 291-297.

Đoạn mã cơ bản:

import datetime
import time
import tarfile

import boto3
import pandas as pd
import numpy as np
from sagemaker import get_execution_role
import sagemaker
from sklearn.model_selection import train_test_split
from sklearn.datasets import fetch_california_housing

sm_boto3 = boto3.client("sagemaker")

sess = sagemaker.Session()

region = sess.boto_session.region_name

bucket = sess.default_bucket()  # this could also be a hard-coded bucket name

print("Using bucket " + bucket)

Chuẩn bị dữ liệu

Dữ liệu tải từ sklearn, chia nhỏ và gửi đến S3

# we use the California housing dataset
data = fetch_california_housing()
X_train, X_test, y_train, y_test = train_test_split(
    data.data, data.target, test_size=0.25, random_state=42
)

trainX = pd.DataFrame(X_train, columns=data.feature_names)
trainX["target"] = y_train

testX = pd.DataFrame(X_test, columns=data.feature_names)
testX["target"] = y_test
trainX.head()
trainX.to_csv("california_housing_train.csv")
testX.to_csv("california_housing_test.csv")
# send data to S3. SageMaker will take training data from s3
trainpath = sess.upload_data(
    path="california_housing_train.csv", bucket=bucket, key_prefix="sagemaker/sklearncontainer"
)

testpath = sess.upload_data(
    path="california_housing_test.csv", bucket=bucket, key_prefix="sagemaker/sklearncontainer"
)

Mã Script Mode

Tập lệnh bên dưới chứa cả chức năng đào tạo và suy luận, đồng thời có thể chạy cả trong phần cứng SageMaker Training hoặc cục bộ (PC, sổ ghi chép SageMaker, tại chỗ, v.v.). Hướng dẫn chi tiết tại đây.

%%writefile script.py

import argparse
import joblib
import os

import numpy as np
import pandas as pd
from sklearn.ensemble import RandomForestRegressor


# inference functions ---------------
def model_fn(model_dir):
    clf = joblib.load(os.path.join(model_dir, "model.joblib"))
    return clf


if __name__ == "__main__":
    print("extracting arguments")
    parser = argparse.ArgumentParser()

    # hyperparameters sent by the client are passed as command-line arguments to the script.
    # to simplify the demo we don't use all sklearn RandomForest hyperparameters
    parser.add_argument("--n-estimators", type=int, default=10)
    parser.add_argument("--min-samples-leaf", type=int, default=3)

    # Data, model, and output directories
    parser.add_argument("--model-dir", type=str, default=os.environ.get("SM_MODEL_DIR"))
    parser.add_argument("--train", type=str, default=os.environ.get("SM_CHANNEL_TRAIN"))
    parser.add_argument("--test", type=str, default=os.environ.get("SM_CHANNEL_TEST"))
    parser.add_argument("--train-file", type=str, default="california_housing_train.csv")
    parser.add_argument("--test-file", type=str, default="california_housing_test.csv")
    parser.add_argument(
        "--features", type=str
    )  # in this script we ask user to explicitly name features
    parser.add_argument(
        "--target", type=str
    )  # in this script we ask user to explicitly name the target

    args, _ = parser.parse_known_args()

    print("reading data")
    train_df = pd.read_csv(os.path.join(args.train, args.train_file))
    test_df = pd.read_csv(os.path.join(args.test, args.test_file))

    print("building training and testing datasets")
    X_train = train_df[args.features.split()]
    X_test = test_df[args.features.split()]
    y_train = train_df[args.target]
    y_test = test_df[args.target]

    # train
    print("training model")
    model = RandomForestRegressor(
        n_estimators=args.n_estimators, min_samples_leaf=args.min_samples_leaf, n_jobs=-1
    )

    model.fit(X_train, y_train)

    # print abs error
    print("validating model")
    abs_err = np.abs(model.predict(X_test) - y_test)

    # print couple perf metrics
    for q in [10, 50, 90]:
        print("AE-at-" + str(q) + "th-percentile: " + str(np.percentile(a=abs_err, q=q)))

    # persist model
    path = os.path.join(args.model_dir, "model.joblib")
    joblib.dump(model, path)
    print("model persisted at " + path)
    print(args.min_samples_leaf)

Local training

Đoạn mã cho phép xóa khỏi tập lệnh mọi cấu hình dành riêng cho SageMaker và chạy cục bộ.

! python script.py --n-estimators 100 \
                   --min-samples-leaf 2 \
                   --model-dir ./ \
                   --train ./ \
                   --test ./ \
                   --features 'MedInc HouseAge AveRooms AveBedrms Population AveOccup Latitude Longitude' \
                   --target target

SageMaker Training

Triển khai việc training với Python SDK

# We use the Estimator from the SageMaker Python SDK
from sagemaker.sklearn.estimator import SKLearn

FRAMEWORK_VERSION = "0.23-1"

sklearn_estimator = SKLearn(
    entry_point="script.py",
    role=get_execution_role(),
    instance_count=1,
    instance_type="ml.c5.xlarge",
    framework_version=FRAMEWORK_VERSION,
    base_job_name="rf-scikit",
    metric_definitions=[{"Name": "median-AE", "Regex": "AE-at-50th-percentile: ([0-9.]+).*$"}],
    hyperparameters={
        "n-estimators": 100,
        "min-samples-leaf": 3,
        "features": "MedInc HouseAge AveRooms AveBedrms Population AveOccup Latitude Longitude",
        "target": "target",
    },
)
# launch training job, with asynchronous call
sklearn_estimator.fit({"train": trainpath, "test": testpath}, wait=True)

Giải pháp thay thế: triển khai training với boto3

# first compress the code and send to S3

source = "source.tar.gz"
project = "scikitlearn-train-from-boto3"

tar = tarfile.open(source, "w:gz")
tar.add("script.py")
tar.close()

s3 = boto3.client("s3")
s3.upload_file(source, bucket, project + "/" + source)

Khi sử dụng boto3 để khởi chạy training, chúng ta phải trỏ rõ ràng đến docker image.

from sagemaker import image_uris


training_image = image_uris.retrieve(
    framework="sklearn",
    region=region,
    version=FRAMEWORK_VERSION,
    py_version="py3",
    instance_type="ml.c5.xlarge",
)
print(training_image)
# launch training job

response = sm_boto3.create_training_job(
    TrainingJobName="sklearn-boto3-" + datetime.datetime.now().strftime("%Y-%m-%d-%H-%M-%S"),
    HyperParameters={
        "n_estimators": "300",
        "min_samples_leaf": "3",
        "sagemaker_program": "script.py",
        "features": "MedInc HouseAge AveRooms AveBedrms Population AveOccup Latitude Longitude",
        "target": "target",
        "sagemaker_submit_directory": "s3://" + bucket + "/" + project + "/" + source,
    },
    AlgorithmSpecification={
        "TrainingImage": training_image,
        "TrainingInputMode": "File",
        "MetricDefinitions": [
            {"Name": "median-AE", "Regex": "AE-at-50th-percentile: ([0-9.]+).*$"},
        ],
    },
    RoleArn=get_execution_role(),
    InputDataConfig=[
        {
            "ChannelName": "train",
            "DataSource": {
                "S3DataSource": {
                    "S3DataType": "S3Prefix",
                    "S3Uri": trainpath,
                    "S3DataDistributionType": "FullyReplicated",
                }
            },
        },
        {
            "ChannelName": "test",
            "DataSource": {
                "S3DataSource": {
                    "S3DataType": "S3Prefix",
                    "S3Uri": testpath,
                    "S3DataDistributionType": "FullyReplicated",
                }
            },
        },
    ],
    OutputDataConfig={"S3OutputPath": "s3://" + bucket + "/sagemaker-sklearn-artifact/"},
    ResourceConfig={"InstanceType": "ml.c5.xlarge", "InstanceCount": 1, "VolumeSizeInGB": 10},
    StoppingCondition={"MaxRuntimeInSeconds": 86400},
    EnableNetworkIsolation=False,
)

print(response)

Triển khai việc điều chỉnh với Python SDK

# we use the Hyperparameter Tuner
from sagemaker.tuner import IntegerParameter

# Define exploration boundaries
hyperparameter_ranges = {
    "n-estimators": IntegerParameter(20, 100),
    "min-samples-leaf": IntegerParameter(2, 6),
}

# create Optimizer
Optimizer = sagemaker.tuner.HyperparameterTuner(
    estimator=sklearn_estimator,
    hyperparameter_ranges=hyperparameter_ranges,
    base_tuning_job_name="RF-tuner",
    objective_type="Minimize",
    objective_metric_name="median-AE",
    metric_definitions=[
        {"Name": "median-AE", "Regex": "AE-at-50th-percentile: ([0-9.]+).*$"}
    ],  # extract tracked metric from logs with regexp
    max_jobs=10,
    max_parallel_jobs=2,
)
Optimizer.fit({"train": trainpath, "test": testpath})
# get tuner results in a df
results = Optimizer.analytics().dataframe()
while results.empty:
    time.sleep(1)
    results = Optimizer.analytics().dataframe()
results.head()

Triển khai đến điểm cuối theo thời gian thực

Khai triển với Python SDK

Công cụ Estimator có thể được triển khai trực tiếp sau khi đào tạo, với Estimator.deploy() nhưng ở đây quy trình được trình bày mở rộng hơn trong việc tạo mô hình từ s3, có thể được sử dụng để triển khai một mô hình đã được đào tạo trong một phiên khác hoặc thậm chí là ra ngoài của SageMaker.

sklearn_estimator.latest_training_job.wait(logs="None")
artifact = sm_boto3.describe_training_job(
    TrainingJobName=sklearn_estimator.latest_training_job.name
)["ModelArtifacts"]["S3ModelArtifacts"]

print("Model artifact persisted at " + artifact)
from sagemaker.sklearn.model import SKLearnModel

model = SKLearnModel(
    model_data=artifact,
    role=get_execution_role(),
    entry_point="script.py",
    framework_version=FRAMEWORK_VERSION,
)
predictor = model.deploy(instance_type="ml.c5.large", initial_instance_count=1)

Gọi Python SDK

# the SKLearnPredictor does the serialization from pandas for us
print(predictor.predict(testX[data.feature_names]))

Cách thay thế: gọi boto3

runtime = boto3.client("sagemaker-runtime")

Cách 1: dùng CSV

# csv serialization
response = runtime.invoke_endpoint(
    EndpointName=predictor.endpoint,
    Body=testX[data.feature_names].to_csv(header=False, index=False).encode("utf-8"),
    ContentType="text/csv",
)

print(response["Body"].read())

Cách 2: dùng npy

# npy serialization
from io import BytesIO


# Serialise numpy ndarray as bytes
buffer = BytesIO()
# Assuming testX is a data frame
np.save(buffer, testX[data.feature_names].values)

response = runtime.invoke_endpoint(
    EndpointName=predictor.endpoint, Body=buffer.getvalue(), ContentType="application/x-npy"
)

print(response["Body"].read())

Đừng quên xóa điểm cuối

sm_boto3.delete_endpoint(EndpointName=predictor.endpoint)

Kham khảo:

Doc: https://sagemaker.readthedocs.io/en/stable/using_sklearn.html

SDK: https://sagemaker.readthedocs.io/en/stable/sagemaker.sklearn.html

boto3: https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/sagemaker.html#client


All rights reserved

Viblo
Hãy đăng ký một tài khoản Viblo để nhận được nhiều bài viết thú vị hơn.
Đăng kí