Xử lý bigdata bằng dịch vụ EMR của AWS part 3 (sử dụng hệ thống spark)

Chào các bạn, như ở phần trước chúng ta đã cùng nhau dựng và chạy hệ thống Spark. Tiếp theo đây mình sẽ cùng các bạn dựng và chạy 1 số ví dụ áp dụng spark

Chú ý: Spark có thể chạy với rất nhiều ngôn ngữ như: Scala, Python, R, Java

  • Ví dụ 1: Đếm số từ trong 1 đoạn văn bản Đầu tiên bạn cần tạo 1 file text có nội dung ngắn làm ví dụ:
$ vim sample.txt

Thêm nội dung sau

apache spark is a fast, real time and extremely expressive computing system which executes job
 in distributed (clustered) environment.

Sau đó ta cần đưa file đó vào hẹ thông thông qua lệnh

$ hdfs dfs -copyFromLocal ./sample.txt /user/root/

Kế tới bạn cần vào scala terminal để chạy lệnh tính toán

scala> val logFile = "hdfs://master.spark.com:9000/user/root/sample.txt" #khai báo đường dẫn để đọc file
logFile: String = hdfs://master.spark.com:9000/user/root/sample.txt

scala> val file = sc.textFile(logFile) #đọc file và load các từ vào hệ thống
file: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[1] at textFile at :23
scala> val counts = file.flatMap(_.split(" ")).map(word => (word, 1)).reduceByKey(_ + _)
counts: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[7] at reduceByKey at :25

