FIX: Heartbeat check per sidekiq process (#7873)

FIX: Heartbeat check per sidekiq process (#7873)

  • FIX: Heartbeat check per sidekiq process

  • Rename method

  • Remove heartbeat queues of previous bootups

  • Address Régis's review feedback

  • Refactor before_start

  • Update lib/demon/sidekiq.rb

Co-Authored-By: Régis Hanol <regis@hanol.fr>

  • Update lib/demon/sidekiq.rb

Co-Authored-By: Régis Hanol <regis@hanol.fr>

  • Expire Redis keys after 3600 seconds (one hour)

  • Don’t use Redis to store the list of queues

diff --git a/app/jobs/regular/run_heartbeat.rb b/app/jobs/regular/run_heartbeat.rb
index 364343d..7b6bd65 100644
--- a/app/jobs/regular/run_heartbeat.rb
+++ b/app/jobs/regular/run_heartbeat.rb
@@ -2,19 +2,8 @@
 
 module Jobs
   class RunHeartbeat < Jobs::Base
-
-    sidekiq_options queue: 'critical'
-
-    def self.heartbeat_key
-      'heartbeat_last_run'
-    end
-
     def execute(args)
-      $redis.set(self.class.heartbeat_key, Time.new.to_i.to_s)
-    end
-
-    def self.last_heartbeat
-      $redis.get(heartbeat_key).to_i
+      Demon::Sidekiq.trigger_heartbeat(args[:queue_name])
     end
   end
 end
diff --git a/app/jobs/scheduled/heartbeat.rb b/app/jobs/scheduled/heartbeat.rb
index c1b8a8c..efd8213 100644
--- a/app/jobs/scheduled/heartbeat.rb
+++ b/app/jobs/scheduled/heartbeat.rb
@@ -7,7 +7,9 @@ module Jobs
     every 3.minute
 
     def execute(args)
-      Jobs.enqueue(:run_heartbeat, {})
+      Demon::Sidekiq::QUEUE_IDS.each do |identifier|
+        Jobs.enqueue(:run_heartbeat, queue_name: identifier, queue: identifier)
+      end
     end
   end
 end
diff --git a/config/unicorn.conf.rb b/config/unicorn.conf.rb
index 85b2800..20d9f6d 100644
--- a/config/unicorn.conf.rb
+++ b/config/unicorn.conf.rb
@@ -144,25 +144,32 @@ before_fork do |server, worker|
           @sidekiq_next_heartbeat_check ||= Time.new.to_i + @sidekiq_heartbeat_interval
 
           if @sidekiq_next_heartbeat_check < Time.new.to_i
-
-            last_heartbeat = Jobs::RunHeartbeat.last_heartbeat
-            restart = false
+            @sidekiq_next_heartbeat_check = Time.new.to_i + @sidekiq_heartbeat_interval
+            restarted = false
 
             if out_of_memory?
               Rails.logger.warn("Sidekiq is consuming too much memory (using: %0.2fM) for '%s', restarting" % [(max_rss.to_f / 1.megabyte), ENV["DISCOURSE_HOSTNAME"]])
-              restart = true
+              Demon::Sidekiq.restart
+              restarted = true
             end
 
-            if last_heartbeat < Time.new.to_i - @sidekiq_heartbeat_interval
-              STDERR.puts "Sidekiq heartbeat test failed, restarting"
-              Rails.logger.warn "Sidekiq heartbeat test failed, restarting"
-
-              restart = true
+            if !restarted
+              Demon::Sidekiq::QUEUE_IDS.each do |identifier|
+                last_heartbeat = Demon::Sidekiq.get_queue_last_heartbeat(identifier)
+
+                if last_heartbeat < Time.new.to_i - @sidekiq_heartbeat_interval
+                  if demon = Demon::Sidekiq.demons.values.find { |d| d.identifier == identifier }
+                    STDERR.puts "Sidekiq heartbeat test for worker #{demon.pid} failed, restarting"
+                    Rails.logger.warn "Sidekiq heartbeat test for worker #{demon.pid} failed, restarting"
+                    demon.stop
+                    demon.start
+                    restarted = true
+                  end
+                end
+              end
             end
-            @sidekiq_next_heartbeat_check = Time.new.to_i + @sidekiq_heartbeat_interval
 
-            if restart
-              Demon::Sidekiq.restart
+            if restarted
               sleep 10
               force_kill_rogue_sidekiq
             end
diff --git a/lib/demon/base.rb b/lib/demon/base.rb
index d162611..e118189 100644
--- a/lib/demon/base.rb
+++ b/lib/demon/base.rb
@@ -11,6 +11,7 @@ class Demon::Base
 
   def self.start(count = 1, verbose: false)
     @demons ||= {}
+    before_start(count)
     count.times do |i|
       (@demons["#{prefix}_#{i}"] ||= new(i, verbose: verbose)).start
     end
@@ -37,7 +38,10 @@ class Demon::Base
     end
   end
 
-  attr_reader :pid, :parent_pid, :started, :index
+  def self.before_start(count)
+  end
+
+  attr_reader :pid, :parent_pid, :started, :index, :identifier
   attr_accessor :stop_timeout
 
   def initialize(index, rails_root: nil, parent_pid: nil, verbose: false)
diff --git a/lib/demon/sidekiq.rb b/lib/demon/sidekiq.rb
index d6da3b5..11fdf56 100644
--- a/lib/demon/sidekiq.rb
+++ b/lib/demon/sidekiq.rb
@@ -3,6 +3,38 @@
 require "demon/base"
 
 class Demon::Sidekiq < Demon::Base
+  RANDOM_HEX = SecureRandom.hex
+  QUEUE_IDS = []
+
+  def self.queues_last_heartbeat_hash_key
+    @@queues_last_heartbeat_hash_key ||= "#{RANDOM_HEX}_queues_last_heartbeat_hash"
+  end
+
+  def self.trigger_heartbeat(name)
+    $redis.hset(queues_last_heartbeat_hash_key, name, Time.new.to_i.to_s)
+    extend_expiry(queues_last_heartbeat_hash_key)
+  end
+
+  def self.get_queue_last_heartbeat(name)
+    extend_expiry(queues_last_heartbeat_hash_key)
+    $redis.hget(queues_last_heartbeat_hash_key, name).to_i
+  end
+
+  def self.clear_heartbeat_queues!
+    $redis.del(queues_last_heartbeat_hash_key)
+  end
+
+  def self.before_start(count)
+    # cleans up heartbeat queues from previous boot up
+    Sidekiq::Queue.all.each { |queue| queue.clear if queue.name[/^\h{32}$/] }
+    count.times do
+      QUEUE_IDS << SecureRandom.hex
+    end
+  end
+
+  def self.extend_expiry(key)
+    $redis.expire(key, 60 * 60)
+  end
 
   def self.prefix
     "sidekiq"
@@ -12,6 +44,11 @@ class Demon::Sidekiq < Demon::Base
     blk ? (@blk = blk) : @blk
   end
 
+  def run
+    @identifier = QUEUE_IDS[@index]
+    super
+  end
+
   private
 
   def suppress_stdout
@@ -36,7 +73,7 @@ class Demon::Sidekiq < Demon::Base
 
     options = ["-c", GlobalSetting.sidekiq_workers.to_s]
 
-    [['critical', 8], ['default', 4], ['low', 2], ['ultra_low', 1]].each do |queue_name, weight|
+    [['critical', 8], [@identifier, 8], ['default', 4], ['low', 2], ['ultra_low', 1]].each do |queue_name, weight|
       custom_queue_hostname = ENV["UNICORN_SIDEKIQ_#{queue_name.upcase}_QUEUE_HOSTNAME"]
 
       if !custom_queue_hostname || custom_queue_hostname.split(',').include?(`hostname`.strip)
diff --git a/spec/jobs/heartbeat_spec.rb b/spec/jobs/heartbeat_spec.rb
index 315fe41..c132e62 100644
--- a/spec/jobs/heartbeat_spec.rb
+++ b/spec/jobs/heartbeat_spec.rb
@@ -2,6 +2,7 @@
 
 require 'rails_helper'
 require_dependency 'jobs/base'
+require_dependency 'demon/sidekiq'
 
 describe Jobs::Heartbeat do
   after do
@@ -10,12 +11,14 @@ describe Jobs::Heartbeat do
 
   it "still enqueues heartbeats in readonly mode" do
     freeze_time 1.week.from_now
+    Demon::Sidekiq.clear_heartbeat_queues!
+    Jobs.run_immediately!
 
     Discourse.enable_readonly_mode
 
-    Sidekiq::Testing.inline! do
-      Jobs::Heartbeat.new.perform(nil)
-      expect(Jobs::RunHeartbeat.last_heartbeat).to eq(Time.new.to_i)
-    end
+    queue = SecureRandom.hex
+    Demon::Sidekiq::QUEUE_IDS << queue
+    Jobs::Heartbeat.new.perform(nil)
+    expect(Demon::Sidekiq.get_queue_last_heartbeat(queue)).to eq(Time.new.to_i)
   end
 end

GitHub sha: 340855da

2 Likes

Revert "FIX: Heartbeat check per sidekiq process (#7873)"