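FIX: Prevent race condition when updating topic custom fields

Wraps the per-topic alert-history updates in app/jobs/regular/process_grouped_alerts.rb in `DistributedMutex.synchronize("prom_alert_receiver_topic_#{topic.id}")`, so that reading a topic's custom fields, changing the stored alerts, saving them, and revising the topic all happen under a lock keyed by topic id.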
From 17386fd658ffc53d1ad61d6483b35517f0c6cd2a Mon Sep 17 00:00:00 2001
From: David Taylor <david@taylorhq.com>
Date: Wed, 24 Oct 2018 00:26:56 +0100
Subject: [PATCH] FIX: Prevent race condition when updating topic custom fields


diff --git a/app/jobs/regular/process_grouped_alerts.rb b/app/jobs/regular/process_grouped_alerts.rb
index ba9c63c..3e0a8fe 100644
--- a/app/jobs/regular/process_grouped_alerts.rb
+++ b/app/jobs/regular/process_grouped_alerts.rb
@@ -42,47 +42,49 @@ module Jobs
 
     def mark_stale_alerts(receiver, data, graph_url)
       Topic.firing_alerts.each do |topic|
-        alerts = topic.custom_fields.dig(alert_history_key, 'alerts')
-        updated = false
-
-        alerts&.each do |alert|
-          if alert['graph_url'].include?(graph_url) && is_firing?(alert['status'])
-            is_stale = !current_alerts(data).any? do |current_alert|
-              current_alert['labels']['id'] == alert['id'] &&
-                DateTime.parse(current_alert['startsAt']).to_s == DateTime.parse(alert['starts_at']).to_s
-            end
-
-            if is_stale &&
-               STALE_DURATION.minute.since > DateTime.parse(alert["starts_at"])
-
-              alert["status"] = "stale"
-              updated = true
+        DistributedMutex.synchronize("prom_alert_receiver_topic_#{topic.id}") do
+          alerts = topic.custom_fields.dig(alert_history_key, 'alerts')
+          updated = false
+
+          alerts&.each do |alert|
+            if alert['graph_url'].include?(graph_url) && is_firing?(alert['status'])
+              is_stale = !current_alerts(data).any? do |current_alert|
+                current_alert['labels']['id'] == alert['id'] &&
+                  DateTime.parse(current_alert['startsAt']).to_s == DateTime.parse(alert['starts_at']).to_s
+              end
+
+              if is_stale &&
+                STALE_DURATION.minute.since > DateTime.parse(alert["starts_at"])
+
+                alert["status"] = "stale"
+                updated = true
+              end
             end
           end
-        end
 
-        if updated
-          topic.save_custom_fields(true)
-          klass = DiscoursePrometheusAlertReceiver
+          if updated
+            topic.save_custom_fields(true)
+            klass = DiscoursePrometheusAlertReceiver
 
-          raw = first_post_body(
-            receiver: receiver,
-            topic_body: topic.custom_fields[klass::TOPIC_BODY_CUSTOM_FIELD] || '',
-            alert_history: alerts,
-            prev_topic_id: topic.custom_fields[klass::PREVIOUS_TOPIC_CUSTOM_FIELD]
-          )
+            raw = first_post_body(
+              receiver: receiver,
+              topic_body: topic.custom_fields[klass::TOPIC_BODY_CUSTOM_FIELD] || '',
+              alert_history: alerts,
+              prev_topic_id: topic.custom_fields[klass::PREVIOUS_TOPIC_CUSTOM_FIELD]
+            )
 
-          title = topic.custom_fields[klass::TOPIC_TITLE_CUSTOM_FIELD] || ''
+            title = topic.custom_fields[klass::TOPIC_TITLE_CUSTOM_FIELD] || ''
 
-          revise_topic(
-            topic: topic,
-            title: title,
-            raw: raw,
-            datacenters: datacenters(alerts),
-            firing: alerts.any? { |alert| is_firing?(alert["status"]) }
-          )
+            revise_topic(
+              topic: topic,
+              title: title,
+              raw: raw,
+              datacenters: datacenters(alerts),
+              firing: alerts.any? { |alert| is_firing?(alert["status"]) }
+            )
 
-          publish_alert_counts
+            publish_alert_counts
+          end
         end
       end
     end
@@ -92,37 +94,39 @@ module Jobs
         topic = Topic.find_by(id: receiver[:topic_map][group["labels"]["alertname"]])
 
         if topic
-          group["blocks"].each do |block|
-            active_alerts = block["alerts"]
-            annotations = active_alerts.first["annotations"]
-
-            stored_alerts = begin
-              topic.custom_fields[alert_history_key]&.dig('alerts') || []
-            end
+          DistributedMutex.synchronize("prom_alert_receiver_topic_#{topic.id}") do
+            group["blocks"].each do |block|
+              active_alerts = block["alerts"]
+              annotations = active_alerts.first["annotations"]
 
-            silenced = silence_alerts(stored_alerts, active_alerts,
-              datacenter: group["labels"]["datacenter"]
-            )
-
-            if silenced
-              topic.save_custom_fields(true)
-
-              raw = first_post_body(
-                receiver: receiver,
-                topic_body: annotations["topic_body"],
-                alert_history: stored_alerts,
-                prev_topic_id: topic.custom_fields[::DiscoursePrometheusAlertReceiver::PREVIOUS_TOPIC_CUSTOM_FIELD]
-              )
+              stored_alerts = begin
+                topic.custom_fields[alert_history_key]&.dig('alerts') || []
+              end
 
-              revise_topic(
-                topic: topic,
-                title: annotations["topic_title"],
-                raw: raw,
-                datacenters: datacenters(stored_alerts),
-                firing: stored_alerts.any? { |alert| is_firing?(alert["status"]) }
+              silenced = silence_alerts(stored_alerts, active_alerts,
+                datacenter: group["labels"]["datacenter"]
               )
 
-              publish_alert_counts
+              if silenced
+                topic.save_custom_fields(true)
+
+                raw = first_post_body(
+                  receiver: receiver,
+                  topic_body: annotations["topic_body"],
+                  alert_history: stored_alerts,
+                  prev_topic_id: topic.custom_fields[::DiscoursePrometheusAlertReceiver::PREVIOUS_TOPIC_CUSTOM_FIELD]
+                )
+
+                revise_topic(
+                  topic: topic,
+                  title: annotations["topic_title"],
+                  raw: raw,
+                  datacenters: datacenters(stored_alerts),
+                  firing: stored_alerts.any? { |alert| is_firing?(alert["status"]) }
+                )
+
+                publish_alert_counts
+              end
             end
           end
         end

GitHub