FIX: S3 inventory data can be split into multiple CSV files

FIX: S3 inventory data can be split into multiple CSV files

diff --git a/lib/s3_inventory.rb b/lib/s3_inventory.rb
index 763f1d1..fab1756 100644
--- a/lib/s3_inventory.rb
+++ b/lib/s3_inventory.rb
@@ -5,7 +5,7 @@ require "csv"
 
 class S3Inventory
 
-  attr_reader :inventory_id, :csv_filename, :model
+  attr_reader :inventory_id, :model, :last_modified
 
   CSV_KEY_INDEX ||= 1
   CSV_ETAG_INDEX ||= 2
@@ -24,38 +24,29 @@ class S3Inventory
     end
   end
 
-  def file
-    @file ||= unsorted_files.sort_by { |file| -file.last_modified.to_i }.first
-  end
-
   def list_missing
-    if file.blank?
+    if files.blank?
       error("Failed to list inventory from S3")
       return
     end
 
     DistributedMutex.synchronize("s3_inventory_list_missing_#{inventory_id}") do
-      current_db = RailsMultisite::ConnectionManagement.current_db
-      timestamp = Time.now.strftime("%Y-%m-%d-%H%M%S")
-      @tmp_directory = File.join(Rails.root, "tmp", INVENTORY_PREFIX, current_db, timestamp)
-      @archive_filename = File.join(@tmp_directory, File.basename(file.key))
-      @csv_filename = @archive_filename[0...-3]
-
-      FileUtils.mkdir_p(@tmp_directory)
-      download_inventory_file_to_tmp_directory
-      decompress_inventory_file
+      download_inventory_files_to_tmp_directory
+      decompress_inventory_files
 
       begin
         table_name = "#{inventory_id}_inventory"
         connection = ActiveRecord::Base.connection.raw_connection
         connection.exec("CREATE TEMP TABLE #{table_name}(key text UNIQUE, etag text PRIMARY KEY)")
         connection.copy_data("COPY #{table_name} FROM STDIN CSV") do
-          CSV.foreach(csv_filename, headers: false) do |row|
-            connection.put_copy_data("#{row[CSV_KEY_INDEX]},#{row[CSV_ETAG_INDEX]}\n")
+          files.each do |file|
+            CSV.foreach(file[:filename][0...-3], headers: false) do |row|
+              connection.put_copy_data("#{row[CSV_KEY_INDEX]},#{row[CSV_ETAG_INDEX]}\n")
+            end
           end
         end
 
-        uploads = model.where("created_at < ?", file.last_modified)
+        uploads = model.where("created_at < ?", last_modified)
         missing_uploads = uploads.joins("LEFT JOIN #{table_name} ON #{table_name}.etag = #{model.table_name}.etag").where("#{table_name}.etag is NULL")
 
         if (missing_count = missing_uploads.count) > 0
@@ -71,18 +62,21 @@ class S3Inventory
     end
   end
 
-  def download_inventory_file_to_tmp_directory
-    log "Downloading inventory file to tmp directory..."
-    failure_message = "Failed to inventory file to tmp directory."
+  def download_inventory_files_to_tmp_directory
+    # Download each entry from `files` (its S3 :key) to its local tmp :filename.
+    files.each do |file|
+      log "Downloading inventory file '#{file[:key]}' to tmp directory..."
 
-    @s3_helper.download_file(file.key, @archive_filename, failure_message)
+      @s3_helper.download_file(file[:key], file[:filename], "Failed to download inventory file '#{file[:key]}' to tmp directory.")
+    end
   end
 
-  def decompress_inventory_file
-    log "Decompressing inventory file, this may take a while..."
-
-    FileUtils.cd(@tmp_directory) do
-      Discourse::Utils.execute_command('gzip', '--decompress', @archive_filename, failure_message: "Failed to decompress inventory file.")
+  def decompress_inventory_files
+    # Archive paths are absolute (under tmp_directory), so gzip needs no chdir;
+    # skipping FileUtils.cd avoids mutating the process-wide working directory.
+    files.each do |file|
+      log "Decompressing inventory file '#{file[:filename]}', this may take a while..."
+      Discourse::Utils.execute_command('gzip', '--decompress', file[:filename], failure_message: "Failed to decompress inventory file '#{file[:filename]}'.")
     end
   end
 
@@ -123,6 +117,34 @@ class S3Inventory
 
   private
 