scala> counts.collect()
res1: Array[(String, Int)] = Array((Python,1), (is,4), ((Installation,1), (same,1), (with,5), (MapReduce,1), (Java,3), (we,1), (This,1), ((clustered),1), (using,1), (CentOS,3), (aware,1), (post,1), (What,1), (setting,1), (computing,1), (lets,1), (computations.,1), 
(executes,1), (learn,1), (are,1), (assume,1), (YARN.,1), (provides,1), (expressive,1), (real,1), (cluster,,1), (So,,1), (Java,,1), (moving,1), (Apache,6), (how,1), (will,1), (compatible,1), (YARN,2), (as,1), ("",16), (Spark?,1), (capabilities.,1), (cluster,1), (Scala,1), (almost,1), (quite,1), (fast,,1), (Computing,1), (rich,1), (Node,1), (Spark,2), (job,1),(environment.,1), (about,1), (than,1), (7,2), (APIs,1), (on,5), (10x,1), (in,3), (which,1), (100x,1), (Install,2), (extremely,1), (along,1), (install,1), (distributed,1), ...

đây là 1 ví dụ đơn giản, Sau đây mình sẽ hướng dẫn bạn xử lý file data kiểu json (dữ liệu gốc được xuất ra từ mogodb)

Và ở ví dụ này mình sẽ sử dụng Python để xử lý (lý do chọn Python là mình có thể dựng được 1 con server rất nhanh bằng flask mà mình sẽ nói ở phần sau)

Bạn có 2 cách để chạy Spark, 1 là sử dụng terminal như ví dụ đầu, 2 là sử dụng chạy file (giống như rails c và chạy file ruby bằng lệnh ruby hello.rb)

  1. Cách 1 dùng terminal để vào spark bạn viết nhưng lệnh sau trên terminal máy
$ sudo su -
$ su - spark
$ /bin/pyspark

trong spark-shell bạn có thể chạy thử việc đọc data json và query thử

# đọc file dữ liệu json bạn có thể tự tạo tay hoặc xuất ra từ db
logs = spark.read.json("/user/spark/log.json")

# Tạo 1 view tạm giống như 1 table trên mysql
logs.createOrReplaceTempView("logs")

# đưa dữ liệu vào cache, ở đây sprak có hỗ trợ rất nhiều loại, trên ram, trên ổ cứng hoặc kết hợp, như ở ví dụ này thì mình chọn trên ổ cứng, 
# tuy nhiên nếu lượng dữ liệu là ko nhiều và tần xuất truy cập sử dụng nhiều thì bạn nên sử dụng lưu trên ram thì tốc độ đọc nhanh hơn, 
# có báo cáo trên mạng mà mình đọc được là nhanh hơn ổ đĩa > 10 lần, tuy nhiên với dữ liệu loen thì bạn có thể lưu trên ổ đĩa(ssd) cũng khá tốt. 

logs.persist(StorageLevel.DISK_ONLY)

# Khai báo query cú pháp bạn có thể thấy răng nó rất suống cú pháp sql, cái này mới chỉ là khai báo chứ chưa chạy
temp1 = spark.sql("SELECT uid, url.url FROM logs WHERE date >= '2016-05-01' and channel == 'organic_search'")

# thực hiện lệnh đã khai báo ở trên
temp1.count()
# sau khi thực hiện lệnh trên thì ta mới có thể tạo được 1 view tạm để tạo ra 1 table tên temp1
temp1.createOrReplaceTempView("temp1")

# câu lệnh sql ví dụ khác
temp2 = spark.sql("SELECT uid, url, size(url) FROM temp1 HAVING size(url) > 1")
temp2.count()
temp2.createOrReplaceTempView("temp2")

# third stage
temp_sql = "SELECT  uid, url FROM temp1 WHERE array_contains(url, '%s')" % url
temp3 = spark.sql(temp_sql)
temp3.count()
temp3.createOrReplaceTempView("temp3")

  1. cách 2 đó là tạo 1 file .py như sau
# load spark trên python
spark = SparkSession\
    .builder\
    .appName("PythonSQL")\
    .config("spark.some.config.option", "some-value")\
    .getOrCreate()

# dùng spark để đọc và xử lý dữ liệu
log = spark.read.json("/user/spark/out.json")
log.persist(StorageLevel.DISK_ONLY)
#log.persist()
log.count()
log.createOrReplaceTempView("log")

kế tiếp trên terminal ta sẽ chạy lệnh :

$ /bin/spark-submit --master yarn --deploy-mode client sql.py

kết quả sẽ đc trả ra sau khi hệ thống xử lý xong

Cách sử dụng khá đơn giản, tuy nhiên nếu ta muốn áp dụng cho hệ thống web thì phải làm thế nào. 1 giải pháp mà minh đưa ra đó là sử dụng mô hình microservice đó là làm riêng 1 hệ thông server nội bộ cho spark và các request sang có thể gọi qua api/web tùy yêu cầu hệ thống. Mính sẽ demo 1 server dựng bởi python với flask (cái này bạn có thể google để biết thêm nếu muốn)

Đầu tiên ta cần cài flask vào máy server

#cài python nếu máy bạn chưa cài 
$ yum install -y python-devel  gcc  git curl  wget logrotate ntp

#tải gói pip để cài flask
$ curl "https://bootstrap.pypa.io/get-pip.py" -o "get-pip.py"
$ python get-pip.py
$ pip install flask

Sau khi cài xong ta sẽ dựng 1 con server nhỏ bằng python như sau tạo file python với tên file sql-spark.py

#sql-spark.py
from flask import Flask
from flask import request

import os
import sys

from pyspark.sql import SparkSession
from pyspark.storagelevel import StorageLevel

app = Flask(__name__)
#app.run(host="0.0.0.0", port="5000")

# khai báo spark như ở file python thường và load dữ liệu 
spark = SparkSession\
    .builder\
    .appName("PythonSQL")\
    .config("spark.some.config.option", "some-value")\
    .getOrCreate()

# ở đây bạn có thế lấy link trực tiếp từ s3 để load data, 
# lưu ý là spark có hỗ trợ đọc cả folder nên bạn có thể xuất file log theo từng ngày vào 1 thư mục rồi đọc lại vào spark sau.
log = spark.read.json("s3://xxxxx/EMR/input/out.json") 
log.persist(StorageLevel.MEMORY_ONLY)
#log.persist()
log.count()
log.createOrReplaceTempView("log")

@app.route('/')
def hello_world():
    return 'Hello, World!'

@app.route('/query')

# khai báo cách thức xử lý khi có request gọi đến server với đường dẫn kiểu '/query'
def query():
    # lọc các params truyền sang để lấy các thông số cần thiết cho việc query
    results = []
    from_date = request.args.get('from_date')
    to_date = request.args.get('to_date')
    url = request.args.get('url')
    client_id = request.args.get('client_id')
    
    # bắt đầu thực hiện xử lý các query
    sql = "SELECT uid, url.url FROM log WHERE date['$date'] between '%s' and '%s' and client_id = '%s'" % (from_date, to_date, client_id)
    temp1 = spark.sql(sql)

    #cache temp1
    temp1.persist(StorageLevel.DISK_ONLY)

    results.append(temp1.count())
    temp1.createOrReplaceTempView("temp1")

    


    temp2 = spark.sql("SELECT uid, url, size(url) FROM temp1 HAVING size(url) > 1")

    temp2.persist(StorageLevel.DISK_ONLY)

    results.append(temp2.count())
    temp2.createOrReplaceTempView("temp2")


    temp_sql = "SELECT  uid, url FROM temp2 WHERE array_contains(url, '%s')" % url
    temp3 = spark.sql(temp_sql)
    results.append(temp3.count())
    temp3.createOrReplaceTempView("temp3")

    temp4 = spark.sql(temp_sql)

    #remove cache
    temp1.unpersist()
    temp2.unpersist()

    results.append(temp4.count())
    #for each in temp.collect():
    #    results =  each[0] 
    return 'HELLO: %s' % results
if __name__ == "__main__":
    app.run(threaded=True, host="0.0.0.0")

để bật server chạy ta dùng lệnh giống như chạy file python trước

$ /bin/spark-submit --master yarn --deploy-mode client sql-spark.py

kết quả show ra mang hình có dạng

16/11/23 17:15:28 WARN Utils: Your hostname, localhost resolves to a loopback address: 127.0.0.1; using 192.168.13.205 instead (on interface eno16777984)
16/11/23 17:15:28 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
16/11/23 17:15:28 INFO SecurityManager: Changing view acls to: root
16/11/23 17:15:28 INFO SecurityManager: Changing modify acls to: root
16/11/23 17:15:28 INFO SecurityManager: Changing view acls groups to: 
16/11/23 17:15:28 INFO SecurityManager: Changing modify acls groups to: 
16/11/23 17:15:28 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users  with view permissions: Set(root); groups with view permissions: Set(); users  with modify permissions: Set(root); groups with modify permissions: Set()
16/11/23 17:15:28 INFO Utils: Successfully started service 'sparkDriver' on port 54994.
16/11/23 17:15:28 INFO SparkEnv: Registering MapOutputTracker
16/11/23 17:15:28 INFO SparkEnv: Registering BlockManagerMaster
....
6/11/23 17:15:31 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on 192.168.13.207:49080 (size: 3.4 KB, free: 366.3 MB)
16/11/23 17:15:31 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on 192.168.13.206:54811 (size: 23.4 KB, free: 366.3 MB)
16/11/23 17:15:31 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on 192.168.13.207:49080 (size: 23.4 KB, free: 366.3 MB)
16/11/23 17:15:32 INFO TaskSetManager: Starting task 4.0 in stage 0.0 (TID 4, 192.168.13.206, partition 4, PROCESS_LOCAL, 5447 bytes)
16/11/23 17:15:32 INFO CoarseGrainedSchedulerBackend$DriverEndpoint: Launching task 4 on executor id: 0 hostname: 192.168.13.206.
16/11/23 17:15:32 INFO TaskSetManager: Starting task 5.0 in stage 0.0 (TID 5, 192.168.13.206, partition 5, PROCESS_LOCAL, 5447 bytes)
16/11/23 17:15:32 INFO CoarseGrainedSchedulerBackend$DriverEndpoint: Launching task 5 on executor id: 0 hostname: 192.168.13.206.
16/11/23 17:15:32 INFO TaskSetManager: Finished task 1.0 in stage 0.0 (TID 1) in 2263 ms on 192.168.13.206 (1/386)
16/11/23 17:15:32 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 2300 ms on 192.168.13.206 (2/386)
16/11/23 17:15:33 INFO TaskSetManager: Starting task 6.0 in stage 0.0 (TID 6, 192.168.13.207, partition 6, PROCESS_LOCAL, 5447 bytes)
16/11/23 17:15:33 INFO CoarseGrainedSchedulerBackend$DriverEndpoint: Launching task 6 on executor id: 1 hostname: 192.168.13.207.
16/11/23 17:15:33 INFO TaskSetManager: Starting task 7.0 in stage 0.0 (TID 7, 192.168.13.206, partition 7, PROCESS_LOCAL, 5447 bytes)
16/11/23 17:15:33 INFO CoarseGrainedSchedulerBackend$DriverEndpoint: Launching task 7 on executor id: 0 hostname: 192.168.13.206.
16/11/23 17:15:33 INFO TaskSetManager: Finished task 5.0 in stage 0.0 (TID 5) in 433 ms on 192.168.13.206 (3/386)
16/11/23 17:15:33 INFO TaskSetManager: Starting task 8.0 in stage 0.0 (TID 8, 192.168.13.206, partition 8, PROCESS_LOCAL, 5447 bytes)
16/11/23 17:15:33 INFO CoarseGrainedSchedulerBackend$DriverEndpoint: Launching task 8 on executor id: 0 hostname: 192.168.13.206.
16/11/23 17:15:33 INFO TaskSetManager: Starting task 9.0 in stage 0.0 (TID 9, 192.168.13.207, partition 9, PROCESS_LOCAL, 5447 bytes)
16/11/23 17:15:33 INFO CoarseGrainedSchedulerBackend$DriverEndpoint: Launching task 9 on executor id: 1 hostname: 192.168.13.207.
16/11/23 17:15:33 INFO TaskSetManager: Finished task 3.0 in stage 0.0 (TID 3) in 2511 ms on 192.168.13.207 (4/386)
16/11/23 17:15:33 INFO TaskSetManager: Finished task 4.0 in stage 0.0 (TID 4) in 551 ms on 192.168.13.206 (5/386)
16/11/23 17:15:33 INFO TaskSetManager: Finished task 2.0 in stage 0.0 (TID 2) in 2517 ms on 192.168.13.207 (6/386)
16/11/23 17:15:33 INFO TaskSetManager: Starting task 10.0 in stage 0.0 (TID 10, 192.168.13.206, partition 10, PROCESS_LOCAL, 5447 bytes)
16/11/23 17:15:33 INFO CoarseGrainedSchedulerBackend$DriverEndpoint: Launching task 10 on executor id: 0 hostname: 192.168.13.206.
16/11/23 17:15:33 INFO TaskSetManager: Finished task 7.0 in stage 0.0 (TID 7) in 449 ms on 192.168.13.206 (7/386)
...

như trên bạn sẽ thấy có 2 phần, 1 phần đầu sẽ là load server flask, phần sau là spark load dữ liệu, chờ mất 1 lúc, tùy thuộc vào dộ lớn của dũ liệu.

Sau khi server load xong ta có thể truy xuất dữ liệu thông qua request tới server đã dựng

[email protected] spark-2.0.1-bin-hadoop2.7]$ curl "localhost:5000/query?from_date=2016-08-15&to_date=2016-10-15&client_id=c14101400051&url=https://ferret-plus.10m/2319" -w %{time_connect}:%{time_starttransfer}:%{time_total}
HELLO: [2655342, 653206, 0, 0]0.004:12.140:12.140
[[email protected] spark-2.0.1-bin-hadoop2.7]
[[email protected] spark-2.0.1-bin-hadoop2.7]
[[email protected] spark-2.0.1-bin-hadoop2.7]
[[email protected] spark-2.0.1-bin-hadoop2.7]$ curl "localhost:5000/query?from_date=2016-08-15&to_date=2016-10-15&client_id=c14101400051&url=https://ferret-plus.10m/2319" -w %{time_connect}:%{time_starttransfer}:%{time_total}
HELLO: [2655342, 653206, 0, 0]0.004:10.600:10.600
[[email protected] spark-2.0.1-bin-hadoop2.7]
[[email protected] spark-2.0.1-bin-hadoop2.7]
[[email protected] spark-2.0.1-bin-hadoop2.7]
[[email protected] spark-2.0.1-bin-hadoop2.7]
[[email protected] spark-2.0.1-bin-hadoop2.7]$ curl "localhost:5000/query?from_date=2016-08-15&to_date=2016-10-15&client_id=c14101400051&url=https://ferret-plus.10m/2319" -w %{time_connect}:%{time_starttransfer}:%{time_total}
HELLO: [2655342, 653206, 0, 0]0.004:8.213:8.213
[[email protected] spark-2.0.1-bin-hadoop2.7]

như các bạn thấy kêt quả trả ra là các giá trị trong biến result mà ta đã khai báo ở trên server tốc độ cũng khá tốt, ở đây mình đang sử dụng 12gb dữ liệu (xấp xỉ 12 triệu bản ghi) với 1 máy server và 3 máy client

Ok vậy là những điều cơ bản để cài đặt và sử dụng spark mình đã nói ở 3 bài viết. Còn lại nếu bạn nào muốn áp dụng cao hơn thì hãy tìm hiểu trên google và trang chính của spark http://spark.apache.org/docs/latest/cluster-overview.html. Còn có rất nhiều điều thú vị khi sử dụng spark, như streaming data ... Còn ai muốn đào sâu hơn về cấu trúc, phẩn bổ bộ nhớ thì nên tìm hiểu về máy ảo java. Cám ơn bạn đã đồng hành với seri đầu tiên của Spark

All Rights Reserved