Làm việc với file CSV trong Ruby - phần 2

Ở bài viết này, mình xin giới thiệu một thư viện giúp import dữ liệu vào database với một lượng dữ liệu lớn, giúp ta cải thiện rất nhiều vấn đề về performance: activerecord-import

activerecord-import là gì?

  • activerecord-import là một thư viện hỗ trợ bulk insert dữ liệu với ActiveRecord
  • Một trong những tính năng chính là follow các active record associations và tối thiểu hoá số lượng các câu insert SQL cần thiết, tránh vấn đề N+1 insert. Ví dụ: giả sử bạn có một schema như sau:
    • Publisher has many Books
    • Book has many Reviews

Và bạn muốn bulk insert 100 publishers mới, với mỗi publisher có 10k books, kèm theo 3 reviews cho mỗi book. Thư viện này sẽ follow quan hệ giữa chúng và chỉ sinh ra 3 câu query insert: một cho publisher, một cho book và một cho review. Theo cách thông thường, với bài toán trên ActiveRecord sẽ tạo ra 100 câu insert cho các publishers, với mỗi publisher cần 10k insert book, nên sinh ra 100 * 10 000 = 1 000 000 câu query insert để insert hết đống books, và tương tự cần tới 3 triệu câu insert để insert hết đống reviews Từ hơn 4M câu query insert đã được rút gọn lại chỉ còn 3, thực sự là performance đã được cải thiện rõ rệt, trong trường hợp trên, nó đã convert time để chạy batch insert từ 18h -> nhỏ hơn 2h

Sử dụng

Để sử dụng activerecord-import chúng ta có 2 cách:

  • Import gem trong list gem file:
gem 'activerecord-import'
  • require sử dụng trong file bất kì:
require 'active_record'
require 'activerecord-import'

activerecord-import làm được những gì?

  • activerecord-import có thể làm việc với những cột và mảng các gía trị của nó(fastest)
  • activerecord-import có thể làm việc với các đối tượng model(faster)
  • activerecord-import có thể thực hiện các validations(fast)
  • activerecord-import có thể update các records(mysql hoặc postgress 9.5+)

activerecord-import cung cấp cho chúng ta những vũ khí gì?

Chúng ta sẽ tìm hiểu lần lượt các method mà thư viện activerecord-import cung cấp:

1.bulk_import(*args) ⇒ Object

  • import một tập hợp các giá trị vào database, method này đặc biệt có hiệu quả hơn việc sử dụng các method của ActiveRecord như: ActiveRecord::Base#create hoặc ActiveRecord::Base#save nhiều lần. Method này hữu ích khi bạn muốn tạo nhiều bản ghi một lúc và không quan tâm đến return kết quả mỗi khi một bản ghi được insert. Nó có thể sử dụng với option check hoặc không check validation. Một điều lưu ý ở đây là nó sẽ bỏ qua các ActiveRecord::Callback trong suốt quá trình import, cách sử dụng:
Model.import array_of_models -> import bằng việc truyền vào một mảng các đối tượng của model mà bạn muốn import
Model.import column_names, array_of_models - > params đầu tiên là mảng các cột mà bạn muốn update, params thứ 2 là mảng các object của model
Model.import array_of_hash_objects
Model.import column_names, array_of_hash_objects
Model.import column_names, array_of_values
Model.import column_names, array_of_values, options -> tham số thứ 3 là option, là một hash, gồm các options được giới thiệu bên dưới
  • Options:

    • validate: true hoặc false, quá trình import sẽ chạy validate hay không, mặc định là có
    • on_duplicate_key_ignore: true hoặc false, quá trình import sẽ bỏ qua các bản ghi trùng lặp(duplicate các primary key) và tiếp tục import hay không? Trong Postgress nó tương đương với: ON CONFLICT DO NOTHING hay trong Mysql là INSERT IGNORE
    • ignore: là một alias của on_duplicate_key_ignore
    • on_duplicate_key_update: một mảng hoặc một hash, nó tương tự như option ON DUPLICATE KEY UPDATE khi insert trong mysql, hay ON CONFLICT DO UPDATE trong Postgress.
    • synchronize: đồng bộ các bản ghi được chỉ định khi nó đc đang được import
    • batch_size: chỉ định số lượng tối đa các bản ghi mỗi lần insert. mặc định là all

ví dụ:

class BlogPost < ActiveRecord::Base ; end

# Example using array of model objects
posts = [ BlogPost.new author_name: 'Zach Dennis', title: 'AREXT',
          BlogPost.new author_name: 'Zach Dennis', title: 'AREXT2',
          BlogPost.new author_name: 'Zach Dennis', title: 'AREXT3' ]
BlogPost.import posts

# Example using array_of_hash_objects
# NOTE: column_names will be determined by using the keys of the first hash in the array. If later hashes in the
# array have different keys an exception will be raised. If you have hashes to import with different sets of keys
# we recommend grouping these into batches before importing.
values = [ {author_name: 'zdennis', title: 'test post'} ], [ {author_name: 'jdoe', title: 'another test post'} ] ]
BlogPost.import values

# Example using column_names and array_of_hash_objects
columns = [ :author_name, :title ]
values = [ {author_name: 'zdennis', title: 'test post'} ], [ {author_name: 'jdoe', title: 'another test post'} ] ]
BlogPost.import columns, values

# Example using column_names and array_of_values
columns = [ :author_name, :title ]
values = [ [ 'zdennis', 'test post' ], [ 'jdoe', 'another test post' ] ]
BlogPost.import columns, values

# Example using column_names, array_of_value and options
columns = [ :author_name, :title ]
values = [ [ 'zdennis', 'test post' ], [ 'jdoe', 'another test post' ] ]
BlogPost.import( columns, values, validate: false  )

