[Embulk #1] Công cụ giúp giảm nỗi đau chuyển đổi dữ liệu
Bài đăng này đã không được cập nhật trong 3 năm
Embulk là gì?
Khái niệm
Embulk là một công cụ open source có chức năng cơ bản là load các record từ database này và import sang database khác. Ngoài ra, còn có các chức năng import data vào database khác thông qua việc sử dụng các plugins làm cho công việc chuyển đổi và xử lý dữ liệu trở nên đơn giản và thuận tiện hơn.
Embulk hoạt động trên nền tảng java nên có thể dễ dàng hoạt động trên nhiều hệ điều hành khác nhau.
Ưu điểm
- Cài đặt đơn giản, hoạt động được trên nhiều hệ điều hành khác nhau
- Các chức năng được cung cấp dưới dạng plugin nên với mỗi đặc thù công việc sẽ có các plugin cần thiết.
- Các plugin luôn được cập nhật liên tục từ nhà phát triển
- Embulk và các plugin của nó được cung cấp miễn phí và cho phép người dùng thoải mái cải tiến để phù hợp với yêu cầu riêng
Cấu trúc của Embulk
Về cơ bản thì cấu trúc của embulk được chia làm 3 phần chính:
- Các input plugins data, Decoder plugins, Parser plugins: Cung cấp các phương thức input data khác nhau. ví dụ: mysql, postgresql, Amazon S3, HDFS, http(get data từ api)
- Các plugin xử lý dữ liệu: Cung cấp các plugins cho phép chọn lọc dữ liệu (filter plugins)
- Các output plugins data, Formater plugins, Encoder plugins
Một số plugins:
input plugins:
- RDBS ( mysql, postgres, jdbc... )
- NoSQL ( redis, mongodb)
- Cloud Service (redshift, s3 )
- Files (CSV, JSON ...)
- Etc ( hdfs, http, elastic search, slack-history, google analitics )
output plugins:
- RDBS ( mysql, postgres, oracle, jdbc...)
- Cloud Service ( redshift, s3, bigquery)
- NoSQL ( redis, hdfs )
- Files
- Etc ( elastic search, hdfs, swift)
filter plugins:
- column (cut the column)
- insert Add columns such as host name to the specified location
- row Extract only rows that meet certain conditions
- rearrange Reconstructs one row of data into multiple rows
File parser plugins:
- json
- xml
- csv
- apache log
- query_string
- regex
File formatter Plugin:
- json
- A plugin that formats the contents of a record in the format of jsonl (1 json 1 line)
- poi_excel
- Plugin to convert to Excel (xls, xlsx) format data
Excutor Plugin:
- mapreduce
- Plugin for running Embulk tasks on Hadoop
Chi tiết các plugins này có thể xem thêm tại đây: https://plugins.embulk.org/ . Tại đây chứa rất nhiều các flugins hữu ích.
Cách thức hoạt động
Trong trường hợp dữ liệu từ database -> database khác. Ta có sở đồ sau:
Dữ liệu sẽ được input plugin read -> xử lý dữ liệu -> output plugin sẽ tiến hành import vào database mới.
Trong trường hợp tổng quát hơn. Dữ liệu có thể được đọc từ file hay một số kiểu dữ liệu đặc thù
Cài đặt Embulk
Để cài đặt được Embulk trước tiên bạn cần cài java cho thiết bị của mình. Chú ý hiện tại Embulk mới chỉ hoạt động trên java8.
sudo apt update
sudo apt install openjdk-8-jre-headless
Cài đặt embulk
curl --create-dirs -o ~/.embulk/bin/embulk -L "https://dl.embulk.org/embulk-latest.jar"
chmod +x ~/.embulk/bin/embulk
echo 'export PATH="$HOME/.embulk/bin:$PATH"' >> ~/.bashrc
source ~/.bashrc
Cài đặt plugin cho embulk
embulk gem install embulk-output-postgresql embulk-filter-column embulk-filter-add_time embulk-input-postgresql embulk-filter-ruby_proc embulk-input-http embulk-parser-jsonpath embulk-filter-eval
Phía trên là cài đặt một số plugin phục vụ cho việc import dữ liệu từ csv hoặc api json -> vào database postgresql. Để phục vụ mục đích khác có thể xem các plugins phía trên.
Thực thi chạy embulk với file config
embulk run --log-level warn config.yml.liquid
Một số ví dụ config cho Embulk
Import dữ liệu từ csv -> vào database postgresql
in:
type: file
path_prefix: /var/batch/data/csv_year_generic_medicine_export_pjx.csv
parser:
charset: UTF-8
newline: LF
type: csv
delimiter: ','
quote: '"'
escape: '"'
null_string: ''
trim_if_not_quoted: false
skip_header_lines: 1
allow_extra_columns: true
allow_optional_columns: true
columns:
- {name: personal_id, type: string}
- {name: 対象年度, type: string}
- {name: ジェネリック医薬品数量, type: long}
- {name: 置き換え可能医薬品数量, type: long}
- {name: ジェネリック医薬品費, type: long}
- {name: ジェネリック医薬品のある先発医薬品費, type: long}
- {name: ジェネリック医薬品のない先発医薬品費, type: long}
filters:
- type: add_time
to_column:
name: create_datetime
type: timestamp
from_value:
mode: upload_time
- type: add_time
to_column:
name: update_datetime
type: timestamp
from_value:
mode: upload_time
- type: column
add_columns:
- {name: create_user, type: string, default: "SYSTEM"}
- {name: update_user, type: string, default: "SYSTEM"}
- type: rename
columns:
personal_id: personal_id
対象年度: target_year
ジェネリック医薬品数量: generic_drug_quantity
置き換え可能医薬品数量: replaceable_drug_quantity
ジェネリック医薬品費: original_drug_cost
ジェネリック医薬品のある先発医薬品費: original_drug_cost_with_generic
ジェネリック医薬品のない先発医薬品費: original_drug_cost_without_generic
out:
type: postgresql
host: {{ env.DB_HOST }}
user: {{ env.DB_USER }}
password: {{ env.DB_PASSWORD }}
database: {{ env.DB_NAME }}
table: raw_yearly_generic_medicine
mode: truncate_insert
Import dữ liệu từ api json -> vào database postgresql
in:
type: http
url: {{ env.API_URL }}
method: get
params:
- {name: limit, value: {{ env.PAGINATE }}}
pager: {from_param: offset, pages: {{ env.TOTAL_PAGE }}, start: 0, step: {{ env.PAGINATE }}}
flatten_json_array: true
parser:
charset: UTF-8
newline: LF
type: jsonpath
columns:
- {name: personal_id, type: string}
- {name: event_at, type: timestamp, format: '%Y-%m-%dT%H:%M:%S%z'}
- {name: event_name, type: string}
- {name: flow_type, type: long}
- {name: flow_type_name, type: string}
- {name: point, type: long}
- {name: id, type: string}
- {name: deleted_at, type: timestamp, format: '%Y-%m-%dT%H:%M:%S%z'}
filters:
- type: add_time
to_column:
name: create_datetime
type: timestamp
from_value:
mode: upload_time
- type: add_time
to_column:
name: update_datetime
type: timestamp
from_value:
mode: upload_time
- type: column
add_columns:
- {name: create_user, type: string, default: "SYSTEM"}
- {name: update_user, type: string, default: "SYSTEM"}
- {name: delete_flg, type: boolean, default: false}
- type: rename
columns:
id: amulet_id
out:
type: postgresql
host: {{ env.DB_HOST }}
user: {{ env.DB_USER }}
password: {{ env.DB_PASSWORD }}
database: {{ env.DB_NAME }}
table: a_point_history
mode: merge
merge_keys:
- amulet_id
Tham khảo plugin http của embulk tại đây: https://github.com/takumakanari/embulk-input-http
Tổng kết
Trên đây mình đã giới thiệu về Embulk một công cụ chuyển đổi dữ liệu xịn xò. Hi vọng nó có thể giúp các bạn giảm bớt nỗi đau
Tham khảo thêm tại:
https://dev.embulk.org/customization.html
https://qiita.com/tashiro_gaku/items/f7fa0f1a99c759d947a7#configxmlにmysqlプラグイン情報を追記
All rights reserved