PERF: Preload S3 inventory data for multisite clusters

diff --git a/app/jobs/scheduled/ensure_s3_uploads_existence.rb b/app/jobs/scheduled/ensure_s3_uploads_existence.rb
index ccde386..6e2d671 100644
--- a/app/jobs/scheduled/ensure_s3_uploads_existence.rb
+++ b/app/jobs/scheduled/ensure_s3_uploads_existence.rb
@@ -5,9 +5,38 @@ module Jobs
   class EnsureS3UploadsExistence < ::Jobs::Scheduled
     every 1.day
 
+    def perform(*args)
+      super
+    ensure
+      if @db_inventories
+        @db_inventories.values.each { |f| f.close; f.unlink }
+      end
+    end
+
+    def prepare_for_all_sites
+      inventory = S3Inventory.new(s3_helper, :upload)
+      @db_inventories = inventory.prepare_for_all_sites
+      @inventory_date = inventory.inventory_date
+    end
+
     def execute(args)
       return unless SiteSetting.enable_s3_inventory
-      Discourse.store.list_missing_uploads(skip_optimized: true)
+      require 's3_inventory'
+
+      if !@db_inventories && Rails.configuration.multisite && GlobalSetting.use_s3?
+        prepare_for_all_sites
+      end
+
+      if @db_inventories && preloaded_inventory_file = @db_inventories[RailsMultisite::ConnectionManagement.current_db]
+        S3Inventory.new(
+          s3_helper,
+          :upload,
+          preloaded_inventory_file: preloaded_inventory_file,
+          preloaded_inventory_date: @inventory_date
+        ).backfill_etags_and_list_missing
+      else
+        S3Inventory.new(s3_helper, :upload).backfill_etags_and_list_missing
+      end
     end
   end
 end
diff --git a/lib/s3_inventory.rb b/lib/s3_inventory.rb
index c448d81..1869f0e 100644
--- a/lib/s3_inventory.rb
+++ b/lib/s3_inventory.rb
@@ -12,9 +12,15 @@ class S3Inventory
   INVENTORY_PREFIX ||= "inventory"
   INVENTORY_VERSION ||= "1"
 
-  def initialize(s3_helper, type)
+  def initialize(s3_helper, type, preloaded_inventory_file: nil, preloaded_inventory_date: nil)
     @s3_helper = s3_helper
 
+    if preloaded_inventory_file && preloaded_inventory_date
+      # Data preloaded, so we don't need to fetch it again
+      @preloaded_inventory_file = preloaded_inventory_file
+      @inventory_date = preloaded_inventory_date
+    end
+
     if type == :upload
       @type = "original"
       @model = Upload
@@ -25,32 +31,25 @@ class S3Inventory
   end
 
   def backfill_etags_and_list_missing
-    if files.blank?
+    if !@preloaded_inventory_file && files.blank?
       error("Failed to list inventory from S3")
       return
     end
 
     DistributedMutex.synchronize("s3_inventory_list_missing_#{type}", validity: 30.minutes) do
       begin
-        files.each do |file|
-          next if File.exists?(file[:filename][0...-3])
-
-          download_inventory_file_to_tmp_directory(file)
-          decompress_inventory_file(file)
-        end
+        download_and_decompress_files if !@preloaded_inventory_file
 
         multisite_prefix = Discourse.store.upload_path
         ActiveRecord::Base.transaction do
           begin
             connection.exec("CREATE TEMP TABLE #{table_name}(url text UNIQUE, etag text, PRIMARY KEY(etag, url))")
             connection.copy_data("COPY #{table_name} FROM STDIN CSV") do
-              files.each do |file|
-                CSV.foreach(file[:filename][0...-3], headers: false) do |row|
-                  key = row[CSV_KEY_INDEX]
-                  next if Rails.configuration.multisite && key.exclude?(multisite_prefix)
-                  url = File.join(Discourse.store.absolute_base_url, key)
-                  connection.put_copy_data("#{url},#{row[CSV_ETAG_INDEX]}\n")
-                end
+              for_each_inventory_row do |row|
+                key = row[CSV_KEY_INDEX]
+                next if Rails.configuration.multisite && key.exclude?(multisite_prefix)
+                url = File.join(Discourse.store.absolute_base_url, key)
+                connection.put_copy_data("#{url},#{row[CSV_ETAG_INDEX]}\n")
               end
             end
 
@@ -87,6 +86,16 @@ class S3Inventory
     end
   end
 
+  def for_each_inventory_row
+    if @preloaded_inventory_file
+      CSV.foreach(@preloaded_inventory_file) { |row| yield(row) }
+    else
+      files.each do |file|
+        CSV.foreach(file[:filename][0...-3]) { |row| yield(row) }
+      end
+    end
+  end
+
   def download_inventory_file_to_tmp_directory(file)
     return if File.exists?(file[:filename])
 
@@ -136,9 +145,36 @@ class S3Inventory
     )
   end
 
+  def prepare_for_all_sites
+    db_names = RailsMultisite::ConnectionManagement.all_dbs
+    db_files = {}
+
+    db_names.each do |db|
+      db_files[db] = Tempfile.new("#{db}-inventory.csv")
+    end
+
+    download_and_decompress_files
+    for_each_inventory_row do |row|
+      key = row[CSV_KEY_INDEX]
+      row_db = key.match(/uploads\/([^\/]+)\//)&.[](1)
+      if row_db && file = db_files[row_db]
+        file.write(row.to_csv)
+      end
+    end
+
+    db_names.each do |db|
+      db_files[db].rewind
+    end
+
+    db_files
+  ensure
+    cleanup!
+  end
+
   private
 
   def cleanup!
+    return if @preloaded_inventory_file
     files.each do |file|
       File.delete(file[:filename]) if File.exists?(file[:filename])
       File.delete(file[:filename][0...-3]) if File.exists?(file[:filename][0...-3])
@@ -154,6 +190,7 @@ class S3Inventory
   end
 
   def files
+    return if @preloaded_inventory_file
     @files ||= begin
       symlink_file = unsorted_files.sort_by { |file| -file.last_modified.to_i }.first
       return [] if symlink_file.blank?
@@ -171,6 +208,15 @@ class S3Inventory
     end
   end
 
+  def download_and_decompress_files
+    files.each do |file|
+      next if File.exists?(file[:filename][0...-3])
+
+      download_inventory_file_to_tmp_directory(file)
+      decompress_inventory_file(file)
+    end
+  end
+
   def tmp_directory
     @tmp_directory ||= begin
       current_db = RailsMultisite::ConnectionManagement.current_db
diff --git a/spec/components/s3_inventory_spec.rb b/spec/components/s3_inventory_spec.rb
index 4e70329..eb1f2c9 100644
--- a/spec/components/s3_inventory_spec.rb
+++ b/spec/components/s3_inventory_spec.rb
@@ -62,6 +62,7 @@ describe "S3Inventory" do
     freeze_time
 
     CSV.foreach(csv_filename, headers: false) do |row|
+      next unless row[S3Inventory::CSV_KEY_INDEX].include?("default")
       Fabricate(:upload, etag: row[S3Inventory::CSV_ETAG_INDEX], updated_at: 2.days.ago)
     end
 
@@ -82,8 +83,8 @@ describe "S3Inventory" do
 
   it "should backfill etags to uploads table correctly" do
     files = [
-      ["#{Discourse.store.absolute_base_url}/original/1X/0184537a4f419224404d013414e913a4f56018f2.jpg", "defcaac0b4aca535c284e95f30d608d0"],
-      ["#{Discourse.store.absolute_base_url}/original/1X/0789fbf5490babc68326b9cec90eeb0d6590db05.png", "25c02eaceef4cb779fc17030d33f7f06"]
+      ["#{Discourse.store.absolute_base_url}/uploads/default/original/1X/0184537a4f419224404d013414e913a4f56018f2.jpg", "defcaac0b4aca535c284e95f30d608d0"],
+      ["#{Discourse.store.absolute_base_url}/uploads/default/original/1X/0789fbf5490babc68326b9cec90eeb0d6590db05.png", "25c02eaceef4cb779fc17030d33f7f06"]
     ]
     files.each { |file| Fabricate(:upload, url: file[0]) }
 
@@ -95,4 +96,45 @@ describe "S3Inventory" do
 
     expect(Upload.by_users.order(:url).pluck(:url, :etag)).to eq(files)
   end
+
+  it "should work when passed preloaded data" do
+    freeze_time
+
+    CSV.foreach(csv_filename, headers: false) do |row|
+      next unless row[S3Inventory::CSV_KEY_INDEX].include?("default")
+      Fabricate(:upload, etag: row[S3Inventory::CSV_ETAG_INDEX], updated_at: 2.days.ago)
+    end
+
+    upload = Fabricate(:upload, etag: "ETag", updated_at: 1.days.ago)
+    Fabricate(:upload, etag: "ETag2", updated_at: Time.now)
+    no_etag = Fabricate(:upload, updated_at: 2.days.ago)
+
+    output = capture_stdout do
+      File.open(csv_filename) do |f|
+        preloaded_inventory = S3Inventory.new(
+          helper,
+          :upload,
+          preloaded_inventory_file: f,

[... diff too long, it was truncated ...]

GitHub sha: 16c65a94

@davidtaylorhq Nothing major, but I thought it’d be nice if the commit message provided a summary of what changed to make this faster, and by how much.

@tgxworld you’re right; I should have been more detailed there.

In case a comment is useful for future travellers, the commit message should have looked something like:

In a multisite cluster where the S3 config is shared across all sites, the inventory is also shared. Previously, we were downloading, decompressing and reading that inventory once for every site. On one of CDCK’s large clusters, these inventory files easily exceed 100 MB and contain more than 1 million rows.

Since the files are the same for every site, we can save a significant amount of work by fetching/decompressing/reading the large inventory file once, and then creating much smaller per-site inventory files for the job to use.

For a typical CDCK cluster (~400 sites, >1 million total uploads), this commit reduced the job’s runtime from 3 hours to 5 minutes.
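To make the splitting step concrete, here’s a rough standalone sketch of that approach (simplified from `prepare_for_all_sites` in the diff above; the column index and the `uploads/<db>/` key prefix match what `S3Inventory` assumes, and `split_inventory` is just an illustrative name):

```ruby
require "csv"
require "tempfile"

# Rough sketch: read the shared inventory CSV once and route each row
# into a per-site tempfile based on the uploads/<db>/ segment of the
# object key. Column 1 holds the key, as in S3Inventory::CSV_KEY_INDEX.
CSV_KEY_INDEX = 1

def split_inventory(csv_path, db_names)
  db_files = db_names.to_h { |db| [db, Tempfile.new("#{db}-inventory.csv")] }

  CSV.foreach(csv_path) do |row|
    db = row[CSV_KEY_INDEX]&.match(%r{uploads/([^/]+)/})&.[](1)
    db_files[db]&.write(row.to_csv) # rows for unknown sites are skipped
  end

  db_files.each_value(&:rewind) # leave each file ready to be read
  db_files
end
```

Each site’s run of the job can then hand its much smaller file to `S3Inventory` via `preloaded_inventory_file:` instead of hitting S3 again, which is where the bulk of the saving comes from.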