# Example synchronizing existing instances in memory
post = BlogPost.where(author_name: 'zdennis').first
puts post.author_name # => 'zdennis'
columns = [ :author_name, :title ]
values = [ [ 'yoda', 'test post' ] ]
BlogPost.import posts, synchronize: [ post ]
puts post.author_name # => 'yoda'

# Example synchronizing unsaved/new instances in memory by using a uniqued imported field
posts = [BlogPost.new(title: "Foo"), BlogPost.new(title: "Bar")]
BlogPost.import posts, synchronize: posts, synchronize_keys: [:title]
puts posts.first.persisted? # => true

2.bulk_import!(*args) ⇒ Object

Import một tập hợp các giá trị nếu tất cả chúng là hợp lệ. Quá trình import sẽ fail khi gặp lỗi validation đầu tiên và raise lỗi ActiveRecord::RecordInvalid với instance bị lỗi:

# File 'lib/activerecord-import/import.rb', line 468

def bulk_import!(*args)
  options = args.last.is_a?( Hash ) ? args.pop : {}
  options[:validate] = true
  options[:raise_error] = true

  bulk_import(*args, options)
end

về cơ bản thì nó là sự kết hợp giữa import thông thường kèm với option validate và raise error

Và còn một số method khác nữa, các bạn có thể tìm hiểu chi tiết thêm tại: https://www.rubydoc.info/gems/activerecord-import/ActiveRecord/Base

Ví dụ:

Chúng ta hãy bắt đầu một ví dụ đơn giản và tiến hành tính toán performance cho chúng: Giả sử chúng ta có một schema đơn giản là company như sau:

class Company < ApplicationRecord
  validates :name, presence: true, uniqueness: true
end

Model company có trường name kiểu string, với validate là present và uniqueness. Ta sẽ tạo một file csv với 1 triệu dòng, gồm 500k dòng valid và 500k dòng invalid:

require 'benchmark'

def print_memory_usage
  memory_before = `ps -o rss= -p #{Process.pid}`.to_i
  yield
  memory_after = `ps -o rss= -p #{Process.pid}`.to_i

  puts "Memory: #{((memory_after - memory_before) / 1024.0).round(2)} MB"
end

def print_time_spent
  time = Benchmark.realtime do
    yield
  end

  puts "Time: #{time.round(2)}"
end

require 'csv'

headers = ['name']

print_memory_usage do
  print_time_spent do
    CSV.open('data.csv', 'w', write_headers: true, headers: headers) do |csv|
      500_000.times do |i|
        csv << ["company_name_#{i+1}"]
        csv << ["company_name_#{i+1}"]
      end
    end
  end
end

khuongs-MacBook-Pro:Ats_Project khuong$ ruby create_csv.rb 
Time: 2.27
Memory: 0.53

Với cách làm cũ, ta lần lượt load từng dòng trong file csv và tiến hành insert từng record một:

module PerformanceComputing
  private
  def print_memory_usage
    memory_before = `ps -o rss= -p #{Process.pid}`.to_i
    yield
    memory_after = `ps -o rss= -p #{Process.pid}`.to_i

    puts "Memory: #{((memory_after - memory_before) / 1024.0).round(2)} MB"
  end

  def print_time_spent
    time = Benchmark.realtime do
      yield
    end

    puts "Time: #{time.round(2)}"
  end
end


require 'benchmark'
require 'csv'

class BasicInsert
    include PerformanceComputing

  def initialize file_name
    @file_name = file_name
  end

  def perform
    print_memory_usage do
      print_time_spent do
        CSV.foreach(file_name, headers: true) do |row|
          Company.create(name: row["name"])
        end
      end
    end
  end
  
  private
  attr_reader :file_name  
end

Và kết quả là(và thực tế là tôi đã ngủ 1 giấc rùi mới dậy xem kết quả):

BasicInsert.new("data.csv").perform
Time: 4746.51
Memory: 34.87 MB
[3] pry(main)> Company.count
   (84.4ms)  SELECT COUNT(*) FROM `companies` WHERE `companies`.`deleted_at` IS NULL
=> 500000

Cần 4746s để có thể import hết file csv có 1 triệu dòng và thu được 500k bản ghi company

Bây giờ, ta sẽ sử dụng activerecord-import xem kết quả ra sau: đặt batch_size = 10k

require 'benchmark'
require 'csv'

class OptimizeInsert
  include PerformanceComputing

  HEADER_LINE = 1.freeze
  BATCH_SIZE = 10_000.freeze

  def initialize file_name
    @file_name = file_name
  end

  def perform
    print_memory_usage do
      print_time_spent do
        File.open(file_name) do |file|
          file.lazy.drop(HEADER_LINE).each_slice(BATCH_SIZE) do |lines|
            Company.import [:name], CSV.parse(lines.join)
          end
        end
      end
    end
  end
  
  private
  attr_reader :file_name  
end

Và kết quả thu được thật sự ngoài mong đợi:

OptimizeInsert.new("data.csv").perform
Time: 662.98
Memory: 56.21 MB
[30] pry(main)> Company.count
   (92.4ms)  SELECT COUNT(*) FROM `companies` WHERE `companies`.`deleted_at` IS NULL
=> 500000

Chỉ mất 662s để cho kết quả tương tự (honho)

Lời kết

Trên đây là một phương pháp để tối ưu khi bạn muốn import một lượng dữ liệu lớn vào database. Hy vọng sẽ giúp ích được các bạn trong công việc. To be continue... Tham khảo: https://github.com/zdennis/activerecord-import