FIX: Add multisite support to Sidekiq::Pausable. (#6960)

FIX: Add multisite support to Sidekiq::Pausable. (#6960)

Having a global Sidekiq pause switch is problematic because a site in the cluster can pause Sidekiq for the entire cluster.

diff --git a/lib/sidekiq/pausable.rb b/lib/sidekiq/pausable.rb
index 49ca1eb..16d4a71 100644
--- a/lib/sidekiq/pausable.rb
+++ b/lib/sidekiq/pausable.rb
@@ -1,15 +1,19 @@
 require 'thread'
 
 class SidekiqPauser
+  TTL = 60
+  PAUSED_KEY = "sidekiq_is_paused_v2"
+
   def initialize
     @mutex = Mutex.new
+    @dbs ||= Set.new
   end
 
   def pause!
-    redis.setex paused_key, 60, "paused"
+    $redis.setex PAUSED_KEY, TTL, "paused"
 
     @mutex.synchronize do
-      @extend_lease_thread ||= extend_lease_thread
+      extend_lease_thread
       sleep 0.001 while !paused?
     end
 
@@ -17,38 +21,38 @@ class SidekiqPauser
   end
 
   def paused?
-    !!redis.get(paused_key)
+    !!$redis.get(PAUSED_KEY)
   end
 
   def unpause!
     @mutex.synchronize do
-      @extend_lease_thread = nil
+      @dbs.delete(RailsMultisite::ConnectionManagement.current_db)
+      @extend_lease_thread = nil if @dbs.size == 0
     end
 
-    redis.del(paused_key)
+    $redis.del(PAUSED_KEY)
     true
   end
 
   private
 
   def extend_lease_thread
-    Thread.new do
+    @dbs << RailsMultisite::ConnectionManagement.current_db
+
+    @extend_lease_thread ||= Thread.new do
       while true do
         break unless @mutex.synchronize { @extend_lease_thread }
-        redis.expire paused_key, 60
-        sleep(Rails.env.test? ? 0.01 : 30)
-      end
-    end
-  end
 
-  def redis
-    $redis.without_namespace
-  end
+        @dbs.each do |db|
+          RailsMultisite::ConnectionManagement.with_connection(db) do
+            $redis.expire PAUSED_KEY, TTL
+          end
+        end
 
-  def paused_key
-    "sidekiq_is_paused_v2"
+        sleep(Rails.env.test? ? 0.01 : TTL / 2)
+      end
+    end
   end
-
 end
 
 module Sidekiq
@@ -74,7 +78,7 @@ class Sidekiq::Pausable
   end
 
   def call(worker, msg, queue)
-    if Sidekiq.paused? && !(Jobs::RunHeartbeat === worker)
+    if sidekiq_paused?(msg) && !(Jobs::RunHeartbeat === worker)
       worker.class.perform_in(@delay, *msg['args'])
     else
       start = Process.clock_gettime(Process::CLOCK_MONOTONIC)
@@ -85,4 +89,14 @@ class Sidekiq::Pausable
     end
   end
 
+  private
+
+  def sidekiq_paused?(msg)
+    if site_id = msg["args"]&.first&.dig("current_site_id")
+      RailsMultisite::ConnectionManagement.with_connection(site_id) do
+        Sidekiq.paused?
+      end
+    end
+  end
+
 end
diff --git a/spec/components/sidekiq/pausable_spec.rb b/spec/components/sidekiq/pausable_spec.rb
deleted file mode 100644
index 7a9f86c..0000000
--- a/spec/components/sidekiq/pausable_spec.rb
+++ /dev/null
@@ -1,36 +0,0 @@
-require 'rails_helper'
-require_dependency 'sidekiq/pausable'
-
-describe Sidekiq do
-  after do
-    Sidekiq.unpause!
-  end
-
-  it "can pause and unpause" do
-    Sidekiq.pause!
-    expect(Sidekiq.paused?).to eq(true)
-    Sidekiq.unpause!
-    expect(Sidekiq.paused?).to eq(false)
-  end
-
-  it "can still run heartbeats when paused" do
-    Sidekiq.pause!
-
-    freeze_time 1.week.from_now
-
-    jobs = Sidekiq::ScheduledSet.new
-
-    Sidekiq::Testing.disable! do
-      jobs.clear
-
-      middleware = Sidekiq::Pausable.new
-      middleware.call(Jobs::RunHeartbeat.new, { "args" => [{}] }, "critical") do
-        "done"
-      end
-
-      jobs = Sidekiq::ScheduledSet.new
-      expect(jobs.size).to eq(0)
-    end
-
-  end
-end
diff --git a/spec/multisite/pausable_spec.rb b/spec/multisite/pausable_spec.rb
new file mode 100644
index 0000000..57297dd
--- /dev/null
+++ b/spec/multisite/pausable_spec.rb
@@ -0,0 +1,89 @@
+require 'rails_helper'
+require_dependency 'sidekiq/pausable'
+
+RSpec.describe "Pausing/Unpausing Sidekiq", type: :multisite do
+  after do
+    $redis.flushall
+  end
+
+  describe '#pause!, #unpause! and #paused?' do
+    it "can pause and unpause" do
+      Sidekiq.pause!
+      expect(Sidekiq.paused?).to eq(true)
+
+      test_multisite_connection('second') do
+        expect(Sidekiq.paused?).to eq(false)
+      end
+
+      Sidekiq.unpause!
+
+      expect(Sidekiq.paused?).to eq(false)
+
+      test_multisite_connection('second') do
+        Sidekiq.pause!
+        expect(Sidekiq.paused?).to eq(true)
+      end
+    end
+  end
+end
+
+RSpec.describe Sidekiq::Pausable do
+  after do
+    $redis.flushall
+  end
+
+  it "can still run heartbeats when paused" do
+    Sidekiq.pause!
+
+    freeze_time 1.week.from_now
+
+    jobs = Sidekiq::ScheduledSet.new
+    jobs.clear
+    middleware = Sidekiq::Pausable.new
+
+    middleware.call(Jobs::RunHeartbeat.new, { "args" => [{}] }, "critical") do
+      "done"
+    end
+
+    jobs = Sidekiq::ScheduledSet.new
+    expect(jobs.size).to eq(0)
+  end
+
+  describe 'when sidekiq is paused', type: :multisite do
+    let(:middleware) { Sidekiq::Pausable.new }
+
+    def call_middleware(db = RailsMultisite::ConnectionManagement::DEFAULT)
+      middleware.call(Jobs::PostAlert.new, {
+        "args" => [{ "current_site_id" => db }]
+      }, "critical") do
+        yield
+      end
+    end
+
+    it 'should delay the job' do
+      Sidekiq.pause!
+
+      called = false
+      called2 = false
+      call_middleware { called = true }
+
+      expect(called).to eq(false)
+
+      test_multisite_connection('second') do
+        call_middleware('second') { called2 = true }
+        expect(called2).to eq(true)
+      end
+
+      Sidekiq.unpause!
+      call_middleware { called = true }
+
+      expect(called).to eq(true)
+
+      test_multisite_connection('second') do
+        Sidekiq.pause!
+        call_middleware('second') { called2 = false }
+        expect(called2).to eq(true)
+      end
+    end
+  end
+end

GitHub sha: 53d592ad

FEATURE: add APIs for unpausing all sites