+  # Data files listed by the newest symlink.txt, as { key:, filename: } hashes.
+  def files
+    @files ||= begin
+      symlink_file = unsorted_files.max_by { |file| file.last_modified.to_i }
+      return [] if symlink_file.blank?
+      @last_modified = symlink_file.last_modified
+      log "Downloading symlink file to tmp directory..."
+      failure_message = "Failed to download symlink file to tmp directory."
+      filename = File.join(tmp_directory, File.basename(symlink_file.key))
+      @s3_helper.download_file(symlink_file.key, filename, failure_message)
+      # Lines are expected to be "s3://<bucket>/<key>" URLs; skip blanks, strip the prefix.
+      File.readlines(filename).map(&:chomp).reject(&:blank?).map do |url|
+        key = url.sub("s3://#{bucket_name}/", "")
+        { key: key, filename: File.join(tmp_directory, File.basename(key)) }
+      end
+    end
+  end
+
+  # Memoized scratch directory for this run, created on first call:
+  # <Rails.root>/tmp/<INVENTORY_PREFIX>/<current_db>/<timestamp>.
+  def tmp_directory
+    @tmp_directory ||= File.join(
+      Rails.root, "tmp", INVENTORY_PREFIX,
+      RailsMultisite::ConnectionManagement.current_db,
+      Time.now.strftime("%Y-%m-%d-%H%M%S")
+    ).tap { |directory| FileUtils.mkdir_p(directory) }
+  end
+
   def inventory_configuration
     filter_prefix = inventory_id
     filter_prefix = File.join(bucket_folder_path, filter_prefix) if bucket_folder_path.present?
@@ -159,8 +181,9 @@ class S3Inventory
   def unsorted_files
     objects = []
 
-    @s3_helper.list(inventory_data_path).each do |obj|
-      if obj.key.match?(/\.csv\.gz$/i)
+    hive_path = File.join(inventory_path, bucket_name, inventory_id, "hive")
+    @s3_helper.list(hive_path).each do |obj|
+      if obj.key.match?(/symlink\.txt$/i)
         objects << obj
       end
     end
@@ -170,10 +193,6 @@ class S3Inventory
     log("Failed to list inventory from S3", e)
   end
 
-  def inventory_data_path
-    File.join(inventory_path, bucket_name, inventory_id, "data")
-  end
-
   def inventory_path_arn
     File.join(bucket_arn, inventory_path)
   end
diff --git a/spec/components/s3_inventory_spec.rb b/spec/components/s3_inventory_spec.rb
index fdb4a79..1156d5d 100644
--- a/spec/components/s3_inventory_spec.rb
+++ b/spec/components/s3_inventory_spec.rb
@@ -17,8 +17,7 @@ describe "S3Inventory" do
     SiteSetting.enable_s3_inventory = true
 
     client.stub_responses(:list_objects, -> (context) {
-      inventory_data_path = "#{S3Inventory::INVENTORY_PREFIX}/#{S3Inventory::INVENTORY_VERSION}/bucket/original/data"
-      expect(context.params[:prefix]).to eq(inventory_data_path)
+      expect(context.params[:prefix]).to eq("#{S3Inventory::INVENTORY_PREFIX}/#{S3Inventory::INVENTORY_VERSION}/bucket/original/hive")
 
       {
         contents: [
@@ -50,10 +49,6 @@ describe "S3Inventory" do
     })
   end
 
-  it "should return the latest inventory file name" do
-    expect(inventory.file.key).to eq("example1.csv.gz")
-  end
-
   it "should raise error if an inventory file is not found" do
     client.stub_responses(:list_objects, contents: [])
     output = capture_stdout { inventory.list_missing }
@@ -69,14 +64,14 @@ describe "S3Inventory" do
     upload = Fabricate(:upload, etag: "ETag", created_at: 1.days.ago)
     Fabricate(:upload, etag: "ETag2", created_at: Time.now)
 
-    inventory.expects(:decompress_inventory_file)
-    inventory.expects(:csv_filename).returns(csv_filename)
-    inventory.file.expects(:last_modified).returns(Time.now)
+    inventory.expects(:decompress_inventory_files)
+    inventory.expects(:files).returns([{ key: "Key", filename: "#{csv_filename}.gz" }]).at_least(1)
+    inventory.expects(:last_modified).returns(Time.now)
 
     output = capture_stdout do
       inventory.list_missing
     end
 
-    expect(output).to eq("Downloading inventory file to tmp directory...\n#{upload.url}\n1 of 4 uploads are missing\n")
+    expect(output).to eq("Downloading inventory file 'Key' to tmp directory...\n#{upload.url}\n1 of 4 uploads are missing\n")
   end
 end

GitHub sha: 1045bbc3

1 Like