FIX: IMAP sync email update uniqueness across groups and minor improvements (#10332)

FIX: IMAP sync email update uniqueness across groups and minor improvements (#10332)

Adds an imap_group_id column to IncomingEmail to deal with an issue where we were trying to update emails in the mailbox by calling IncomingEmail.where(imap_sync: true). However, UID and UIDVALIDITY could be the same across accounts. So if group A used IMAP details for Gmail account A, and group B used IMAP details for Gmail account B, and both tried to sync changes to an email with a UID of 3 (e.g. changing Labels), one account could affect the other. This even applied to Archiving!

Also in this PR:

  • Fix error occurring if we do a uid_fetch and no emails are returned
  • Allow for creating labels within the target mailbox (previously we would not do this, only use existing labels)
  • Improve consistency for log messages
  • Add specs for generic IMAP provider (Gmail specs still to come)
  • Add custom archiving support for Gmail
  • Only use Message-ID for uniqueness of IncomingEmail if it was generated by us
  • Various refactors and improvements
diff --git a/app/models/incoming_email.rb b/app/models/incoming_email.rb
index e608abb..c18739d 100644
--- a/app/models/incoming_email.rb
+++ b/app/models/incoming_email.rb
@@ -4,6 +4,7 @@ class IncomingEmail < ActiveRecord::Base
   belongs_to :user
   belongs_to :topic
   belongs_to :post
+  belongs_to :group, foreign_key: :imap_group_id, class_name: 'Group'
 
   scope :errored,  -> { where("NOT is_bounce AND error IS NOT NULL") }
 
@@ -52,13 +53,15 @@ end
 #  imap_uid_validity :integer
 #  imap_uid          :integer
 #  imap_sync         :boolean
+#  imap_group_id     :bigint
 #
 # Indexes
 #
-#  index_incoming_emails_on_created_at  (created_at)
-#  index_incoming_emails_on_error       (error)
-#  index_incoming_emails_on_imap_sync   (imap_sync)
-#  index_incoming_emails_on_message_id  (message_id)
-#  index_incoming_emails_on_post_id     (post_id)
-#  index_incoming_emails_on_user_id     (user_id) WHERE (user_id IS NOT NULL)
+#  index_incoming_emails_on_created_at     (created_at)
+#  index_incoming_emails_on_error          (error)
+#  index_incoming_emails_on_imap_group_id  (imap_group_id)
+#  index_incoming_emails_on_imap_sync      (imap_sync)
+#  index_incoming_emails_on_message_id     (message_id)
+#  index_incoming_emails_on_post_id        (post_id)
+#  index_incoming_emails_on_user_id        (user_id) WHERE (user_id IS NOT NULL)
 #
diff --git a/db/migrate/20200728072038_add_imap_group_id_to_incoming_email.rb b/db/migrate/20200728072038_add_imap_group_id_to_incoming_email.rb
new file mode 100644
index 0000000..139a231
--- /dev/null
+++ b/db/migrate/20200728072038_add_imap_group_id_to_incoming_email.rb
@@ -0,0 +1,22 @@
+# frozen_string_literal: true
+
+class AddImapGroupIdToIncomingEmail < ActiveRecord::Migration[6.0]
+  disable_ddl_transaction!
+
+  def up
+    execute <<~SQL
+      ALTER TABLE incoming_emails ADD COLUMN IF NOT EXISTS imap_group_id bigint NULL
+    SQL
+
+    execute <<~SQL
+      CREATE INDEX CONCURRENTLY IF NOT EXISTS
+      index_incoming_emails_on_imap_group_id ON incoming_emails USING btree (imap_group_id)
+    SQL
+  end
+
+  def down
+    execute <<~SQL
+      ALTER TABLE incoming_emails DROP COLUMN IF EXISTS imap_group_id
+    SQL
+  end
+end
diff --git a/lib/demon/email_sync.rb b/lib/demon/email_sync.rb
index 015daba..e4c7f15 100644
--- a/lib/demon/email_sync.rb
+++ b/lib/demon/email_sync.rb
@@ -23,6 +23,7 @@ class Demon::EmailSync < ::Demon::Base
   def start_thread(db, group)
     Thread.new do
       RailsMultisite::ConnectionManagement.with_connection(db) do
+        puts "[EmailSync] Thread started for group #{group.name} (id = #{group.id}) in db #{db}"
         begin
           obj = Imap::Sync.for_group(group)
         rescue Net::IMAP::NoResponseError => e
@@ -36,12 +37,15 @@ class Demon::EmailSync < ::Demon::Base
         idle = false
 
         while @running && group.reload.imap_mailbox_name.present? do
+          puts "[EmailSync] Processing IMAP mailbox for group #{group.name} (id = #{group.id}) in db #{db}"
           status = obj.process(
             idle: obj.can_idle? && status && status[:remaining] == 0,
             old_emails_limit: status && status[:remaining] > 0 ? 0 : nil,
           )
 
           if !obj.can_idle? && status[:remaining] == 0
+            puts "[EmailSync] Going to sleep for group #{group.name} (id = #{group.id}) in db #{db} to wait for new emails."
+
             # Thread goes into sleep for a bit so it is better to return any
             # connection back to the pool.
             ActiveRecord::Base.connection_handler.clear_active_connections!
@@ -74,14 +78,14 @@ class Demon::EmailSync < ::Demon::Base
   end
 
   def after_fork
-    puts "Loading EmailSync in process id #{Process.pid}"
+    puts "[EmailSync] Loading EmailSync in process id #{Process.pid}"
 
     loop do
       break if Discourse.redis.set(HEARTBEAT_KEY, Time.now.to_i, ex: HEARTBEAT_INTERVAL, nx: true)
       sleep HEARTBEAT_INTERVAL
     end
 
-    puts "Starting EmailSync main thread"
+    puts "[EmailSync] Starting EmailSync main thread"
 
     @running = true
     @sync_data = {}
@@ -122,7 +126,7 @@ class Demon::EmailSync < ::Demon::Base
               if !groups[group_id]
                 puts("[EmailSync] Killing thread for group (id = #{group_id}) because mailbox is no longer synced")
               else
-                puts("[EmailSync] Thread for group #{groups[group_id].name} is dead")
+                puts("[EmailSync] Thread for group #{groups[group_id].name} (id = #{group_id}) is dead")
               end
 
               data[:thread].kill
@@ -135,8 +139,10 @@ class Demon::EmailSync < ::Demon::Base
             # Spawn new threads for groups that are now synchronized.
             groups.each do |group_id, group|
               if !@sync_data[db][group_id]
-                puts("[EmailSync] Starting thread for group #{group.name} and mailbox #{group.imap_mailbox_name}")
-                @sync_data[db][group_id] = { thread: start_thread(db, group), obj: nil }
+                puts("[EmailSync] Starting thread for group #{group.name} (id = #{group.id}) and mailbox #{group.imap_mailbox_name}")
+                @sync_data[db][group_id] = {
+                  thread: start_thread(db, group), obj: nil
+                }
               end
             end
           end
diff --git a/lib/email/receiver.rb b/lib/email/receiver.rb
index fb48708..31aec68 100644
--- a/lib/email/receiver.rb
+++ b/lib/email/receiver.rb
@@ -66,11 +66,13 @@ module Email
       id_hash = Digest::SHA1.hexdigest(@message_id)
       DistributedMutex.synchronize("process_email_#{id_hash}") do
         begin
-          @incoming_email = IncomingEmail.find_by(message_id: @message_id)
-          if @incoming_email
-            @incoming_email.update(imap_uid_validity: @opts[:uid_validity], imap_uid: @opts[:uid], imap_sync: false)
-            return
-          end
+
+          # if we find an existing incoming email record with the
+          # exact same message id, be sure to update it with the correct IMAP
+          # metadata based on sync. this is so we do not double-create emails.
+          @incoming_email = find_existing_and_update_imap
+          return if @incoming_email
+
           ensure_valid_address_lists
           ensure_valid_date
           @from_email, @from_display_name = parse_from_field
@@ -89,6 +91,32 @@ module Email
       end
     end
 
+    def find_existing_and_update_imap
+      incoming_email = IncomingEmail.find_by(message_id: @message_id)
+
+      # if we are not doing this for IMAP purposes, then we do not want
+      # to double-process the same Message-ID
+      if @opts[:imap_uid].blank?
+        return incoming_email
+      end
+
+      return if !incoming_email
+
+      # if the message_id matches the post id regexp then we
+      # generated the message_id not the imap server, e.g. in GroupSmtpEmail,
+      # so we want to just update the incoming email. Otherwise the
+      # incoming email is a completely new one from the IMAP server.
+      return if (@message_id =~ message_id_post_id_regexp).nil?
+
+      incoming_email.update(
+        imap_uid_validity: @opts[:imap_uid_validity],
+        imap_uid: @opts[:imap_uid],
+        imap_group_id: @opts[:imap_group_id],
+        imap_sync: false
+      )
+      incoming_email
+    end
+
     def ensure_valid_address_lists
       [:to, :cc, :bcc].each do |field|
         addresses = @mail[field]
@@ -118,8 +146,9 @@ module Email
         from_address: @from_email,
         to_addresses: @mail.to&.map(&:downcase)&.join(";"),
         cc_addresses: @mail.cc&.map(&:downcase)&.join(";"),
-        imap_uid_validity: @opts[:uid_validity],
-        imap_uid: @opts[:uid],
+        imap_uid_validity: @opts[:imap_uid_validity],
+        imap_uid: @opts[:imap_uid],
+        imap_group_id: @opts[:imap_group_id],
         imap_sync: false
       )
     end
@@ -913,12 +942,8 @@ module Email

[... diff too long, it was truncated ...]

GitHub sha: 2920988b

1 Like

This commit appears in #10332 which was approved by eviltrout. It was merged by martin.