DEV: Run jobs sequentially in test mode (#9897)

When running jobs in tests, we use Jobs.run_immediately!, so jobs run synchronously as soon as they are enqueued. Jobs sometimes enqueue other jobs, which are also executed synchronously, so the outermost job blocks until the inner jobs have finished executing. In some cases (e.g. process_post with hotlinked images) this can lead to a deadlock.

This commit changes the behavior slightly. Jobs are now never run inside other jobs. Instead, they are queued up and run sequentially in the order they were enqueued. As a whole, they are still executed synchronously: the outermost Jobs.enqueue call does not return until the queue is empty. Consider the following example:

class Jobs::InnerJob < Jobs::Base
  def execute(args)
    puts "Running inner job"
  end
end

class Jobs::OuterJob < Jobs::Base
  def execute(args)
    puts "Starting outer job"
    Jobs.enqueue(:inner_job)
    puts "Finished outer job"
  end
end

Jobs.enqueue(:outer_job)
puts "All jobs complete"

The old behavior would result in:

Starting outer job
Running inner job
Finished outer job
All jobs complete

The new behavior will result in:

Starting outer job
Finished outer job
Running inner job
All jobs complete
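
The queueing idea can be tried outside Discourse with a small standalone sketch. Everything here is hypothetical and for illustration only: `MiniJobs`, `QUEUE_KEY`, and the use of lambdas in place of job classes are assumptions, not part of the commit. It only mirrors the thread-local queue approach described above.

```ruby
# Hypothetical stand-in for Jobs.enqueue under run_immediately!.
# Jobs are plain lambdas here, not Jobs::Base subclasses.
module MiniJobs
  QUEUE_KEY = :mini_jobs_nested_queue # assumed name, not from the commit

  def self.enqueue(job, log)
    queue = Thread.current[QUEUE_KEY]
    outermost = queue.nil?

    if outermost
      queue = Queue.new
      Thread.current[QUEUE_KEY] = queue
    end

    queue.push(job)

    # Nested calls only push; the outermost call drains the queue.
    return unless outermost

    begin
      # pop(true) is non-blocking; safe because we checked empty? first
      queue.pop(true).call(log) until queue.empty?
    ensure
      # Always clear the thread-local, even if a job raises
      Thread.current[QUEUE_KEY] = nil
    end
  end
end

log = []
inner = ->(l) { l << "Running inner job" }
outer = lambda do |l|
  l << "Starting outer job"
  MiniJobs.enqueue(inner, l)
  l << "Finished outer job"
end

MiniJobs.enqueue(outer, log)
log << "All jobs complete"
puts log
```

The inner job is pushed while the outer job is still running, so it only executes once the outer job has returned, which matches the "new behavior" ordering above.
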
diff --git a/app/jobs/base.rb b/app/jobs/base.rb
index b4e1f2e..da1623f 100644
--- a/app/jobs/base.rb
+++ b/app/jobs/base.rb
@@ -281,8 +281,12 @@ module Jobs
     end
   end
 
-  def self.enqueue(job_name, opts = {})
-    klass = "::Jobs::#{job_name.to_s.camelcase}".constantize
+  def self.enqueue(job, opts = {})
+    if job.instance_of?(Class)
+      klass = job
+    else
+      klass = "::Jobs::#{job.to_s.camelcase}".constantize
+    end
 
     # Unless we want to work on all sites
     unless opts.delete(:all_sites)
@@ -319,7 +323,30 @@ module Jobs
           klass.new.perform(opts)
         end
       else
-        klass.new.perform(opts)
+        # Run the job synchronously
+        # But never run a job inside another job
+        # That could cause deadlocks during test runs
+        queue = Thread.current[:discourse_nested_job_queue]
+        outermost_job = !queue
+
+        if outermost_job
+          queue = Queue.new
+          Thread.current[:discourse_nested_job_queue] = queue
+        end
+
+        queue.push([klass, opts])
+
+        if outermost_job
+          # responsible for executing the queue
+          begin
+            until queue.empty?
+              queued_klass, queued_opts = queue.pop(true)
+              queued_klass.new.perform(queued_opts)
+            end
+          ensure
+            Thread.current[:discourse_nested_job_queue] = nil
+          end
+        end
       end
     end
 
diff --git a/spec/jobs/jobs_base_spec.rb b/spec/jobs/jobs_base_spec.rb
index 473710b..0779666 100644
--- a/spec/jobs/jobs_base_spec.rb
+++ b/spec/jobs/jobs_base_spec.rb
@@ -47,4 +47,41 @@ describe ::Jobs::Base do
     ::Jobs::Base.new.perform('hello' => 'world', 'sync_exec' => true)
   end
 
+  context "with fake jobs" do
+    let(:common_state) { [] }
+
+    let(:test_job_1) {
+      Class.new(Jobs::Base).tap do |klass|
+        state = common_state
+        klass.define_method(:execute) do |args|
+          state << "job_1_executed"
+        end
+      end
+    }
+
+    let(:test_job_2) {
+      Class.new(Jobs::Base).tap do |klass|
+        state = common_state
+        job_1 = test_job_1
+        klass.define_method(:execute) do |args|
+          state << "job_2_started"
+          Jobs.enqueue(job_1)
+          state << "job_2_finished"
+        end
+      end
+    }
+
+    it "runs jobs synchronously sequentially in tests" do
+      Jobs.run_immediately!
+      Jobs.enqueue(test_job_2)
+
+      expect(common_state).to eq([
+        "job_2_started",
+        "job_2_finished",
+        "job_1_executed"
+      ])
+    end
+
+  end
+
 end
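
The diff also lets Jobs.enqueue accept a class directly, which is what allows the spec to pass anonymous classes. A rough sketch of that lookup in plain Ruby follows. `resolve_job_class` and `DemoJobs` are hypothetical names, and the manual split/capitalize is an assumed stand-in for the ActiveSupport `camelcase`/`constantize` calls used in the real code.

```ruby
# Hypothetical namespace standing in for ::Jobs
module DemoJobs
  class Base; end
  class InnerJob < Base; end
end

# Accept either a class or a snake_case name, as the patched enqueue does.
def resolve_job_class(job)
  return job if job.instance_of?(Class)

  # Assumed plain-Ruby replacement for to_s.camelcase + constantize
  camel = job.to_s.split("_").map(&:capitalize).join
  DemoJobs.const_get(camel)
end

anonymous_job = Class.new(DemoJobs::Base)

puts resolve_job_class(:inner_job)                        # looked up by name
puts resolve_job_class(anonymous_job).equal?(anonymous_job) # passed through
```

Passing the class through unchanged matters for anonymous classes, since they have no constant name that could be looked up.
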

GitHub sha: 8a3d9d70

This commit appears in #9897, which was approved by eviltrout. It was merged by davidtaylorhq.

I dig that commit message :clap:

:heart: just trying to protect future-me from confusing myself
